Почему мой потребитель apache kafka случайно игнорирует сообщения в очереди? - PullRequest
0 голосов
/ 07 февраля 2019

Это, вероятно, eisenbug, поэтому я не жду жестких ответов, но больше подсказок о том, что искать, чтобы иметь возможность воспроизвести ошибку.

У меня есть система, основанная на событиях, основанная на kafka, составленнаяиз нескольких услуг.На данный момент они организованы в линейные трубопроводы.Одна тема, один тип события.Каждый сервис можно рассматривать как преобразование одного типа события в один или несколько типов событий.

Каждое преобразование выполняется как процесс Python, со своим собственным потребителем и собственным производителем.Все они используют один и тот же код и конфигурацию, потому что все это абстрагировано от реализации сервиса.

Теперь, в чем проблема.В нашей промежуточной среде иногда (скажем, одно из каждых пятидесяти сообщений) на Кафке есть сообщение, но потребитель его вообще не обрабатывает.Даже если вы ждете часы, он просто зависает.Этого не происходит в локальной среде, и мы не смогли воспроизвести его где-либо еще.

Некоторая более важная информация:

  • эти службы часто перезапускаются для целей отладки, нозависание не похоже на перезапуск.
  • Когда сообщение зависает и вы перезапускаете службу, служба обрабатывает сообщение.
  • Службы полностью не сохраняют состояние, поэтому кеширование или другие странные действия не происходят (я надеюсь)
  • Когда это происходит, у меня есть доказательства того, что они все еще не обрабатывают более старые сообщения (я регистрирую, когда они выдают результат, и это происходит прямо перед концом цикла потребителя)
  • При текущем развертывании естьтолько один потребитель в группе потребителей, поэтому нет параллельной обработки внутри одних и тех же сервисов, нет горизонтального масштабирования сервиса

Как я потребляю:

Я использую pykafka, и этоПотребительский цикл:

def create_consumer(self):

    consumer = self.client.topics[bytes(self.input_topic, "UTF-8")].get_simple_consumer(
        consumer_group=bytes(self.consumer_group, "UTF-8"),
        auto_commit_enable=True,
        offsets_commit_max_retries=self.service_config.kafka_offsets_commit_max_retries,
    )
    return consumer

def run(self):

    consumer = self.create_consumer()
    while not self.stop_event.wait(1):
        message = consumer.consume()
        results = self._process_message(message)
        self.output_results(results)

Я предполагаю, что есть или какая-то проблема с тем, как я потребляю сообщения, или есть какое-то непоследовательное состояние смещений группы потребителей, но я не могу действительно сосредоточиться на этой проблеме.

Я также планирую переехать в Фауст, чтобы решить эту проблему.Учитывая мою кодовую базу и мое архитектурное решение, переход не должен быть слишком сложным, но прежде чем начать такую ​​работу, я хотел бы быть уверен, что я иду в правильном направлении.Прямо сейчас это был бы слепой выстрел, надеющийся, что некоторые детали, которые создают проблему, исчезнут.

...