Проблема: при вызове commit()
, я получаю сообщение об ошибке, если выполнение сообщения занимает много времени.
Вот код, который я использую:
consumer = KafkaConsumer(topic,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
bootstrap_servers=kafka_server_url,
api_version=(0, 10, 1),
auto_offset_reset='earliest',
enable_auto_commit= False,
#consumer_timeout_ms=3000,
session_timeout_ms=250000, #The timeout used to detect failures when using Kafka group management facilities.
request_timeout_ms=300000,
heartbeat_interval_ms=80000,
#max_poll_records=100,#The maximum number of records returned in a single call to poll(). Default: 500
enable_auto_commit=False,
retry_backoff_ms = 1000
)
errored = False
while errored is False:
logging.warn("Polling new messages from queue ......")
messages = []
crs = [] # Store all consumer records
try:
tpd = (consumer.poll(max_records=1))
[crs.extend(tp) for tp in tpd.values()] # List of cr's
[messages.extend([cr.value]) for cr in crs]
if(len(messages)):
consumer.commit()
logging.warn("{} messages available in queue".format(len(messages)))
for msg in messages:
if(self.__validators.is_valid_message(msg)):
self.process_message(msg);
else:
continue
# If auto_commit_enable is False, remember to commit() periodically
logging.warn("All messages processed. Commiting kafka consumer")
except Exception as e:
message = "Error occured while processing message Exception {}".format(e)
logging.critical(message,exc_info=True)
errored = True
break
logging.info("Sleeping for 5 Seconds")
time.sleep(3)
def process_message(self,message):
print("Processed")
Ниже приведена трассировка ошибки:
Трассировка (последний последний вызов): Файл "message_consumer.py ", строка 413, в файлеida_messages consumer.commit () Файл" /usr/lib/python3.6/site-packages/kafka/consumer/group.py ", строка 472, в коммите self._coordinator.commit_offsets_sync (смещения)Файл "/usr/lib/python3.6/site-packages/kafka/coordinator/consumer.py", строка 398, в commit_offsets_sync поднять future.exception # pylint: disable-msg =ising-bad-type kafka.errors.CommitFailedError: CommitFailedError: Фиксация не может быть завершена, так как группа уже перебалансировала и присвоила разделы другому участнику.Это означает, что время между последующими вызовами poll () было больше, чем настроенный session.timeout.ms, что обычно означает, что цикл опроса тратит слишком много времени на обработку сообщений.Вы можете решить эту проблему, увеличив тайм-аут сеанса или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.
Мое сообщение не фиксируется и обрабатывается повторно.из очереди кафки.Как мне это решить?Пожалуйста, помогите.
Любая помощь будет оценена.Заранее спасибо.