У меня ошибка с библиотекой AIOKafka в Python (версии в конце). В основном, я получаю сообщение о сбое сердцебиения, и тогда фиксация смещений не может быть выполнена. Это журнал:
Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Traceback (most recent call last):
File "/app/manage.py", line 23, in <module>
server.serve()
File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 146, in serve
self._start_mq()
File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 186, in _start_mq
self.loop.run_until_complete(self.async_mq_loop())
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 202, in async_mq_loop
async for message in self.consumer:
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1220, in __anext__
return (yield from self.getone())
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1101, in getone
msg = yield from self._fetcher.next_record(partitions)
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1051, in next_record
yield from waiter
kafka.errors.UnknownMemberIdError: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7fd58b49de10>
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fd58bd9c4d0>
Task was destroyed but it is pending!
task: <Task pending coro=<Sender._sender_routine() running at /usr/local/lib/python3.7/site-packages/aiokafka/producer/sender.py:151> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd58ab91ad0>()]> cb=[Sender._fail_all()]>
Версия python - 3.7.4, версия AIOKafka - 0.5.2, а конфигурация Consumer -
self.consumer = AIOKafkaConsumer(
self.config.KAFKA_TOPIC,
loop=self.loop,
bootstrap_servers=self.config.KAFKA_URL,
group_id=self.config.KAFKA_GROUP_ID,
fetch_min_bytes=100000)
Ошибка происходит самопроизвольно. Я попытался изменить эту конфигурацию
heartbeat.interval.ms
session.timeout.ms
max.poll.interval.ms
Но я считаю, что стандартные значения должны быть в порядке. Что говорит эта ошибка? Что потребитель больше не является частью группы, и этот l oop просто заканчивается? Кстати, после того, как эта ошибка произошла, контейнер просто умирает
async for message in self.consumer:
try:
asyncio.create_task(self.async_handle_message(message))
except Exception as e:
import traceback
traceback.print_exc()
print('Exception: ', e, flush=True)
Любое руководство по устранению проблемы или настройке конфигурации хорошо принято. Хорошего дня:)