Confluent kafka Python клиент Avroufact.producer () выполняется без ошибок, но нет данных в теме - PullRequest
0 голосов
/ 03 апреля 2020

Мой производитель не выдает никаких ошибок, но данные не отправляются в пункт назначения topi c. Можете ли вы порекомендовать какие-либо методы для устранения этой ситуации.

У меня есть вызов Confluent Python Avro Producer внутри синхронного l oop для отправки данных в topi c, например, так:

self.producer.produce(topic=test2, value=msg_dict)

После этого вызова у меня есть фрагмент кода, подобный следующему: flu sh queue:

num_messages_in_queue = self.producer.flush(timeout = 2.0)
print(f"flushed {num_messages_in_queue} messages from producer queue in iteration {num_iterations} ")

, который выполняется без ошибок. Но также нет обратного вызова после выполнения этого кода. Мой продюсер запускается следующим образом:

 def __init__(self,broker_url=None,topic=None,schema_registry_url=None,schema_path=None):
  try:
    with open(schema_path, 'r') as content_file:
      schema = avro.loads(content_file.read())
  except Exception as e:
    print(f"Error when trying to read avro schema file : {schema_path}")

  self.conf = {
    'bootstrap.servers': broker_url,
    'on_delivery': self.delivery_report,
    'schema.registry.url': schema_registry_url,
    'acks': -1, #This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. 
    'enable.idempotence': False, #
    "error_cb":self.error_cb
  }
  self.topic = topic
  self.schema_path = schema_path
  self.producer = AvroProducer(self.conf,default_key_schema=schema, default_value_schema=schema) 

Мой метод обратного вызова выглядит следующим образом:

def delivery_report(self, err, msg):
    print(f"began delivery_report")
    if err is None:
        print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic= {msg.topic()} offset = {msg.offset} without err.")
    else:
        print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error= {err}") 

После выполнения этого кода я смотрю на мои topi c в реестре схемы Контейнер выглядит так:

docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning

Я вижу этот вывод:

[2020-04-03 15: 48: 38,064] INFO Registered kafka: type = kafka. Log4jController MBean (kafka.utils.Log4jControllerRegistration $) [2020-04-03 15: 48: 38,742] Значения INFO ConsumerConfig: auto.commit.interval.ms = 5000 auto.offset.reset = самые ранние bootstrap .servers = [kafka : 29092] check.crcs = true client.dns.lookup = по умолчанию client.id = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = console-consumer-49056 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave. group.on.close = true изоляция.level = read_uncommitted key.deserializer = class >> org. apache .kafka.common. serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metri c .reporters = [] metrics.num. samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org. apache .kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 переподключиться. backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos. kinit.cmd = / usr / bin / kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0,05 sasl.kerberos.ticket.renew .window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refre sh .buffer.seconds = 300 sasl.login.refre sh .min.period .seconds = 60 sasl.login.refre sh .window.factor = 0,8 sasl.login.refre sh .window.ji tter = 0,05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1 ] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class >> org . apache .kafka.common.serialization.ByteArrayDeserializer (org. apache .kafka.clients.consumer.ConsumerConfig) [2020-04-03 15: 48: 38,887] ИНФОРМАЦИЯ Версия Kafka: 2.1.0-cp1 (org. apache .kafka.common.utils.AppInfoParser) [2020-04-03 15: 48: 38,887] ИНФОРМАЦИЯ Kafka commitId: bda8715f42a1a3db (org. apache .kafka.common.utils.AppInfoParser) [2020-04-03 15: 48: 39,221] Идентификатор кластера INFO: KHKziPBvRKiozobbwvP1Fw (org. apache .kaMkatatalita). -04-03 15: 48: 39,224] ИНФОРМАЦИЯ [идентификатор клиента = потребитель-1, идентификатор группы = консоль-потребитель-49056] Обнаруженный координатор группы kafka: 29092 (идентификатор: 2147483646 стойка: пусто) (org. apache .kafka. clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 39,231] INFO [Consumer clientId = consumer-1, groupId = console-consumer-49056] Отзыв ранее назначенных разделов [] (org. apache. kafka.clients.consumer.internals.ConsumerCoordinator) [2020-04-03 15: 48: 39,231] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = console-consumer-49056] (повторно) присоединение к группе> (org. apache .kafka.clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 42,264] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = co nsole-consumer-49056] Успешно объединенная группа с поколением 1 (org. apache .kafka.clients.consumer.internals.AbstractCoordinator) [2020-04-03 15: 48: 42,267] ИНФОРМАЦИЯ [Consumer clientId = consumer-1, groupId = console-consumer-49056] Установка новых назначенных разделов [test2-0]> (org. apache .kafka.clients.consumer.internals.ConsumerCoordinator) [2020-04-03 15: 48: 42,293] INFO [Потребитель clientId = consumer-1, groupId = console-consumer-49056] Сброс смещения для раздела test2-0 на смещение 0.> (org. apache .kafka.clients.consumer.internals.Fetcher)

1 Ответ

0 голосов
/ 03 апреля 2020

Так что ответ настолько тривиален, что его смущает! Но это указывает на тот факт, что в многослойной инфраструктуре одно значение, заданное неверно, может привести к тихому отказу, который может быть очень утомительным для отслеживания.

Таким образом, проблема возникла из-за неправильной установки параметра my in мой docker -compose.yml файл, в котором переменная env для broker_url не была установлена. Код приложения нуждался в этой переменной для ссылки на брокера kafka. Однако для этого пропущенного параметра не было исключений, и он молча провалился.

...