Spring Webflux - REST i wyścigi wątków

Spring Webflux - REST i wyścigi wątków
KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0

Po przeczytaniu sąsiedniego wątku, napisałem sobie prostą aplikacyjkę w webfluxie z licznikiem, aby sprawdzić kolejność wykonywania wątków.

Counter.java:

Kopiuj
public class Counter {

    public final long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter() {
        return new Counter(this.counter + 1);
    }
}

CounterController.java

Kopiuj
@Configuration
public class CounterController {

    private final CounterService counterService;

    public CounterController() {
        this.counterService = new CounterService(new Counter(0));
    }

    @Bean
    RouterFunction<ServerResponse> counterRoutes() {
        return nest(path("/api"),
                route(GET("/counter"), counterValue())
        .andRoute(PATCH("/counter"), increaseCounterValue()));
    }

    private HandlerFunction<ServerResponse> counterValue() {
        return request -> {
            final long counterValue = this.counterService.getCounterValue();
            return ServerResponse.ok().body(fromObject(counterValue));
        };
    }

    private HandlerFunction<ServerResponse> increaseCounterValue() {
        return request -> {
            final long counterValue = this.counterService.increaseCounterValue();
            return ServerResponse.ok().body(fromObject(counterValue));
        };
    }
}

CounterService.java

Kopiuj
public class CounterService {

    private Counter counter;

    public CounterService(Counter counter) {
        this.counter = counter;
    }

    public long getCounterValue() {
        System.out.println("pobieram countera simple: " + this.counter.counter);
        return this.counter.counter;
    }

    public long increaseCounterValue() {
        this.counter = this.counter.updateCounter();
        System.out.println("zaktualizowalem countera simple: " + this.counter.counter);
        return this.counter.counter;
    }
}

Następnie puściłem JMeterem 100 wątków x 100 requestów. W konsoli ostatni wpis:
zaktualizowalem countera simple: 8749

Zmieniając na public synchronized long increaseCounterValue() {...} w CounterService.java, ostatni wpis w konosli:
zaktualizowalem countera synchronized: 10000

Zmieniając long na AtomicLong w Counter.java (i oczywiście bez synchronized w serwisie)
zaktualizowalem countera atomic: 10000

Robiąc tak jak w pierwszym przykładzie, tylko definiując counter w CounterService.java private volatile Counter counter;
zaktualizowalem countera volatile: 8873

Pytanie

W jaki sposób poprawnie użyć volatile?
W tym temacie Dostarczanie danych przez REST i ich aktualizacja @jarekr000000 mnie zbił z tropu stwierdzeniem, że synchronized jest niewystarczające. W powyższym przykładzie wygląda OK, a nie bardzo wiem czym się różni od kodu @eL ?

PS. Bez sensu podawać czasy benchmarków dla wszystkich 4 przypadków (bez warmupów, kupe innych rzeczy w tle odpalonych itp), ale jak kogoś bardzo to interesuje to wszystkie wykonywały się niemal identycznie na poziomie:
GET (sr, min, max): 16, 0, 32 ms i PATCH(sr, min, max): 1, 0, 9 ms


Michał Sikora
Michał Sikora
  • Rejestracja:ponad 7 lat
  • Ostatnio:prawie 4 lata
  • Lokalizacja:Kraków
  • Postów:834
1

W Twoim przykładzie nowa wartość zależy od poprzedniej. volatile z czymś takim sobie nie radzi.

PU
  • Rejestracja:ponad 9 lat
  • Ostatnio:5 miesięcy
  • Postów:59
0

volatile zapewnia nam widoczność zmiany między wątkami (odczyt bezpośrednio z pamięci, a nie z cache procesora). Dodatkowo jest to informacja do tego, aby JIT nie próbował optymalizować tej zmiennej (np zauważa, że warunek pętli jest stały więc wrzuca stałą, zamiast czytać zmienną - nie jest świadomy tego, że inny wątek tę zmienną może zmienić).

PU
Oczywiście pewnie to nie wszystko, piszę z głowy :)
jarekr000000
  • Rejestracja:ponad 8 lat
  • Ostatnio:około 13 godzin
  • Lokalizacja:U krasnoludów - pod górą
  • Postów:4708
0
pustypawel napisał(a):

volatile zapewnia nam widoczność zmiany między wątkami (odczyt bezpośrednio z pamięci, a nie z cache procesora).

Z tym odczytem bezpośrednio z pamięci - to tak jest w C++ (i ma to czasem znaczenie, np. przy czytaniu IO).
W javie to akurat nie ma sensu. Technicznie być może JVM nawet wymuszać odczyt z pamięci, ale to jest szczegół implementacyjny.
Druga część OK.


