прием сообщений в пикафке с простым потребительским экземпляром - PullRequest
0 голосов
/ 24 сентября 2019

Я заинтересован в реализации метода на python, который будет вызываться каждые несколько минут и извлекать новые сообщения из определенной темы (скажем, из всех разделов).Я попробовал это:

    consumer = kafka_topic.get_simple_consumer(
        auto_offset_reset=OffsetType.EARLIEST,
        reset_offset_on_start=False)

    messages = list()
    for message in consumer:
        if message is not None:
            messages.append(create_message(message=message, topic=TOPIC))

, но он входит в бесконечный цикл (хотя потребление выглядит точно так, как описано в документации по pykafka).

Я также попробовал это:

    consumer = kafka_topic.get_simple_consumer(
        auto_offset_reset=OffsetType.EARLIEST,
        reset_offset_on_start=False)

    messages = list()
    for message in islice(consumer, LAST_N_MESSAGES):
        if message is not None:
            messages.append(create_message(message=message, topic=TOPIC))

Этот цикл был конечным и действительно извлекал все сообщения, но при следующем вызове этого скрипта он снова получал те же сообщения.Кроме того, у меня не будет точного количества сообщений, которые я хочу получить.

Я был основан на этой документации pykafka:

  • Для любой новой группы / темы / разделов, потребление сообщенийначнется с auto_offset_reset.Это верно независимо от значения reset_offset_on_start.
  • Для любой существующей группы / темы / разделов, при условии, что reset_offset_on_start = False, потребление начнется со смещения, непосредственно следующего за последним смещенным подтверждением смещения (если последнее смещенное смещение было 4потребление начинается с 5).Если reset_offset_on_start = True, потребление начинается с auto_offset_reset.Если нет зафиксированного смещения, группа / тема / раздел считается новой.

Тем не менее потребитель, похоже, не начинает потреблять смещение, следующее за последним смещением, а с началатема.

...