Niestety, nadal tego nie ogarniam.
Mi IntelliJ nie podświetla subscribe()
, ale ja używam Community, więc może jest mniej intelli ;) W każdym razie Twój kod @Charles_Ray nie zadziała poprawnie, bo wyśle wiadmość tylko do samego siebie. Mi chodzi o to, żeby dwie sesje websocketów mogły wymieniać wiadomości między sobą. I po to cała ta zabawa z publisherem -> mamy Fluxa, który dostępny jest dla wszystkich websocketowych sesji, dajemy na nim .share()
po to, aby wiadomość była dostępna dla wszystkich consumerów (inaczej jedna sesja by skonsumowała i druga nie miałaby prawa jej widzieć).
Próbowałem to ogarnąć z .flatMap
tak jak radziliście, ale mi to nie działa poprawnie. Messege idą w nieprzewidywalny sposób, raz jedna, raz wiele na raz, raz w ogóle. Do tego jak rozłączymy wszystkie sesje WebSocket i połączymy jeszcze raz to wiadomości w ogóle nie dochodzą.
Kopiuj
return webSocketSession.send(
webSocketSession.receive()
.map(webSocketMessage -> webSocketMessage.getPayloadAsText())
.map(helloMessage -> greetingsService.greeting(helloMessage))
.map(greetings -> greetingsPublisher.push(greetings))
.flatMap(greetings -> publisher
.map(publisherGreetings -> webSocketSession.textMessage(publisherGreetings)))
);
Działa mi to tylko jeśli na webSocketSession.receive()
dam subscribe()
.
Kod: https://github.com/kkojot/multiroomchat-spring-webflux-vue/tree/master/spring-backend/src/main/java/com/kojodev/blog/multiroomchat/webfluxmultiroomchat/greetings
Ja testuję to przy pomocy Apic Chrome extension https://chrome.google.com/webstore/detail/apic-complete-api-solutio/ggnhohnkfcpcanfekomdkjffnfcjnjam. Robię dwa połączenia na ws://localhost:8080/greetings
i pcham message {"name":"kojot"}
Co do drugiego zadania, filtrowania wiadomości do konkretnych userów próbwałem zrobić tak jak @jarekr000000 pisał, ale to nie działa mi w ogóle. Wiadomości idą do wszystkich sesji.
Kopiuj
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
webSocketSession
.receive()
.map(webSocketMessage -> webSocketMessage.getPayloadAsText())
.map(message -> new SimpleTeamMessageResolver(webSocketSession, message))
.map(resolver -> resolver.process())
.map(message -> teamMessagePublisher.push(message))
.subscribe();
return webSocketSession.send(
SimpleTeamMessageResolver
.filterTeamMessage(webSocketSession, publisher)
.map(message -> webSocketSession.textMessage(message)));
}
Kopiuj
public class SimpleTeamMessageResolver {
private final WebSocketSession webSocketSession;
private final String message;
public SimpleTeamMessageResolver(WebSocketSession webSocketSession, String message) {
this.webSocketSession = webSocketSession;
this.message = message;
}
public static Flux<String> filterTeamMessage(WebSocketSession session, Flux<String> messages) {
final String teamColor = teamColorFromSession(session);
if (teamColor.equals("UNKNOWN")) return messages;
final Flux<String> filtered = messages
.filter(message -> message.startsWith(teamColor));
return filtered;
}
private static String teamColorFromSession(WebSocketSession session) {
return Option.of(session
.getAttributes()
.get("team"))
.map(color -> color.toString())
.getOrElse("UNKNOWN");
}
public String process() {
if (message.startsWith("join:")) {
return handleJoin();
} else {
return handleMessage();
}
}
private String handleJoin() {
final String teamColor = teamColorFromMessage();
webSocketSession
.getAttributes()
.put("team", teamColor);
return "New guy joined the " + teamColor + " team.";
}
private String handleMessage() {
final String teamColor = teamColorFromSession(webSocketSession);
return teamColor + ": " + message;
}
public String teamColorFromMessage() {
return Option.of(message.split(":"))
.map(strings -> strings.length > 1 ? strings[1] : "BLUE")
.map(color -> color.toUpperCase().trim().equals("RED") ? "RED" : "BLUE")
.getOrElse("BLUE");
}
}
Testowałem tak jak wyżej greetings, endpoint ws://localhost:8080/team
. Messege typu join: BLUE
i join: RED
przypisują nas do teamu, później każdy jeden String chciałbym aby szedł tylko do sesji w danym teamie. Próbowałem to debugować, ale ogólnie to w metodę handle()
czy process()
wchodzę tylko po nawiązaniu połączenia z WebSocket, a nie za każdym messegem.
Kod: https://github.com/kkojot/multiroomchat-spring-webflux-vue/tree/master/spring-backend/src/main/java/com/kojodev/blog/multiroomchat/webfluxmultiroomchat/team
Kończą mi się pomysły, jeśli Wy jakieś macie to śmiało piszcie ;)