Мой производитель не выдает никаких ошибок, но данные не отправляются в пункт назначения topi c. Можете ли вы порекомендовать какие-либо методы для устранения этой ситуации.
У меня есть вызов Confluent Python Avro Producer внутри синхронного l oop для отправки данных в topi c, например, так:
self.producer.produce(topic=test2, value=msg_dict)
После этого вызова у меня есть фрагмент кода, подобный следующему: flu sh queue:
num_messages_in_queue = self.producer.flush(timeout = 2.0)
print(f"flushed {num_messages_in_queue} messages from producer queue in iteration {num_iterations} ")
, который выполняется без ошибок. Но также нет обратного вызова после выполнения этого кода. Мой продюсер запускается следующим образом:
def __init__(self,broker_url=None,topic=None,schema_registry_url=None,schema_path=None):
try:
with open(schema_path, 'r') as content_file:
schema = avro.loads(content_file.read())
except Exception as e:
print(f"Error when trying to read avro schema file : {schema_path}")
self.conf = {
'bootstrap.servers': broker_url,
'on_delivery': self.delivery_report,
'schema.registry.url': schema_registry_url,
'acks': -1, #This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
'enable.idempotence': False, #
"error_cb":self.error_cb
}
self.topic = topic
self.schema_path = schema_path
self.producer = AvroProducer(self.conf,default_key_schema=schema, default_value_schema=schema)
Мой метод обратного вызова выглядит следующим образом:
def delivery_report(self, err, msg):
print(f"began delivery_report")
if err is None:
print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic= {msg.topic()} offset = {msg.offset} without err.")
else:
print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error= {err}")
После выполнения этого кода я смотрю на мои topi c в реестре схемы Контейнер выглядит так:
docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning
Я вижу этот вывод:
[2020-04-03 15: 48: 38,064] INFO Registered kafka: type = kafka. Log4jController MBean (kafka.utils.Log4jControllerRegistration $) [2020-04-03 15: 48: 38,742] Значения INFO ConsumerConfig: auto.commit.interval.ms = 5000 auto.offset.reset = самые ранние bootstrap .servers = [kafka : 29092] check.crcs = true client.dns.lookup = по умолчанию client.id = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = console-consumer-49056 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave. group.on.close = true изоляция.level = read_uncommitted key.deserializer = class >> org. apache .kafka.common. serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metri c .reporters = [] metrics.num. samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org. apache .kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 переподключиться. backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos. kinit.cmd = / usr / bin / kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0,05 sasl.kerberos.ticket.renew .window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refre sh .buffer.seconds = 300 sasl.login.refre sh .min.period .seconds = 60 sasl.login.refre sh .window.factor = 0,8 sasl.login.refre sh .window.ji tter = 0,05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1 ] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class >> org . apache .kafka.common.serialization.ByteArrayDeserializer (org. apache .kafka.clients.consumer.ConsumerConfig) [2020-04-03 15: 48: 38,887] ИНФОРМАЦИЯ Версия Kafka: 2.1.0-cp1 (org. apache .kafka.common.utils.AppInfoParser) [2020-04-03 15: 48: 38,887] ИНФОРМАЦИЯ Kafka commitId: bda8715f42a1a3db (org. apache .kafka.common.utils.AppInfoParser) [2020-04-03 15: 48: 39,221] Идентификатор кластера INFO: KHKziPBvRKiozobbwvP1Fw (org. apache .kaMkatatalita). -04-03 15: 48: 39,224] ИНФОРМАЦИЯ [идентификатор клиента = потребитель-1, идентификатор группы = консоль-потребитель-49056] Обнаруженный координатор группы kafka: 29092 (идентификатор: 2147483646 стойка: пусто) (org. apache .kafka. clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 39,231] INFO [Consumer clientId = consumer-1, groupId = console-consumer-49056] Отзыв ранее назначенных разделов [] (org. apache. kafka.clients.consumer.internals.ConsumerCoordinator) [2020-04-03 15: 48: 39,231] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = console-consumer-49056] (повторно) присоединение к группе> (org. apache .kafka.clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 42,264] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = co nsole-consumer-49056] Успешно объединенная группа с поколением 1 (org. apache .kafka.clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 42,267] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = console-consumer-49056] Установка новых назначенных разделов [test2-0]> (org. apache .kafka.clients.consumer.internals.ConsumerCoordinator) [2020-04-03 15: 48: 42,293] INFO [Потребитель clientId = consumer-1, groupId = console-consumer-49056] Сброс смещения для раздела test2-0 на смещение 0.> (org. apache .kafka.clients.consumer.internals.Fetcher)