jeden i pół terabajta powinno wystarczyć każdemu
PU
Jak nie odczyt z pamięci (ram), to jak byś inaczej to opisał? Nie do końca rozumiem też dlaczego w javie taki odczyt nie ma sensu, możesz rozwinąć?
jarekr000000
Z punktu widzenia JVM nie ma znaczenia czy w RAM czy gdziekolwiek. JVM po prostu gwarantuje odczytanie zmiany przez inne wątki. W szczególnośći JVM jesli wykryje, że masz tylko jeden wątek, który odwołuje się do zmiennej to może spokojnie nie odczytywać z pamięci (nie wiem czy jest taka optymalizacja gdzieś). Ale w C++ faktycznie jest gwarancja czytania z RAM. W JVM to nie ma sensu, bo nie mamy kontroli nad allokacją/ wskaźnikami (przy normalnej stercie). Faktyczna lokalizacja odczytywanej komórki może się zmieniać (w skutek działania GC).
PU
Rozumiem - JVM mówi "volatile zapewnia widoczność zmiany", a że robi to (o ile dobrze pamiętam) między innymi przez bezpośredni odczyt z pamięci to szczegół implementacyjny :)
KR
Na x86/amd64 nie robi przez bezpośredni odczyt z pamięci. Byłoby to bardzo drogie. Używa instrukcji *fence. Zapis wymusza jedynie opróżnienie rejestrów (kolejek) zapisu i wymusza przez to że dane trafia do cache i nastąpi inwalidacja tej linii cache innych rdzeni. A odczyt idzie chyba nawet normalnie jak zwykły odczyt, tyle że zakazane jest out-of-order execution. Dlatego dopóki volatile robisz z jednego wątku, to nie kosztuje prawie nic względem nie-volatile. W ogóle x86 sam dba o koherencję cache, więc niewiele trzeba.
SL
  • Rejestracja:około 7 lat
  • Ostatnio:około 12 godzin
  • Postów:900
0

volatile nadaję się tylko do podstawowych operacji tj., odczyt i zapis pamięci. Jeśli chcesz użyć bardziej zaawansowanych operacji, to zostają klasy Atomic*. Co do działania volatile/ogólnie atomiców, to trzeba pamiętać, że sam odczyt z pamięci nie wystarczy. Przykładowo przy zapisie do pamięci wymagana jest synchronizacja: https://gcc.godbolt.org/z/rSsS2R tutaj fajnie widać instrukcję mfence . Dodatkowo wymagane jest odpowiednie zachowanie interpretera/jita: zakazane są zmiany kolejności instrukcji, które mogłyby zmienić kolejność widoczności kolejnych zmian w innych wątkach, co nie jest prawdą w przypadku normalnych zmiennych.

edytowany 1x, ostatnio: slsy
KR
Wszystko ok, tylko ani mfence, ani mov nie wymuszają zapisu ani odczytu z pamięci głównej.
KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0

OK dzięki. Rozumiem, że volatile nie pomoże, jeśli nowa wartość zależy od poprzedniej. Zadziała, jeśli tylko jeden wątek zmienia wartość, a drugi wątek tylko odczytuje (nie zmienia). W kontekście Spirng Webflux nie mam pojęcia jak to zapewnić.

Zmieniłem kod z "auto countera" na taki, który POSTem ustawia licznik, GETem go zwraca:

W Controllerj.java doałem

Kopiuj
    private HandlerFunction<ServerResponse> setCounterValue() {
        return request -> {
            final Mono<String> longMono = request.bodyToMono(String.class);
            return longMono
                    .map(stringValue -> Long.parseLong(stringValue))
                    .flatMap(longValue -> {
                        final long counterValue = this.counterService.setCounterValue(longValue);
                        return ServerResponse.ok().body(fromObject(counterValue));
                    });
        };
    }

CounterService zmieniłem na:

Kopiuj
public class CounterService {

    private volatile Counter counter;

    public CounterService(Counter counter) {
        this.counter = counter;
    }

    public long getCounterValue() {
        System.out.println("pobieram countera simple post volatile: " + this.counter.counter);
        return this.counter.counter;
    }

    public long setCounterValue(long newValue) {
        this.counter = this.counter.updateCounter(newValue);
        System.out.println("zaktualizowalem countera simple post volatile: " + this.counter.counter);
        return this.counter.counter;
    }
}

A klasa Counter.java

Kopiuj
public class Counter {

    public final long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter(long i) {
        return new Counter(i);
    }
}

