Потребители питона Kafka, работающие в параллельных потоках - PullRequest
0 голосов
/ 12 июня 2018

Я абсолютный новичок в питоне и кафке.У меня есть скрипт, который должен запустить трех потребителей kafka, ждать сообщений от этих потребителей и делать некоторые другие вещи.На данный момент я даже не знаю, иду ли я в правильном направлении, поэтому любая помощь будет оценена.

class MainClass():
    def do_something_before(self):
        # something is done here

    def start_consumer(self):
        consumer1_thread = threading.Thread(target=self.cons1, args=())
        consumer2_thread = threading.Thread(target=self.cons2, args=())
        consumer1_thread.daemon = True
        consumer2_thread.daemon = True
        consumer1_thread.start()
        consumer2_thread.start()

    def cons1(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message.value)

    def cons2(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my2-topic'])
        for message in consumer:
            print(message.value)

    def keep_working(self):
        # something is done here

if __name__ == 'main':
    g = MainClass()
    g.do_something_before()
    g.keep_working()

1 Ответ

0 голосов
/ 20 июля 2018

Я добавил пример python-kafka с двумя потребителями (в основном два процесса python), вы можете найти его по ссылке на github здесь https://github.com/Shubhamgorde/kafka-python-app.

Не могу опубликовать все файлы python, он немного большой.

from multiprocessing import Process
def consumeData(topic):
    try:
         consumer = KafkaConsumer(topic, value_deserializer=lambda v: 
           binascii.unhexlify(v).decode('utf-8'))
    except:
         print("Error!!")
for msg in consumer:
    msg=ast.literal_eval(msg.value)
    if(msg[2] == 'C'):
        performCreditOperation(msg)
    elif (msg[2] == 'D'):
          performDebitOperation(msg)
t1 = Process(target=consumeData, args=('Credit_transac',))
t2 = Process(target=consumeData, args=('Debit_transac',))
t1.start()
t2.start()
...