Я заинтересован в реализации метода на python, который будет вызываться каждые несколько минут и извлекать новые сообщения из определенной темы (скажем, из всех разделов).Я попробовал это:
consumer = kafka_topic.get_simple_consumer(
auto_offset_reset=OffsetType.EARLIEST,
reset_offset_on_start=False)
messages = list()
for message in consumer:
if message is not None:
messages.append(create_message(message=message, topic=TOPIC))
, но он входит в бесконечный цикл (хотя потребление выглядит точно так, как описано в документации по pykafka).
Я также попробовал это:
consumer = kafka_topic.get_simple_consumer(
auto_offset_reset=OffsetType.EARLIEST,
reset_offset_on_start=False)
messages = list()
for message in islice(consumer, LAST_N_MESSAGES):
if message is not None:
messages.append(create_message(message=message, topic=TOPIC))
Этот цикл был конечным и действительно извлекал все сообщения, но при следующем вызове этого скрипта он снова получал те же сообщения.Кроме того, у меня не будет точного количества сообщений, которые я хочу получить.
Я был основан на этой документации pykafka:
- Для любой новой группы / темы / разделов, потребление сообщенийначнется с auto_offset_reset.Это верно независимо от значения reset_offset_on_start.
- Для любой существующей группы / темы / разделов, при условии, что reset_offset_on_start = False, потребление начнется со смещения, непосредственно следующего за последним смещенным подтверждением смещения (если последнее смещенное смещение было 4потребление начинается с 5).Если reset_offset_on_start = True, потребление начинается с auto_offset_reset.Если нет зафиксированного смещения, группа / тема / раздел считается новой.
Тем не менее потребитель, похоже, не начинает потреблять смещение, следующее за последним смещением, а с началатема.