Jaki system kolejkowy polecicie?
Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły. Dane będą w postaci binarnej. Musi być możliwość osadzenia w aplikacji (bez zewnętrznych narzędzi).
Co myślicie o ActiveMQ?
Jest tyle technologi kolejek na rynku że ciężko zdecydować się na jakieś konkretne rozwiązanie.
- Rejestracja:ponad 5 lat
- Ostatnio:ponad 4 lata
- Postów:43


- Rejestracja:ponad 6 lat
- Ostatnio:2 dni
- Lokalizacja:Silesia/Marki
- Postów:5505
Hm, jeśli ma być osadzony w aplikacji to ubija to możliwość użycia wielu niezawodnych rozwiązań jak Kafka

Terminal operations, such as Stream.forEach or IntStream.sum, may traverse the stream to produce a result or a side-effect. After the terminal operation is performed, the stream pipeline is considered consumed, and can no longer be used; if you need to traverse the same data source again, you must return to the data source to get a new stream.
Czy Stream z Javy 8 to strumień?

Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły
. Czy da się stworzyć kopię takiej typowej kolejki (a nie czegoś co może robić za kolejkę w szczególnym przypadku)?


- Rejestracja:około 17 lat
- Ostatnio:około godziny
- Postów:1873
Jaka skala ruchu? Co to znaczy "najwydajniejszy"?
- Rejestracja:prawie 10 lat
- Ostatnio:około 3 godziny
- Postów:2368
volau napisał(a):
system będzie obrabiał około 20 milionów wiadomości dziennie
To, że system będzie obrabiał ileś czegoś, to znaczy nic.
- Batchowo, online?
- Jaki jest max. czas na obsłużenie wiadomości?
- Ile tych wiadomości w szczycie będziesz miał na sekundę?
- Jaki jest max. czas oczekiwania na obsłużenie wiadomości?
- Wiadomości są trwałe, ulotne?
- Możesz coś zgubić?
- Jaki jest rozmiar wiadomości?

- Rejestracja:ponad 21 lat
- Ostatnio:około godziny
- Lokalizacja:Kraków
- Postów:1114
Z tymi większymi rozmiarem wiadomościami (liczonymi w MB) to bym uważał. Jak payload jest większy to często się go wrzuca na jakiś storage, a w wiadomości wrzucanej na kolejkę jest tylko link do payloada na storage, skąd odbiorca wiadmości może sobie go pobrać.
- Rejestracja:ponad 5 lat
- Ostatnio:ponad 4 lata
- Postów:43
Robię próbę z ActiveMQ. Mam uruchomionego brokera jako embedded z persistence na true. Mam około 20 kolejek. I tworzę połączenie i sesje per kolejke. Wysyłam na kolejki BytesMessage. Mam problem w wydjanością a dokłądnie spędza średnio 10-50milisekund na metodzie messageProducer.send(message). Jakies pomysł co może byc powodem?
Klasa obsługująca kolejkę (na razie taka forma bo robimy wydmuszke aby sprawdzic czy jest lepsze od obecnie uzytej technologi (kod jeszcze do refactoringu))
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class ActiveMQQueue {
private static final Logger log = LoggerFactory.getLogger(ActiveMQQueue.class);
private Connection connection;
private Session session;
private String queueName;
private Destination destination;
private MessageProducer messageProducer;
private MessageConsumer messageConsumer;
public ActiveMQQueue(String queueName) throws JMSException {
this.queueName = queueName;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(queueName);
messageProducer = session.createProducer(destination);
messageConsumer = session.createConsumer(destination);
log.info("CREATED QUEUE FOR: " + queueName);
}
private byte[] readBody(BytesMessage message) throws JMSException {
if(message != null) {
byte[] data = new byte[(int) message.getBodyLength()];
message.readBytes(data);
return data;
}
return null;
}
public byte[] peek() throws JMSException {
long start = System.currentTimeMillis();
BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();
long time = (System.currentTimeMillis()-start);
if(time > 5) log.info("PEEK: {}", time);
return readBody(message);
}
public byte[] pop() throws JMSException {
long start = System.currentTimeMillis();
BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();
if(message != null) {
message.acknowledge();
}
long time = (System.currentTimeMillis()-start);
if(time > 5) log.info("POP: {}", time);
return readBody(message);
}
public void push(byte[] data) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(data);
long start = System.currentTimeMillis();
messageProducer.send(message);
long time = (System.currentTimeMillis()-start);
if(time > 5) log.info("PUSH: {}",time);
}
public String getName() {
return queueName;
}
public Destination getDestination() {
return destination;
}
public int size() {
return 0; //TODO
}
}

CachingConnectingFactory
, z tego co kojarze w ActiveMQ domyslnie jest tworzone nowe polaczenie na kazda wiadomosc, powinno miec dosyc znaczacy wplyw na wydajnosc. Wiem ze spring podrzucal taka implementacje dla ActiveMQ
