Cześć, implementuję Rabbitowe RPC w kodzie, który nie ma asynchroniczności z poziomu kodu projekt sprzed C# 5.0. W przykładach rabbita jest wykorzystane BlockingCollection, które blokuje wywołanie do momentu wyciągnięcia interesującej nas wiadomości. Ogólnie by nie blokować głównego wątku, integracja będzie odpalana w osobnym zadaniu HF, natomiast spotkaliście się może z lepszym rowiązaniem niż BlockingCollection. Wcześniej nigdy nie korzystałem z takiej kolekcji, jesteście w stanie polecieć jakieś materiały, blogi jak ogarnąć powyższy przypadek, nic ciekawego nie znalazłem dla kodu bez async. https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return respQueue.Take();
}
PipesPipes