Hej
Przygotowuję serwis, który ma nasłuchiwać na eventy z Kafki. Dla każdego z Kafka topików uruchamiam listenera w osobnym wątku. W momencie otrzymania eventu, Kafka listener ma wywołać REST. Ponieważ REST może chwilę trwać, nie chcę blokować listenera tylko kontynuować obsługę kolejnych eventów (odpowiedź na RESTa również nie bardzo mnie interesuje - co by ją tylko wyprintować). Wydaje się to znakomitym usecasem dla Asyncio. Mój kod wygląda następująco:
W 'main' pobieram referencje do event_loop i przekazuję ją do każdego z Kafka listenerów
def run_consumer(topic, event_mapping, loop, kafka_config):
threading.Thread(target=MyKafkaConsumer(
topic=topic,
event_mapping=event_mapping,
loop=loop,
**kafka_config
).consume).start()
if __name__ == '__main__':
config = Config()
loop = asyncio.get_event_loop()
[run_consumer(topic, event_mapping, loop, config.get_kafka()) for topic, event_mapping in config.get('mappings').items()]
loop.run_forever()
Następnie, w kodzie listenera czekam na eventy z Kafki, i w momencie otrzymania eventu, wrzucam go na event_loop (fragmenty kodu, żeby przedstawić ideę):
async def asynchronous_fetch(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
class MyKafkaConsumer:
def __init__(self, bootstrap_servers, schema_registry_url, group_id, topic, event_mapping, loop, **kwargs):
self.loop = loop
def consume(self):
while(True):
try:
message = self.consumer.poll()
asyncio.run_coroutine_threadsafe(asynchronous_fetch(message.url), self.loop)
except Exception as e:
logging.exception(e)
finally:
self.consumer.commit()
Niestety, REST nigdy nie jest wywołany. W powyższym przypadku próbuję to zrobić za pomocą tornado.httpclient, ale próbowałem również z httpx i aiohttp. Spodziewałem się, że przekazanie asynchronous_fetch poprzez asyncio.run_coroutine_threadsafe spowoduje wykonanie asynchronous_fetch. We wszystkich przypadkach moja coroutine nie jest wykonywana. Co robię źle?