Кафка Python - CommitFailedError - PullRequest
       2

Кафка Python - CommitFailedError

0 голосов
/ 29 декабря 2018

Проблема: при вызове commit(), я получаю сообщение об ошибке, если выполнение сообщения занимает много времени.

Вот код, который я использую:

consumer = KafkaConsumer(topic,
      group_id='my-group',
      value_deserializer=lambda m: json.loads(m.decode('utf-8')),
      bootstrap_servers=kafka_server_url,
      api_version=(0, 10, 1),
      auto_offset_reset='earliest',
      enable_auto_commit= False,
      #consumer_timeout_ms=3000,
      session_timeout_ms=250000, #The timeout used to detect failures when using Kafka group management facilities. 
      request_timeout_ms=300000,
      heartbeat_interval_ms=80000,
      #max_poll_records=100,#The maximum number of records returned in a single call to poll(). Default: 500
      enable_auto_commit=False,
      retry_backoff_ms = 1000
      )
errored = False
while errored is False:
  logging.warn("Polling new messages from queue ......")
  messages = []
  crs = [] # Store all consumer records
  try:
    tpd = (consumer.poll(max_records=1))
    [crs.extend(tp) for tp in tpd.values()] # List of cr's
    [messages.extend([cr.value]) for cr in crs] 
    if(len(messages)):
      consumer.commit()
      logging.warn("{} messages available in queue".format(len(messages)))
      for msg in messages:
        if(self.__validators.is_valid_message(msg)):
          self.process_message(msg);
        else:
          continue
      # If auto_commit_enable is False, remember to commit() periodically    
      logging.warn("All messages processed. Commiting kafka consumer")

  except Exception as e:
    message = "Error occured while processing message Exception {}".format(e)
    logging.critical(message,exc_info=True)
    errored = True
    break
  logging.info("Sleeping for 5 Seconds")
  time.sleep(3)

def process_message(self,message):
 print("Processed")

Ниже приведена трассировка ошибки:

Трассировка (последний последний вызов): Файл "message_consumer.py ", строка 413, в файлеida_messages consumer.commit () Файл" /usr/lib/python3.6/site-packages/kafka/consumer/group.py ", строка 472, в коммите self._coordinator.commit_offsets_sync (смещения)Файл "/usr/lib/python3.6/site-packages/kafka/coordinator/consumer.py", строка 398, в commit_offsets_sync поднять future.exception # pylint: disable-msg =ising-bad-type kafka.errors.CommitFailedError: CommitFailedError: Фиксация не может быть завершена, так как группа уже перебалансировала и присвоила разделы другому участнику.Это означает, что время между последующими вызовами poll () было больше, чем настроенный session.timeout.ms, что обычно означает, что цикл опроса тратит слишком много времени на обработку сообщений.Вы можете решить эту проблему, увеличив тайм-аут сеанса или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.

Мое сообщение не фиксируется и обрабатывается повторно.из очереди кафки.Как мне это решить?Пожалуйста, помогите.

Любая помощь будет оценена.Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...