Как использовать сообщение Kafka без автоматической фиксации, обрабатывать его в течение длительного времени (4-60 минут) и фиксировать, не подвергаясь перебалансировке, а также переназначать разделы или блокировать использование других сообщений другими пользователями группы.
Я использую потребителя Python 3.8 Kafka, чтобы:
- Использовать по одному сообщению за раз, без автоматической фиксации.
- Запуск длительной работы скрипт (чтение его стандартного вывода в Python)
- Условно зафиксируйте сообщение.
Моя проблема в том, что часто разделы Kafka перебалансируются с другим членом группы потребителей.
После заливки документации я попытался поиграть со следующими свойствами конфигурации:
- session_timeout_ms
- request_timeout_ms
max_poll_interval_ms
from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition
def consume_one_message_at_a_time(conf):
conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
group_id = conf.group_id
group_conf = conf.consumer_groups[group_id]
kafka_brokers = conf.KAFKA_BROKERS
topic = group_conf.subscribe[0].name
print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_brokers,
group_id=group_id,
enable_auto_commit=False,
max_poll_records=1,
max_poll_interval_ms=1800000,
# session_timeout_ms=1800000,
# request_timeout_ms=1800002,
# connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,
)
print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
consumer.subscribe([topic])
for message in consumer:
print(f"message is of type: {type(message)}")
if not group_conf.use_cmd:
do_something_time_consuming(message)
else:
if group_id == 'bots' and check_bot_id(message):
bot_action(conf, group_conf, message)
else:
print(f'no action for group_id: {group_id}')
print(f'key : {message.key}')
print(f'value: {message.value}')
meta = consumer.partitions_for_topic(message.topic)
partition = TopicPartition(message.topic, message.partition)
offsets = OffsetAndMetadata(message.offset + 1, meta)
options = {partition: offsets}
print(f'\noptions: {options}\n')
response = consumer.commit(offsets=options)
Когда другие члены группы подписываются или заканчивают sh свои работы и потребляют, я получаю эту ошибку:
Traceback (most recent call last):
File "./consumer_one_at_a_time.py", line 148, in <module>
consume_one_message_at_a_time(_conf)
File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
response = consumer.commit(offsets=options)
File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
self._coordinator.commit_offsets_sync(offsets)
File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
После добавления этих конфигураций я обнаружил, что новые потребители заблокированы! Т.е. не потреблять сообщения, пока одно не будет зафиксировано!
session_timeout_ms=1800000,
request_timeout_ms=1800002,
connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,
Я прочитал, что фоновый поток должен посылать сердцебиение. Есть ли способ отправить сердцебиение без опроса?