Zwracanie Flux w Reactive Spring

Zwracanie Flux w Reactive Spring
  • Rejestracja: dni
  • Ostatnio: dni
0

Hej mam aplikację która używa webfluxa i reactive mongo. Mam klasę która ma pole value i chciałem zwrócić wszystkie eleemnty dla których pole value == "test".
Napisałem metodę w repozytorium

Kopiuj
interface DataRepository extends ReactiveMongoRepository<Data, String> {
    Flux<Note> findAllByValue(final String value);
}

I metodę kontrollera

Kopiuj
@GetMapping("/{value}")
    public Flux<Data> getAllByValue(@PathVariable("value") final String value){
        return dataRepository.findAllByValue(final String value);
    }

I kiedy tak się odpytam otrzymuję java.lang.IllegalArgumentException: Multi-value reactive types not supported in view resolution:. Miał ktoś już podobny problem i zna rozwiązanie albo jak to w tym webfluxie poprawnie zrobić

Poniżej stacktrace

Kopiuj
java.lang.IllegalArgumentException: Multi-value reactive types not supported in view resolution: reactor.core.publisher.Flux<com.data.Data>
	at org.springframework.web.reactive.result.view.ViewResolutionResultHandler.handleResult(ViewResolutionResultHandler.java:184) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.web.reactive.DispatcherHandler.handleResult(DispatcherHandler.java:176) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.web.reactive.DispatcherHandler.lambda$handle$2(DispatcherHandler.java:161) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:290) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:381) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
	at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:397) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_162]
jarekr000000
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: U krasnoludów - pod górą
  • Postów: 4712
1

Ale czemu w repo masz Flux<Note> a nie Flux<Data i co to w ogóle za klasa Data?

  • Rejestracja: dni
  • Ostatnio: dni
0

to tam źle się przekopiowało powinno być Data nie Note, a Data to reprezentacja dokumentu w mongo

Kopiuj
@Document
public final class Data {
    ...
    private String value;
    ...

I chciałem żeby mi zwróciło wszystkie obiekty których pole value ma wartość jakąś tam którą piszę w zapytaniu

KA
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Warszawa
  • Postów: 1683
1

Słowo "Data" w tym przypadku jest złą nazwą nieniosącą żadnego znaczenia. Klasy i obiekty powinny być rzeczownikami lub wyrażeniami rzeczownikowymi, takimi jak Customer, WikiPage, Account czy tez AddressParser. Należy unikać w nazwach klas słów takich jak Manager, Processor, Data lub Info. Nazwy klas nie powinny być czasownikami.

Schadoow
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 1082
0

Zrób test zobacz czy uda ci się strzelić do tej metody:

Kopiuj
    @GetMapping(value = "/test", produces = "application/stream+json")
    public Flux<Long> test() {
        return Flux
                .interval(Duration.ofSeconds(1)).onBackpressureDrop();
    }
  • Rejestracja: dni
  • Ostatnio: dni
0

Mam 3 streamy typu Flux.fromStream(...) , chcialbym zrobic zip tych trzech i wypelnic Dto/Json z 3 listami.
jak to najlepiej zrobic? czy zip zamknie mi stream jak jeden ze streamow sie skonczy i bedzie cancel ?

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Jak jeden z Fluxów się skończy, to pozostałe użyte w zip dostaną tę informację i subskrybcja na nich zostanie anulowana. Czyli tak.

  • Rejestracja: dni
  • Ostatnio: dni
0
Michał Sikora napisał(a):

Jak jeden z Fluxów się skończy, to pozostałe użyte w zip dostaną tę informację i subskrybcja na nich zostanie anulowana. Czyli tak.

Czyli to co bym chcial zrobic to zebranie kilku streamow niezaleznie od siebie i napchanie ich do obiektu. Nic innego mi nie przychodzi do glowy. Jak to najlepiej osiagnac?

Chyba, ze mozna cos zaradzic na ten zip?

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Zależy od tego jak mają wyglądać dane wejściowe i wyjściowe. Jeżeli wejścia zwracają dane tego samego typu to na przykład.

Kopiuj
Flux<Integer> flux1 = Flux.range(0, 7);
Flux<Integer> flux2 = Flux.range(0, 5);
Flux<Integer> flux3 = Flux.range(0, 4);
Mono<Data> result = Flux.merge(flux1, flux2, flux3)
    .collectList()
    .map(Data::new);

Jeżeli są różne to np.

Kopiuj
Mono<List<Integer>> mono1 = Flux.range(0, 7).collectList();
Mono<List<String>> mono2 = Flux.just("a", "b", "c").collectList();
Mono<List<Boolean>> mono3 = Flux.just(true, false, false).collectList();
Mono<Data> result = Mono.zip(mono1, mono2, mono3)
    .map(this::dataFromTuple);

Zakładam, że strumienie nie są nieskończone.

