Попытка использовать сообщения с python многопроцессорностью - PullRequest
1 голос
/ 01 марта 2020

Я не могу использовать сообщения с кодом ниже. Я могу потреблять, если я просто прямо consOne.startLoop(). Что мне здесь не хватает. Ценю помощь.

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
from multiprocessing import Process
import sys


idlist = []
def setConfig(bootstrapServers, groupId, autoOffsetReset):
    consumerConf = {}
    consumerConf['bootstrap.servers']   = bootstrapServers 
    consumerConf['group.id']            = groupId
    consumerConf['auto.offset.reset']   = autoOffsetReset
    print(consumerConf)
    return consumerConf

def createConsumer(consumerConf, topic):
    consumer = Consumer(consumerConf)
    consumer.subscribe([topic])
    print("consumer subscribed to topic {}".format(topic))
    return consumer


# self.consumer.assign([TopicPartition(topic, partition)])

def startLoop(consumer):
    try:
        while True:
            message = consumer.poll(1.0)
            if message is None:
                print("none")
                continue
            elif message.error():
                if message.error().code == KafkaError._PARTITION_EOF:
                    sys.stderr.write('EOF Partition - {}  '.format(message.partition()))
                else:
                    sys.stderr.write('Consumer Error on Topic - {}  '.format(message.topic()))
                    sys.stderr.write('''-- topic        - {} 
                                        -- partition    - {} 
                                        -- offset       - {}'''.format(
                                                    message.topic(), message.partition(), message.offset()))
            else:
                print('Received message: {}'.format(message.value().decode('utf-8')))
                handleMessage(message.value())

    except KeyboardInterrupt:
        sys.stderr.write('Kafka Exception raised - {}  '.format(message.topic()))
        sys.exit(1)

    finally:
        consumer.close()

# body of the message or (message.vlue())
def handleMessage(body):
    global idlist
    idlist.append(body)
    print(idlist)

if __name__ === '__main__':
    config = setConfig('localhost:9092', groupId='group', 
    autoOffsetReset='smallest')
    consOne = createConsumer(config, 'test')
    # consOne.startLoop() Works!
    processOne = Process(target=startLoop, args=(consOne, ), group=None) 
    # doesn't work :(
    processOne.start()
    processOne.join()


consumer = Consumer({'bootstrap.servers':'localhost:9092', 'group.id':'group', 'auto.offset.reset':'smallest'})
consumer.subscribe(['test'])

def startLoop():
    try:
        global consumer 
        print(consumer)
        while True:
            message = consumer.poll(1.0)
            if message is None:
                print("none")
                continue
            elif message.error():
                if message.error().code == KafkaError._PARTITION_EOF:
                    sys.stderr.write('EOF Partition - {}  '.format(message.partition()))
                else:
                    sys.stderr.write('Consumer Error on Topic - {}  '.format(message.topic()))
                    sys.stderr.write('''-- topic        - {} 
                                        -- partition    - {} 
                                        -- offset       - {}'''.format(
                                                    message.topic(), message.partition(), message.offset()))
            else:
                print('Received message: {}'.format(message.value().decode('utf-8')))
                # handleMessage(message.value())

    except KeyboardInterrupt:
        sys.stderr.write('Kafka Exception raised - {}  '.format(message.topic()))
        sys.exit(1)

    finally:
        consumer.close()

if __name__ == '__main__':
    processOne = Process(target=startLoop, group=None) 
    # still consumes message with startLoop() but not with processOne.start()
    # startLoop() 
    processOne.start()
    processOne.join()



1 Ответ

0 голосов
/ 01 марта 2020

Возможно, вы используете multiprocessing неправильно. Пример официального документа .

Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непреднамеренных побочных эффектов (таких как запуск нового обработать). Безопасный импорт основного модуля | Руководство по программированию

Итак, необходимо запустить процесс в if __name__ == '__main__':.

...