Как добавить столбец с отметкой времени сообщения kafka в коннекторе приемника kafka - PullRequest
0 голосов
/ 15 ноября 2018

Я настраиваю свой соединитель, используя файлы свойств / json, я пытаюсь добавить столбец отметки времени, содержащий отметку времени kafka, когда он безуспешно читает сообщение из соединителя источника.

Я пытался добавить transforms, но он всегда нулевой, и мой коннектор приемника "большой запрос" возвращает мне ошибку

Не удалось обновить схему таблицы

Я поместил эти конфигурации в свойствах коннектора bigquery

transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value

Мой источник Config Sap разъем

{
    "name": "sap",
    "config": {
        "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
        "tasks.max": "10",
        "topics": "mytopic",
        "connection.url": "jdbc:sap://IP:30015/",
        "connection.user": "user",
        "connection.password": "pass",
        "group.id":"589f5ff5-1c43-46f4-bdd3-66884d61m185",
        "mytopic.table.name":                          "\"schema\".\"mytable\""  
       }
}

Мой коннектор BigQuery для мойки

name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1

sanitizeTopics=true

autoCreateTables=true
autoUpdateSchemas=true

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081

bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000

project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime    
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value

Ответы [ 2 ]

0 голосов
/ 20 ноября 2018

СТАРЫЙ ОТВЕТ Я думаю, что дошел до понимания проблемы, стоящей за

Прежде всего, вы не можете использовать преобразование InsertField в любом соединителе источника, потому что значение метки времени для сообщения назначеново время записи в тему, так что это не то, что соединитель уже может знать,
для соединителя JDBC есть этот билет https://github.com/confluentinc/kafka-connect-jdbc/issues/311

и в соединителе источника sap также не работает.

Второй соединитель BigQuery содержит ошибку, которая не позволяет использовать InsertField для добавления метки времени к каждой таблице, как указано здесь

https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994

Так что если вы хотите использовать bigquery в качествевывести единственное решение прямо сейчас - вручную отредактировать схему каждой таблицы, чтобы добавить столбец перед загрузкой коннектора cink

ОБНОВЛЕНИЕ 2018-12-03 Окончательное решение всегда добавлять сообщениеотметка времени в соединителе SINK.Давайте предположим, что вы хотите добавить временную метку в КАЖДУЮ таблицу соединителя приемника

в вашем ИСТОЧНИКЕ СОЕДИНИТЕЛЬ поместить эту конфигурацию

"transforms":"InsertField"
"transforms.InsertField.timestamp.field":"fieldtime", 
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"

Это добавит имя столбца с именем "fieldtime«к каждой исходной таблице

в вашем SINK CONNECTOR поместите эти настройки

"transforms":"InsertField,DropField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertSource.timestamp.field":"kafka_timestamp",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"

, это фактически удалит время поля столбца и добавит его снова с отметкой времени сообщения

Это решение автоматически добавит столбец с правильным значением без операции добавления

0 голосов
/ 16 ноября 2018

Я предполагаю, что ваша ошибка исходит от BigQuery, а не от Kafka Connect.

Например, запустите Console Consumer в автономном режиме, вы увидите сообщения типа

Struct{...,fieldtime=Fri Nov 16 07:38:19 UTC 2018}


Проверено с connect-standalone ./connect-standalone.properties ./connect-console-sink.properties

У меня есть тема ввода с данными Avro ... Обновите свои собственные настройки соответственно

connect-standalone.properties

bootstrap.servers=kafka:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
key.converter.schemas.enable=true

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java

connect-console-sink.properties

name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=input-topic

transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
...