Как мультипроцессировать с сообщениями из несвязанной очереди - PullRequest
0 голосов
/ 20 февраля 2019

Я пишу приложение на Python.Он читает тему кафки с помощью потребителя.С каждым сообщением он выполняет некоторые действия, которые могут занять некоторое время, прежде чем выполнять некоторые действия со следующим сообщением.

Большинство приложений, использующих многопроцессорную библиотеку, включают передачу некоторой конечной итерации в map_async или apply_async.Мои попытки решить эту проблему с помощью этих двух функций, похоже, не работают, я думаю, потому что наша повторяемая в этом случае тема kafka, которая является несвязанной очередью.Есть ли способ «сделать что-то» неблокирующим образом в этом сценарии?

1 Ответ

0 голосов
/ 21 февраля 2019

Вы можете создать дочерний процесс и передать ему сообщение для обработки некоторых вещей:

from confluent_kafka import Consumer, KafkaError
from multiprocessing import Process


def do_stuff(msg):
    my_stuff = 'is doing here as a non-blocking way'

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    process = Process(target=do_stuff, args=(msg.value().decode('utf-8'), ))
    process.start()

c.close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...