Я хочу реализовать Kafka для обмена сообщениями через распределенную микросервисную архитектуру.
Я использую PyKafka и реализовал фиктивного производителя и (сбалансированного) потребителя.Я назначил всех потребителей в той же группе потребителей .У меня нет проблем с использованием производителей из Python и Console одновременно, даже при их добавлении во время выполнения.
Однако у меня проблема с потребителями.Я могу создать несколько потребителей Python и даже добавить их во время выполнения.Но когда я добавляю Console customer (kafka-console-consumer) в группу с потребителями Python, я получаю мьютексную ошибку:
Ошибка фиксации смещения для темы 'b'michal_sample_topic' 'из идентификатора потребителя'b'Michals-MacBook-Pro.local: 1722eea0-07d3-4be4-9d97-8b7fb15b0b30 '' (ошибки: {'pykafka.exceptions.UnknownMemberId': [0, 1]})
Более тогооба они (даже если они принадлежат к одной и той же группе потребителей) потребляют сообщения (потребители Python балансируют их между собой и консольные потребители между собой)
Теперь я новичок в Kafka, но мое первое впечатление былочто кафка должна быть независимой от потребителей, поэтому их объединение должно быть возможным.Является ли проблема в моем понимании, PyKafka или моя реализация PyKafka?
Производитель:
from pykafka import KafkaClient
from time import sleep
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
with topic.get_sync_producer() as producer:
while True:
producer.produce(
bytes(
input('Send test message:'),
'utf-8'
)
)
Потребитель:
from pykafka import KafkaClient
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
balanced_consumer = topic.get_balanced_consumer(
consumer_group=b'testing',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
for message in balanced_consumer:
if message is not None:
print(f'{message.offset} {message.value}')