Puściłem JMeterem 100 wątków x 100 requestów na przemian POST z GETem, gdzie POST ustawia randomową liczbę. Do tego napisałem sobie analizatora logów:

  1. Bez volatile, synchronized itp:
    Łączna ilość znalezionych nieprawidłowych danych: 224

  2. synchronized na samego GETa lub POSTa nie wystarcza, ustawione na obydwóch metodach:
    Łączna ilość znalezionych nieprawidłowych danych: 0

  3. Zamiana w Counter.java long na AtomicLong oraz metody update:

Kopiuj
    public Counter updateCounter(long i) {
        counter.set(i);
        return this;
    }

Łączna ilość znalezionych nieprawidłowych danych: 287 (!?)

  1. Z volatile tak jak wklejony kod:
    Łączna ilość znalezionych nieprawidłowych danych: 217

Pytanie

  1. Jak poprawnie użyć volatile?
  2. Czemu z AtomicLong też mi się krzaczy? :D

Za nieprawidłowe dane uznaję coś takiego w konsoli:

Kopiuj
zaktualizowalem countera simple post volatile: 282
zaktualizowalem countera simple post volatile: 3249
pobieram countera simple post volatile: 282
pobieram countera simple post volatile: 282

KR
  • Rejestracja:około 6 lat
  • Ostatnio:około 6 lat
  • Postów:6
0

Z atomic longiem, jeśli dobrze widzę, robisz dwie operacje, pobranie i ustawienie wartości, nie ma gwarancji, że one się razem wykonają w jednym wątku. Musiałbyś użyć którejś z atomowych operacji.

PU
  • Rejestracja:ponad 9 lat
  • Ostatnio:5 miesięcy
  • Postów:59
0
Kopiuj
        this.counter = this.counter.updateCounter(newValue);
        System.out.println("zaktualizowalem countera simple post volatile: " + this.counter.counter);

Przy wypisywaniu powinieneś użyć newValue - wartość countera może już być inna - równolegle zmodyfikowana.
To samo przy pobieraniu - może zdarzyć się, że na konsolę wypiszesz jedną liczbę, a zwrócisz inną

edytowany 1x, ostatnio: pustypawel
KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0

@pustypawel: niestety, to też nie pomogło.

Wpakowałem się w temat wątków, o którym nie mam pojęcia, i teraz się męczę :D

Sprawdzanie wartości wpisanych w konsolę od początku wydawało mi się śmieszne i głupie. Pomyślałem, że skoro strzelam requestami z JMetera to tam powinienem sprawdzać i porównywać odpowiedzi. Dodałem więc Response Assertion i Assertion Results, porównałem wartości z odpowiedzi i hmm.. zawsze dostaję faile, nawet jak cały kod mam w synchronized. Pewnie coś źle sprawdzam, nie wiem.

Zamiast czepiać się Webfluxa pomyślałem, że jeśli CounterService będzie napisany dobrze to nie będzie miało to znaczenia, czy użyję go w Webfluxie czy w innym g^&nie, dlatego napisałem taki unit test:

Kopiuj
@Test
    public void multipleThreadSetAndGetShouldReturnTheSameValue() throws ExecutionException, InterruptedException {
        int threads = 10;
        final Counter counter = new Counter(0);
        final CounterService counterService = new CounterService(counter);
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        Collection<Future<String>> results = new ArrayList<>();
        AtomicLong failCounter = new AtomicLong(0);
        final Random random = new Random();

        for (int i = 0; i < threads; i++) {
            final long randomLong = random.nextLong();
            results.add(executorService.submit(() -> {
                latch.await(1, TimeUnit.SECONDS);
                latch.countDown();
                return checkCounterValue(counterService, randomLong);
            }));
        }

        for (Future<String> result : results) {
            if (result.get().startsWith("FAIL")) failCounter.incrementAndGet();
        }

        assertEquals(0, failCounter.get());

    }

    private String checkCounterValue(CounterService counterService, long randomLong) {
        counterService.setCounterValue(randomLong);
        final long counterValue = counterService.getCounterValue();
        if (counterValue != randomLong)
            return "FAIL = should be " + randomLong + ", actual " + counterValue + " w watku " + Thread.currentThread().getName();
        return "ok";
    }

Niestety, ten test raz przechodzi raz nie :) Czasem nawet wywali się jak w CounterService mam wszystkie metody synchronized(). Wklejam kod Counter i CounterService, czysta java, może ktoś będzie miał chwilę i podpowie jak poprawić ten test.

Kopiuj
public class Counter {

    public  long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter(long i) {
        return new Counter(i);
    }
}

public class CounterService {

    private Counter counter;

    public  CounterService(Counter counter) {
        this.counter = counter;
    }

    public synchronized long getCounterValue() {
        return this.counter.counter;
    }

