Ниже приведен мой сценарий использования, в котором одно приложение отправляет sh данные в три разные темы kafka (есть уникальный идентификатор приложения), а вывод go отправляется в последующую очередь 4 и очередь 5. Я уже реализовал конвейер, показанный ниже.
Единственная проблема, с которой я столкнулся, как объединить весь вывод для определенного app_id из topi c 5. Приложение отправляет несколько запросов, каждый из которых имеет уникальный идентификатор в этом конвейере. Таким образом, все запросы на конкретный app_id могут быть не последовательными. В очереди 5 могут быть другие данные app_id.
Следует ли мне использовать разные group_id для каждого из app_id при создании потребителя для topi c 5?
Пожалуйста, помогите мне, если у вас есть идея. Я использую kafka- python.
from kafka import KafkaConsumer, KafkaProducer
KAFKA = dict()
KAFKA['producer'] = KafkaProducer(bootstrap_servers=[server]))
for queue in ['queue 1', 'queue 2', 'queue 3', 'queue 4', 'queue 5']:
KAFKA['queue'] = KafkaConsumer(queue,
bootstrap_servers=[server],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='group'+queue)
введите описание изображения здесь