Moje pytanie generalnie odnosi się do tematyki async/await, deadlocków i przetwarzania wielowątkowego, jednak żeby ktoś mi mógł pomóc muszę chyba zrobić małe wprowadzenie teoretyczne:
Pracuję obecnie nad programem, który służyłby (w uproszczeniu) do wymiany informacji w sieci rozproszonej. To znaczy program byłby uruchomiony na kilkudziesięciu urządzeniach i za jego pośrednictwem urządzenia przesyłałyby sobie nawzajem wiadomości.
Szukałem sposobu żeby móc automatycznie testować zachowanie takiej sieci (wymiana wiadomości wraz z symulacją opóźnień i zagubionych pakietów). Żeby łatwo móc symulować dużo urządzeń, postanowiłem utworzyć kilkanaście instancji głównej klasy programu (dajmy na to, że nazywa się Node
) oraz utworzyłem interfejs ITransmissionMedium
co pozwala mi zmockować warstwę sieciową:
public interface ITransmissionMedium
{
void SendMessage(MessageNetworkLayer message);
void HandleIncomingMessage(MessageNetworkLayer message);
}
Każdy Node
ma obiekt implementujący ITransmissionMedium
, który pozwala mu się komunikować z innymi instancjami klasy Node
. Docelowo funkcja SendMessage
wysyłałaby wiadomość w postaci pakietu TCP. Natomiast w symulacji za ITransmissionMedium
podstawiam klasę, która dorzuca wiadomość do globalnej tablicy, wraz z informacją kiedy ta wiadomość miałaby być dostarczona (żeby zasymulować opóźnienie).
Symulacja w uproszczeniu wygląda tak:
for (int t=0; t<MAX_TIME; t++)
{
// w globalnej tablicy znajdź wiadomości, które powinny być dostarczone w czasie t. Dla każdej z nich wywołaj funkcję HandleIncomingMessage w ITransmissionMedium docelowej instancji klasy Node
_messageBroker.ProcessMessagesForTime(t);
}
Czyli podsumowując: jeśli w czasie t Node1
wywoła SendMessage
adresując wiadomość do Node2
, to wiadomość zostanie dodana do tablicy z indeksem np. t+10
. Kiedy wyżej zaprezentowana pętla dojdzie do momentu t+10
to w Node2
zostanie wywołana funkcja HandleIncomingMessage
która tę wiadomość odbierze.
Docelowo jednak chcialem, żeby funkcja ITransmissionMedium.SendMessage
była asynchroniczna i zwracała od razu wiadomość zwrotną (ACK), zawierającą potwierdzenie, że druga strona odebrała wiadomość. AckStatus
to wiadomość potwierdzająca odebranie innej wiadomości, zawierająca informację czy druga strona przetworzyła ją bez błędów.
// tak chciałbym używać funkcji SendMessage
AckStatus wiadomosc_zwrotna_od_innego_urzadzenia = await SendMessage(message);
// obecna implementacja ITransmissionMedium:
public async Task<AckStatus> SendMessage(MessageNetworkLayer message)
{
// [...]
// Wysłanie wiadomości
// Dodanie wiadomości do kolejki oczekującej na ACK
TaskCompletionSource waitingForAckTask = new TaskCompletionSource();
connectionState.MessagesWaitingForAck.Add(connectionState.MessageCounter, waitingForAckTask);
// Zwrócenie wartości gdy przyjdzie ACK
return await waitingForAckTask.Task;
}
public async Task HandleIncomingMessage(MessageNetworkLayer message)
{
if (message.Type == MessageTypeNetLayer.DeliveryAck)
{
// przychodząca wiadomość to ACK
if (connectionState.MessagesWaitingForAck.TryGetValue(message.MessageCounter,
out var taskCompletionSource))
{
// odblokuj `waitingForAckTask` z funkcji `SendMessage`
taskCompletionSource.TrySetResult(new AckStatus { ... });
}
}
}
Problem polega na tym, że jeśli teraz uruchomię SendMessage, to wykonywanie kodu zatrzymuje się na await waitingForAckTask.Task;
, wygląda na to, że powstał deadlock. Przychodzi wam do głowy jak taki problem można rozwiązać? Generalnie wygląda na to, że każdy Node
powinien działać w swoim wątku, tylko jak te wątki zsynchronizaować z pętlą symulatora? Jak sprawdzić czy wszystkie wątki oczekują na coś, żeby przejść w symulacji do momentu t+1
?