Потребитель PyKafka конфликтует с конечным потребителем - PullRequest
0 голосов
/ 06 июня 2018

Я хочу реализовать Kafka для обмена сообщениями через распределенную микросервисную архитектуру.

Я использую PyKafka и реализовал фиктивного производителя и (сбалансированного) потребителя.Я назначил всех потребителей в той же группе потребителей .У меня нет проблем с использованием производителей из Python и Console одновременно, даже при их добавлении во время выполнения.

Однако у меня проблема с потребителями.Я могу создать несколько потребителей Python и даже добавить их во время выполнения.Но когда я добавляю Console customer (kafka-console-consumer) в группу с потребителями Python, я получаю мьютексную ошибку:

Ошибка фиксации смещения для темы 'b'michal_sample_topic' 'из идентификатора потребителя'b'Michals-MacBook-Pro.local: 1722eea0-07d3-4be4-9d97-8b7fb15b0b30 '' (ошибки: {'pykafka.exceptions.UnknownMemberId': [0, 1]})

Более тогооба они (даже если они принадлежат к одной и той же группе потребителей) потребляют сообщения (потребители Python балансируют их между собой и консольные потребители между собой)

Теперь я новичок в Kafka, но мое первое впечатление былочто кафка должна быть независимой от потребителей, поэтому их объединение должно быть возможным.Является ли проблема в моем понимании, PyKafka или моя реализация PyKafka?

Производитель:

from pykafka import KafkaClient
from time import sleep

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

with topic.get_sync_producer() as producer:

while True:
    producer.produce(
        bytes(
            input('Send test message:'), 
            'utf-8'
        )
    )

Потребитель:

from pykafka import KafkaClient

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=b'testing',
    auto_commit_enable=True,
    zookeeper_connect='localhost:2181'
)

for message in balanced_consumer:
    if message is not None:
        print(f'{message.offset} {message.value}')
...