Я хочу иметь возможность читать сообщения, приходящие в определенные разделы темы, а также сообщения в другой теме, как если бы я делал простые Consumer
.
self.consumer = AvroConsumer(conf)
parts = [TopicPartition('p_topic', 13),
TopicPartition('p_topic', 14)
self.consumer.assign(parts)
self.consumer.subscribe(['test_topic'])
, производимые некоторыми «клиентами».сообщения в разделах «p_topic», а некоторые (те, что я создал) в «test_topic», такие как:
self.p.produce('test_topic', msg)
Я не могу интегрировать эти два, хотя с кодом, показанным выше.Сообщения, которые я генерирую в 'test_topic', выдают:
File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/__init__.py", line 115, in poll
decoded_value = self._serializer.decode_message(message.value())
File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/serializer/message_serializer.py", line 214, in decode_message
raise SerializerError("message does not start with magic byte")
SerializerError
Как я могу читать оба одновременно, используя AvroConsumer
?