Jaki system kolejkowy

Jaki system kolejkowy
VO
  • Rejestracja:ponad 5 lat
  • Ostatnio:ponad 4 lata
  • Postów:43
0

Jaki system kolejkowy polecicie?
Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły. Dane będą w postaci binarnej. Musi być możliwość osadzenia w aplikacji (bez zewnętrznych narzędzi).
Co myślicie o ActiveMQ?
Jest tyle technologi kolejek na rynku że ciężko zdecydować się na jakieś konkretne rozwiązanie.

edytowany 2x, ostatnio: volau
Anna Lisik
A nie lepierj użyć wbudowanego w powiedzmy GL?
KamilAdam
  • Rejestracja:ponad 6 lat
  • Ostatnio:5 dni
  • Lokalizacja:Silesia/Marki
  • Postów:5505
0

Hm, jeśli ma być osadzony w aplikacji to ubija to możliwość użycia wielu niezawodnych rozwiązań jak Kafka


Mama called me disappointment, Papa called me fat
Każdego eksperta można zastąpić backendowcem który ma się douczyć po godzinach. Tak zostałem ekspertem AI, Neo4j i Nest.js . Przez mianowanie
Zobacz pozostałe 8 komentarzy
Wibowit
Kiedy coś jest strumieniem? Strumień to takie słowo, że trudno z niego cokolwiek wywnioskować. Dla przykładu https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps Terminal operations, such as Stream.forEach or IntStream.sum, may traverse the stream to produce a result or a side-effect. After the terminal operation is performed, the stream pipeline is considered consumed, and can no longer be used; if you need to traverse the same data source again, you must return to the data source to get a new stream. Czy Stream z Javy 8 to strumień?
Shalom
To filozoficzna dyskusja i trochę jednak nie na temat ;) Ważne jest że Kafka to nie jest kolejka, chociaż pewnie jak sie ktoś uprze to może z niej w ten sposób skorzystać.
Wibowit
Poza tym autor napisał: Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły. Czy da się stworzyć kopię takiej typowej kolejki (a nie czegoś co może robić za kolejkę w szczególnym przypadku)?
KamilAdam
Zgadzam się z @Wibowit. W programowaniu funkcyjnym Steam to leniwa kolekcja
Charles_Ray
Dyskusja w sumie akademicka, ja za Stream uważam potencjalnie nieskończoną kolekcję, której elementy można odczytać tylko 1 raz. Jak macie inną definicję, to spoko, nie mam z tym żadnego problemu. "Apache Kafka® is a distributed streaming platform." to chwyt marketingowy, teraz każdy chce być "distributed" i "streaming", bo batch przestał być modny itp itd :)
Charles_Ray
  • Rejestracja:około 17 lat
  • Ostatnio:około 9 godzin
  • Postów:1880
0

Jaka skala ruchu? Co to znaczy "najwydajniejszy"?


”Engineering is easy. People are hard.” Bill Coughran
VO
  • Rejestracja:ponad 5 lat
  • Ostatnio:ponad 4 lata
  • Postów:43
0

system będzie obrabiał około 20 milionów wiadomości dziennie

YA
  • Rejestracja:około 10 lat
  • Ostatnio:około 7 godzin
  • Postów:2370
2
volau napisał(a):

system będzie obrabiał około 20 milionów wiadomości dziennie

To, że system będzie obrabiał ileś czegoś, to znaczy nic.

  • Batchowo, online?
  • Jaki jest max. czas na obsłużenie wiadomości?
  • Ile tych wiadomości w szczycie będziesz miał na sekundę?
  • Jaki jest max. czas oczekiwania na obsłużenie wiadomości?
  • Wiadomości są trwałe, ulotne?
  • Możesz coś zgubić?
  • Jaki jest rozmiar wiadomości?
TurkucPodjadek
TurkucPodjadek
Ja bym jeszcze dorzucił, czy nacisk jest na szybkość odczytu/zapisu wiadomości + czy np. potencjalny odbiorca wiadomości ma być live, czy może go "chwilowo" nie być po drugiej stronie. Pod resztą Twoich pytań się podpisuję.
Wibowit
  • Rejestracja:około 20 lat
  • Ostatnio:około 3 godziny
2

Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły.

Co to znaczy stworzyć kopię kolejki? Kopię metadanych, danych (historycznych/ nieprzetworzonych/ wszystkich), czegoś innego?


"Programs must be written for people to read, and only incidentally for machines to execute." - Abelson & Sussman, SICP, preface to the first edition
"Ci, co najbardziej pragną planować życie społeczne, gdyby im na to pozwolić, staliby się w najwyższym stopniu niebezpieczni i nietolerancyjni wobec planów życiowych innych ludzi. Często, tchnącego dobrocią i oddanego jakiejś sprawie idealistę, dzieli od fanatyka tylko mały krok."
Demokracja jest fajna, dopóki wygrywa twoja ulubiona partia.
neves
  • Rejestracja:prawie 22 lata
  • Ostatnio:około 12 godzin
  • Lokalizacja:Kraków
  • Postów:1114