    public synchronized long setCounterValue(long newValue) {
        this.counter = this.counter.updateCounter(newValue);
        return this.counter.counter;
    }
}

PU
  • Rejestracja:ponad 9 lat
  • Ostatnio:5 miesięcy
  • Postów:59
1

I znowu:

Kopiuj
        counterService.setCounterValue(randomLong);
        final long counterValue = counterService.getCounterValue();

Między tymi wywołaniami inny wątek może wartość zmienić, dlatego nie ma pewności, że będą takie same :) Może inaczej - co chcesz uzyskać, bo teraz to już w ogóle nie rozumiem jaki jest cel tego testu?

Kopiuj
        if (counterValue != randomLong)
            return "FAIL = should be " + randomLong + ", actual " + counterValue + " w watku " + Thread.currentThread().getName();

Ten fail jest zły - to jest normalna sytuacja, że inny wątek mógł wartość zmodyfikować, co więcej mamy 100% pewności że jeżeli zmodyfikował to ta zmiana będzie widoczna w naszym wątku (bo mamy synchronized)

KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0

Masz rację. To jak napisać test, który sprawdzi, że pobrana wartość jest na pewno aktualna?

Czyli sytuacja taka jak opisujesz. Kilka wątków zmienia wartość, ale pobrana wartość jest zawsze aktualna. Taki test da się w ogóle napisać? :)


KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0
Kopiuj
@Test
    public void multipleThreadSetAndGetShouldReturnTheSameValue() throws ExecutionException, InterruptedException {
        int threads = 10;
        final Counter counter = new Counter(0);
        final CounterService counterService = new CounterService(counter);
        CountDownLatch latch = new CountDownLatch(threads);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        Collection<Future<Long>> results = new ArrayList<>();
        AtomicLong sequence = new AtomicLong(0);

        for (int i = 0; i < threads; i++) {
            results.add(executorService.submit(() -> {
                latch.await(1, TimeUnit.SECONDS);
                latch.countDown();
                counterService.setCounterValue(sequence.getAndIncrement());
                return counterService.getCounterValue();
            }));
        }

        final Set<Long> uniqueResult = new HashSet<>();
        for (Future<Long> result : results) {
            uniqueResult.add(result.get());

        }

        assertEquals(threads, uniqueResult.size());
    }

Ten test już jest bliższy temu co chcę uzyskać.

Czy da się napisać taki test, który w 100/100 przypadków się wywali, jeśli zdejmę z metod synchronized w CounterService oraz 100/100 przypadków przejdzie, jeśli dodam synchronized do tych metod?


PU
  • Rejestracja:ponad 9 lat
  • Ostatnio:5 miesięcy
  • Postów:59
0

100/100 przypadków się wywali, jeśli zdejmę z metod synchronized

Otóż nie :) volatile/synchronized dają gwarancję widoczności zmiany (synchronized daje jeszcze blokadę), co nie znaczy że bez nich zmiana zawsze będzie niewidoczna (w szczególnym przypadku, gdy masz jeden procesor z jednym wątkiem to prawdopodobnie, volatile w ogóle nie potrzebujesz)

jarekr000000
  • Rejestracja:ponad 8 lat
  • Ostatnio:około 13 godzin
  • Lokalizacja:U krasnoludów - pod górą
  • Postów:4708
0
pustypawel napisał(a):

(w szczególnym przypadku, gdy masz jeden procesor z jednym wątkiem to prawdopodobnie, volatile w ogóle nie potrzebujesz)

Otóż nie. Nadal potrzebujesz. Możesz mieć więcej szcześcia, problem nie będzie często widoczny, ale java to nie c++. JVM to świnia.


jeden i pół terabajta powinno wystarczyć każdemu
edytowany 1x, ostatnio: jarekr000000
PU
Fakt, zapomniałem o JITcie - dlatego profilaktycznie użyłem słowa "prawdopodobnie" :D
KK
  • Rejestracja:prawie 17 lat
  • Ostatnio:14 dni
0

No dobra, skoro nie mogę napisać testu, który 100/100 się wywali bez synchronized, to jak napisać test który 100/100 przejdzie z synchronized? Ten powyższy sporadycznie się wywala, mimo że kod CounterService jest prawidłowy. Chodzi o przypadki, kiedy wątki kilka razy pod rząd ustawią wartość.

Chodzi mi o test, który sprawdzi, że aktualnie pobrana wartość jest na pewno ostatnią ustawioną przez dowolny inny wątek.

Próbowałem jeszcze testy new Thread() i Thread.sleep(). Jeden wątek pobiera wartość i zasypia, drugi zmienia wartość, pierwszy wstaje i znowu pobiera i sprawdza czy jest nowa. Oczywiśćie to też mi nie działa :D


Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.