СТАРЫЙ ОТВЕТ Я думаю, что дошел до понимания проблемы, стоящей за
Прежде всего, вы не можете использовать преобразование InsertField в любом соединителе источника, потому что значение метки времени для сообщения назначеново время записи в тему, так что это не то, что соединитель уже может знать,
для соединителя JDBC есть этот билет https://github.com/confluentinc/kafka-connect-jdbc/issues/311
и в соединителе источника sap также не работает.
Второй соединитель BigQuery содержит ошибку, которая не позволяет использовать InsertField для добавления метки времени к каждой таблице, как указано здесь
https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994
Так что если вы хотите использовать bigquery в качествевывести единственное решение прямо сейчас - вручную отредактировать схему каждой таблицы, чтобы добавить столбец перед загрузкой коннектора cink
ОБНОВЛЕНИЕ 2018-12-03 Окончательное решение всегда добавлять сообщениеотметка времени в соединителе SINK.Давайте предположим, что вы хотите добавить временную метку в КАЖДУЮ таблицу соединителя приемника
в вашем ИСТОЧНИКЕ СОЕДИНИТЕЛЬ поместить эту конфигурацию
"transforms":"InsertField"
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
Это добавит имя столбца с именем "fieldtime«к каждой исходной таблице
в вашем SINK CONNECTOR поместите эти настройки
"transforms":"InsertField,DropField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertSource.timestamp.field":"kafka_timestamp",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
, это фактически удалит время поля столбца и добавит его снова с отметкой времени сообщения
Это решение автоматически добавит столбец с правильным значением без операции добавления