2

Czyli masz +/- 250 na sekundę, z taką ilością to sobie każda kolejka powinna poradzić. ActiveMQ tak spokojnie po kilkanaście tysięcy na sekundę da radę, a Kaffka nawet i kilkaset tysięcy na sekundę.


VO
  • Rejestracja:ponad 5 lat
  • Ostatnio:ponad 4 lata
  • Postów:43
0

W najwyższym chwilowym ruchu może dojść do 2000/s ale to rzadko. Wielkość danych może oscylować od kilku kb do max 5mb (te większe dużo rzadziej). Jeśli ActiveMQ ma taką wydajność to prawdopodobnie padnie wybór na tą technologie.

neves
  • Rejestracja:prawie 22 lata
  • Ostatnio:około 12 godzin
  • Lokalizacja:Kraków
  • Postów:1114
2

Z tymi większymi rozmiarem wiadomościami (liczonymi w MB) to bym uważał. Jak payload jest większy to często się go wrzuca na jakiś storage, a w wiadomości wrzucanej na kolejkę jest tylko link do payloada na storage, skąd odbiorca wiadmości może sobie go pobrać.


edytowany 1x, ostatnio: neves
VO
  • Rejestracja:ponad 5 lat
  • Ostatnio:ponad 4 lata
  • Postów:43
0

Robię próbę z ActiveMQ. Mam uruchomionego brokera jako embedded z persistence na true. Mam około 20 kolejek. I tworzę połączenie i sesje per kolejke. Wysyłam na kolejki BytesMessage. Mam problem w wydjanością a dokłądnie spędza średnio 10-50milisekund na metodzie messageProducer.send(message). Jakies pomysł co może byc powodem?

Klasa obsługująca kolejkę (na razie taka forma bo robimy wydmuszke aby sprawdzic czy jest lepsze od obecnie uzytej technologi (kod jeszcze do refactoringu))

Kopiuj
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class ActiveMQQueue {
    private static final Logger log = LoggerFactory.getLogger(ActiveMQQueue.class);
    private Connection connection;
    private Session session;

    private String queueName;
    private Destination destination;
    private MessageProducer messageProducer;
    private MessageConsumer messageConsumer;
    

    public ActiveMQQueue(String queueName) throws JMSException {
            this.queueName = queueName;
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue(queueName);
            messageProducer = session.createProducer(destination);
            messageConsumer = session.createConsumer(destination);
            log.info("CREATED QUEUE FOR: " + queueName);
    }

    private byte[] readBody(BytesMessage message)  throws JMSException {
        if(message != null) {
            byte[] data = new byte[(int) message.getBodyLength()];
            message.readBytes(data);
            return data;
        }

        return null;
    }

    public byte[] peek() throws JMSException {
        long start = System.currentTimeMillis();
        BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("PEEK: {}", time);
        return readBody(message);
    }

    public byte[] pop() throws JMSException {
        long start = System.currentTimeMillis();
        BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();

        if(message != null) {
            message.acknowledge();
        }
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("POP: {}", time);
        return readBody(message);
    }

    public void push(byte[] data) throws JMSException {

        BytesMessage message = session.createBytesMessage();
        message.writeBytes(data);
        long start = System.currentTimeMillis();
        messageProducer.send(message);
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("PUSH: {}",time);
    }

    public String getName() {
        return queueName;
    }

    public Destination getDestination() {
        return destination;
    }

    public int size() {
        return 0; //TODO
    }

}

edytowany 3x, ostatnio: volau
Zobacz pozostałe 20 komentarzy
YA
Hmm, do tej kolejki to Producenta i Konsumenta na jednym połączeniu masz? Próbowałeś producenta i konsumenta jakoś osobno inicjalizować? Nie powinny to być osobne sesje?
VO
Próbowałem dalej to samo :(
pedegie
ustaw sobie po stronie producenta CachingConnectingFactory, z tego co kojarze w ActiveMQ domyslnie jest tworzone nowe polaczenie na kazda wiadomosc, powinno miec dosyc znaczacy wplyw na wydajnosc. Wiem ze spring podrzucal taka implementacje dla ActiveMQ
vpiotr
Spróbuj użyć nanoTime. CurrentTimeMillis może zwracać wyniki z dokładnością do dziesiątek milisekund (np. co 10 ms). Poza tym liczysz tu czas pojedynczego wywołania jeśli jest odpowiednio długie, nie masz rozgrzewki, nie widać ile razy to robisz i jaki jest rozmiar wiadomości. Policz min, max, stddev, odrzuć ekstrema. Przeczytaj https://stackoverflow.com/a/513259
VO
czasy po rozgrzewce. zmiana na producer.setUseAsyncSend(true) pomogła, plus ustawienie prefetch na 1000

Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.