У меня есть поток, в котором из мэйнфрейма IBM IIDR я отправляю записи в Kafka topi c. value_format
сообщения, приходящего на Kafka topi c, - это AVRO, и ключ также находится в формате AVRO. Записи запихиваются в топи Кафки c. У меня есть поток, связанный с этим топи c. Но записи в поток не передаются. Пример test_iidr
topi c -
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
Формат value_format в потоке - AVRO, и все имена столбцов проверяются.
Запрос на создание потока -
CREATE STREAM test_iidr (
col1 STRING,
col2 DECIMAL(2,0),
col3 DECIMAL(1,0),
iidr_tran_type STRING,
iidr_a_ccid STRING,
iidr_a_user STRING,
iidr_src_upd_ts STRING,
iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');
Не удается загрузить в поток из topi c, поскольку KEY
не упоминается в операторе WITH
? В реестре схем зарегистрированы субъекты test_iidr-value
и test_iidr-key
.
key.converter
и value.converter
в Kafka-connect
docker установлены как - org.apache.kafka.connect.json.JsonConverter
. Это JsonConverter
создает эту проблему?
Я создал совершенно другой конвейер с другим потоком и вставил те же данные вручную с помощью операторов insert into
. Это сработало. Не работает только поток IIDR и записи не попадают в поток из топи c.
Я использую Confluent kafka версии 5.5.0.