У меня есть приложение на Python, которое читает из 2 тем. Один из них похож на пакетный процесс (всегда запущен), а другой - на специальный.
Я хочу, чтобы приложение выполняло пакетный процесс, но проверил специальное решение после обработки нескольких записей (это цель). И если в специальной теме есть элементы, то сначала выполните это, а затем вернитесь к пакетному процессу
Во всяком случае, это то, с чего я начал:
while True:
for msg_1 in consumer_1:
print('consumer_1 {0}'.format(msg_1.value)
for msg_2 in consumer_2:
print('consumer_2 {0}'.format(msg_2.value)
Конечно, это не работает. Он застревает во втором цикле for.
Я прочитал этот пост и попытался его реализовать ( Kafka Consumer - тема (ы) с более высоким приоритетом ). Ответ был дан в 2017 году, и я думаю, что с тех пор в API произошли изменения.
while True:
for msg_1 in consumer_1:
print('consumer_1 {0}'.format(msg_1.value)
client = SimpleClient('localhost:9092')
topic_partitions_ids = client.get_partition_ids_for_topic(b'two')
end_offsets = consumer_2.end_offsets(topic_partitons_ids)
committed = consumer_2.committed(topic_partitions_ids)
if end_offsets - committed > 0:
for msg_2 in consumer_2:
consumer_1.pause()
print('consumer_2 {0}'.format(msg_2.value)
consumer_1.resume()
Но я застрял на линии
committed = consumer_2.committed(topic_partitions_ids)
с сообщением об ошибке:
раздел должен быть разделом с именем TopicPartition
Я верю, что есть простой способ сделать это, но для моей жизни я не могу найти выход. И я также думаю, что мне вообще не нужно останавливать customer_1.
Может кто-нибудь помочь?
Просто обратите внимание, это то, как я создал consumer_1 и consumer_2, на случай, если это уместно.
consumer_1 = KafkaConsumer('one',
group_id='one-group',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: loads(x.decode('utf-8')))
consumer_2 = KafkaConsumer('two',
group_id='two-group',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: loads(x.decode('utf-8')))