  • Rejestracja: dni
  • Ostatnio: dni
0
Michał Sikora napisał(a):

Zależy od tego jak mają wyglądać dane wejściowe i wyjściowe. Jeżeli wejścia zwracają dane tego samego typu to na przykład.

Kopiuj
Flux<Integer> flux1 = Flux.range(0, 7);
Flux<Integer> flux2 = Flux.range(0, 5);
Flux<Integer> flux3 = Flux.range(0, 4);
Mono<Data> result = Flux.merge(flux1, flux2, flux3)
    .collectList()
    .map(Data::new);

Jeżeli są różne to np.

Kopiuj
Mono<List<Integer>> mono1 = Flux.range(0, 7).collectList();
Mono<List<String>> mono2 = Flux.just("a", "b", "c").collectList();
Mono<List<Boolean>> mono3 = Flux.just(true, false, false).collectList();
Mono<Data> result = Mono.zip(mono1, mono2, mono3)
    .map(this::dataFromTuple);

Zakładam, że strumienie nie są nieskończone.

Mam rozne typy, pseudo kod:

Kopiuj
public class ResponseDto {
    private Set<Person> people;

    private Set<Address> addresses;

    private Set<Integer> ids;

i robilem wlasnie zip i map ale jak zdebugowalem to widze cancel bo jakis stream sie wczesniej skonczyl.
no chyba, ze myle sie ze schedulerami.

  • Rejestracja: dni
  • Ostatnio: dni
0

ach ok, nie zauwazylem na telefonie tego collectList , sprawdze.
jak rozumiem to juz blokuje?

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Zależy co masz na myśli pisząc "blokuje". Wątków nie blokuje. Ale zip nie wyemituje niczego dopóki wszystkie trzy Mono się nie skończą.

  • Rejestracja: dni
  • Ostatnio: dni
0
Michał Sikora napisał(a):

Zależy co masz na myśli pisząc "blokuje". Wątków nie blokuje. Ale zip nie wyemituje niczego dopóki wszystkie trzy Mono się nie skończą.

Prawda. W moim przypadku i tak w któryms miejscu musze poczekac, wiec pod use case, który przedstawiłem jest to ok.
Moge tez zrobic subscribeOn(Schedulers...()) i to odpalic, to moze troche pomóc.

Myslalem jeszcze, ze da sie tu wykorzystac jakos backpressure czy cos.

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Ciężko mi sobie wyobrazić, żeby backpressure miało tutaj jakiekolwiek znaczenie. Jedyny punkt ścisku to zip, ale Ty i tak zipujesz tylko trzy listy jeden raz. collectList nie powinno być problematyczne, nawet jakbyś miał milion elementów na sekundę.

Ale jeżeli chciałbyś to zrównoleglić, to subscribeOn powinno być uwzględnione per Mono, a nie na całości po zipowaniu. Zależy w sumie od źródeł danych, ale jeśli pochodzą one ze zwykłych strumieni, jak pisałeś, to tak należałoby to zrównoleglić.

  • Rejestracja: dni
  • Ostatnio: dni
0
Michał Sikora napisał(a):

Ciężko mi sobie wyobrazić, żeby backpressure miało tutaj jakiekolwiek znaczenie. Jedyny punkt ścisku to zip, ale Ty i tak zipujesz tylko trzy listy jeden raz. collectList nie powinno być problematyczne, nawet jakbyś miał milion elementów na sekundę.

Ale jeżeli chciałbyś to zrównoleglić, to subscribeOn powinno być uwzględnione per Mono, a nie na całości po zipowaniu. Zależy w sumie od źródeł danych, ale jeśli pochodzą one ze zwykłych strumieni, jak pisałeś, to tak należałoby to zrównoleglić.

Tak tak, per Mono. Dzięki za pomóc. Coś tam mi się układa z tym w głowie. Ilość operatorów przyprawia o zawrót głowy.
Jeszcze w sumie ostatnie pytanie.
Ja jeszcze zanim to wszystko odpale, to na poczatku jeszcze odpalam calosc Flux.fromIterables(), no i wszystko teraz dziala ok, z tym, ze dostaje z tego zip osobna liste mono per element z tego Flux.fromiterables

cos w stylu:

Kopiuj
      Flux<Sex> s = Flux.fromIterable(Sex.keySet());
        return s.map(sex ->services.get(sex).getPeople(currency, ids)).subscribeOn(Schedulers.elastic());

tyle, że dostaje kilka Setów z People {} {} {} zamiast jednego {}

