Consumer_failed_message в потоке kafka: записи не отправлены из темы - PullRequest
0 голосов
/ 30 мая 2020

У меня есть поток, в котором из мэйнфрейма IBM IIDR я отправляю записи в Kafka topi c. value_format сообщения, приходящего на Kafka topi c, - это AVRO, и ключ также находится в формате AVRO. Записи запихиваются в топи Кафки c. У меня есть поток, связанный с этим топи c. Но записи в поток не передаются. Пример test_iidr topi c -

rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

Формат value_format в потоке - AVRO, и все имена столбцов проверяются.

Запрос на создание потока -

CREATE STREAM test_iidr (
  col1 STRING, 
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING) 
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');

Не удается загрузить в поток из topi c, поскольку KEY не упоминается в операторе WITH? В реестре схем зарегистрированы субъекты test_iidr-value и test_iidr-key.

key.converter и value.converter в Kafka-connect docker установлены как - org.apache.kafka.connect.json.JsonConverter. Это JsonConverter создает эту проблему?

Я создал совершенно другой конвейер с другим потоком и вставил те же данные вручную с помощью операторов insert into. Это сработало. Не работает только поток IIDR и записи не попадают в поток из топи c.

Я использую Confluent kafka версии 5.5.0.

1 Ответ

1 голос
/ 02 июня 2020

JsonConverter в конфигурации подключения вполне может преобразовывать ваши данные Avro в JSON.

Чтобы определить форматы сериализации ключей и значений, вы можете использовать команду PRINT (которую, как я вижу, вы уже выполнили). PRINT будет выводить форматы ключей и значений при запуске. Например:

ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

Итак, первое, что нужно проверить, это форматы, выводимые для ключа и значения с помощью PRINT, а затем соответствующим образом обновить оператор CREATE.

Обратите внимание, что ksqlDB не еще поддерживает ключи Avro / Json, поэтому вам может потребоваться / нужно переразбить свои данные, см .: https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what -to-do-if-your-key-is-not-set -или-в-другом-формате

Боковое примечание : если схема для значения хранится в реестре схем, вам не нужно определите столбцы в вашем операторе CREATE, поскольку ksqlDB загрузит столбцы из реестра схемы

Дополнительное примечание : вам не нужно PARTITIONS=1, REPLICAS=3 в предложении WITH для существующих тем, только если вы хотите, чтобы ksqlDB создал для вас topi c.

...