У меня есть сценарий потребителя, который обрабатывает каждое сообщение и фиксирует смещения вручную в теме.
CONSUMER = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_SERVER],
auto_offset_reset="earliest",
max_poll_records=100,
enable_auto_commit=False,
group_id=CONSUMER_GROUP,
# Use the RoundRobinPartition method
partition_assignment_strategy=[RoundRobinPartitionAssignor],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
while True:
count += 1
LOGGER.info("--------------Poll {0}---------".format(count))
for msg in CONSUMER:
# Process msg.value
# Commit offset to topic
tp = TopicPartition(msg.topic, msg.partition)
offsets = {tp: OffsetAndMetadata(msg.offset, None)}
CONSUMER.commit(offsets=offsets)
Время, необходимое для обработки каждого сообщения, составляет <1 сек. </p>
Я получаю эту ошибкуОшибка:
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
Process finished with exit code 1
Ожидание:
a) Как исправить эту ошибку?
b) Как убедиться, что моя ручная фиксация работает правильно?
в) Правильный способ фиксации смещения.
Я прошел через это, но Разница между session.timeout.ms и max.poll.interval.ms для Kafka 0.10.0.0 и более поздних версий чтобы понять мою проблему, очень важна любая помощь по настройке времени опроса, сеанса или пульса.
Apache kafka: 2.11-2.1.0 kafka-python: 1.4.4