  • Rejestracja: dni
  • Ostatnio: dni
0

miala byc flatMapa

Kopiuj
Flux<Sex> s = Flux.fromIterable(Sex.keySet());
        return s.flatMap(sex ->services.get(sex).getPeople(currency, ids)).subscribeOn(Schedulers.elastic());
Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Ummmm... trochę powróżę z kryształowej kuli. Zakładam, że services.get(sex).getPeople(currency, ids) zwraca Flux<Set<People>>. W takim razie, każda płeć jest mapowana na taki set lub kilka setów. Czyli kiedy zostanie wyemitowana jedna płeć, to zostanie wyemitowane ileś (od 0 do nieskończoności) setów ludzi dla tej płci. Większa ilość setów może wynikać albo z większej ilości płci niż 1, albo z większej ilości setów dla jednej płci, albo z obu powodów. Tak czy inaczej, ostatecznie musisz te dane zredukować. Możesz to zrobić np. tak.

Kopiuj
return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .collect(LinkedHashSet::new, Set::addAll)
  .flux() // Jeżeli chcesz zwracać Flux zamiast Mono.
  .subscribeOn(Schedulers.elastic());
  • Rejestracja: dni
  • Ostatnio: dni
0

services.get(sex).getPeople(currency, ids) -> per każdy element jest zwracany Mono ;)

  • Rejestracja: dni
  • Ostatnio: dni
0

Czyli dostaje kilka Mono tego nizej a chcialbym zmergowane...

Kopiuj
public class ResponseDto {
    private Set<Person> people;
 
    private Set<Address> addresses;
 
    private Set<Integer> ids;
 


Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

services.get(sex).getPeople(currency, ids) zwraca Mono<ResponseDto>? Naprawdę nie rozumiem, co chcesz zrobić.

Wnioskuję, że to niżej zwraca Flux<Set<Person>>.

Kopiuj
return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .subscribeOn(Schedulers.elastic());

W takim razie, to zwróci Mono<Set<Person>> z unikalnymi względem equals osobami.

Kopiuj
return Flux.fromIterable(Sex.keySet())
  .flatMap(sex ->services.get(sex).getPeople(currency, ids))
  .collect(LinkedHashSet::new, Set::addAll)
  .subscribeOn(Schedulers.elastic());
  • Rejestracja: dni
  • Ostatnio: dni
0

Przepraszam. Za duzo pseudokodu.

Kopiuj
return Flux.fromIterable(Sex.keySet())
  .flatMap(sex -> services.get(sex).getResponseDto(currency, ids))
  .collect( Collectors.toSet())
  .subscribeOn(Schedulers.elastic());

To zwroci Mono<Set<ResponseDto>.

Niezaleznie co robie to dostaje w srodku wiele list people, addresses itp.

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Ale to już Ty się ograniczasz swoim własnym typem ResponseDto. Musisz się zastanowić, jak zaimplementowałbyś funckję z mniej więcej taką sygnaturą ResponseDto combine(ResponseDto first, ResponseDto second) i jak jej użyc w collect. Aczkolwiek wydaje mi się to dosyć kiepskie i lepiej byłoby to zrobić np. na Tuple3 i wtedy mapować na ResponseDto.

  • Rejestracja: dni
  • Ostatnio: dni
0

Ogolnie jak kiepskie by to nie bylo to wydawalo mi sie, ze da sie zrobic ;) ale dzieki za sugestie :)

A to o czym mowisz to chyba moze mi dac po prostu zip? Tylko moglbym zlapac to troche pozniej zamiast od razu mapowac.

Michał Sikora
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: Kraków
  • Postów: 834
0

Użyć zip niby można, ale musiałbyś skorzystać z jednego z dwóch poniższych przeciążeń i z map zamiast flatMap. I funkcja łącząca kilka ResponseDto w jedno dalej musi być napisana.

Kopiuj
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
                              Function<? super Object[],? extends O> combinator)

@SafeVarargs
public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator,
                                             Publisher<? extends I>... sources)
  • Rejestracja: dni
  • Ostatnio: dni
0

ostatecznie skonczylem na koncu z reduce

Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.