WebFlux, RSocket i paginacja

WebFlux, RSocket i paginacja
KK
  • Rejestracja:ponad 16 lat
  • Ostatnio:3 dni
0

Cześć,

opiszę troszkę na okrętkę, bo problem może leży gdzieś indziej i ktoś podpowie :) Swoją aplikację zacząłem pisać w WebFluxie z RSocket (WebSocket) i całkiem spoko to szło, dopóki nie zacząłem korzystać z niej na telefonie. Każda blokada telefonu, zmiana sieci, słaby zasięg itp zrywa połączenie, a cholerstwo nie zawsze chce się automatycznie połączyć ponownie, albo łączy się bardzo długo. Walczyłem trochę z rsocket-js, ale ogólnie poddałem się i zacząłem przeklepywać kontrolery na RESTowe. Szło gładko dopóki nie trafiłem na dwa takie, które robią coś reaktywnie:

Kopiuj
    fun reactiveRecipeList(recipeListFilter: RecipeListFilterDto, userId: String): Flux<RecipeListItemDto> {
        return Flux.fromIterable(recipeService.items)
           ///....
            .flatMap { recipe ->
                Mono.just(recipeFullDetails(recipe, userId, recipeListFilter))
                    .subscribeOn(Schedulers.parallel())
            }
            .sort(RecipeListItemSortable().sort(recipeListFilter.sortType))
///...
    }

Kolekcja przepisów na razie <1k rekordów, ale w recipeFullDetails cisnę niemal całą logikę aplikacji. Z RSocket działało to spoko, bo wykonywało się raz, user dostawał 10 pierwszych rekordów we froncie, a jak scrollował niżej to dociągało 10 kolejnych - backpressure.
Próbując to w rescie poczyniłem coś takiego:

Kopiuj
data class PaginationRequest(val skip: Long = 0, val take: Long = 10)

    @PostMapping("/test")
    fun pagination(@RequestBody paginationRequest: PaginationRequest): Flux<String> {
        val intRange = IntRange(1, 100)
        val flatMapSequential = Flux.fromIterable(intRange)
            .flatMap { n ->
                Mono.just(n)
                    .map { i ->
                        log.info("changing $i...")
                        "resolve $i to string \n"
                    }
                    .subscribeOn(Schedulers.parallel())
            }
            .sort()
            .skip(paginationRequest.skip)
            .take(paginationRequest.take)
        return flatMapSequential
    }

Problem jest taki, że jak user pociągnie 10 rekordów a potem chce kolejne 10 to cała operacja się wykonuje na nowo. Co mogę zrobić?

  • pakować cały wynik userowi na front - odpada, bo duży i muli przeglądarkę
  • limitować dane wejściowe - odpada, bo sortuję wg najlepszego wyniku, a to wiem dopiero po wykoniu operacji w Mono
  • olać i zostawić tak dopóki nie ma problemu z wydajnością i rachunkiem za AWS
  • ?

Na razie zostaję przy opcji nr 3, ale ma ktoś jeszcze jakiś pomysł na taki przypadek?


Charles_Ray
Niezwiązane z tematem, ale wiesz, że Mono.just() jest blokujące? Świadomie wołasz w ten sposób recipeFullDetails? Nie blokujesz tam wątku?
KK
To co w Mono.just może się spokojnie w jednym wątku wykonywać, zależy mi tylko na równoległej obróbce elementów z kolekcji. Pod spodem nic nie blokuję
Charles_Ray
Nie wiem czy to działa tak jak myślisz, operacja wykonuje się zanim zrzucisz przetwarzanie na scheduler. Nie powinieneś użyć Mono.fromCallable? To jest częsty fakap z RX, który ciężko wychwycić z kodu. Sprawdzałeś?
KK
Hm sprawdzałem, elementy kolekcji obrabiane są wielowątkowo, czyli tak jak potrzebuję :)
anckor
  • Rejestracja:ponad 5 lat
  • Ostatnio:9 dni
  • Postów:308
1

A nie potrzebujesz tutaj przypadkiem hot publishera?

https://www.vinsguru.com/reactor-hot-publisher-vs-cold-publisher/

KK
  • Rejestracja:ponad 16 lat
  • Ostatnio:3 dni
0

@anckor: dzięki, ciekawy artykuł. W sumie nie chodzi mi o to, żeby jednego Publishera dzielić przez wielu subskrybentów, tylko żeby jeden subskrybent mógł kontrolować ile elementów jest w stanie przetworzyć (paginacja) i dociągać kolejne - czyli tzw. backpressure.

W RSocket jest to dostępne out of the box. W przypadku REST ciężko to ogarnąć, konsumpcja SSE, zakładanie jakiejś sesji. W sumie po to jest rsocket-js, żeby się w tym samemu nie babrać :)

Na razie odpuszczam temat. Załadowałem całego JSONa do frontu (330 KB) i zrobiłem paginację w JS. Lokalnie działa ok, zobaczymy jak później jak ten json urośnie.


Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.