Я сталкиваюсь со странным поведением в конфлюентном коннекторе Kafka JDB C с использованием Bulk mode
Как воспроизвести:
Создайте исходный соединитель и загрузите его, например, в 18:00
Создайте соединитель приемника с помощью transforms.InsertField.timestamp.field или любой другой системы, для которой вы предпочитаете регистрировать время записи
Проблема:
соединитель за первую неделю запишет строки в 18:00, через 1 неделю он начнет писать в 19:00, через 2 недели 20:00 и т. д.
Пример JDB C Соединитель
{
"name": "sap-bulk",
"config": {
"name": "sap-bulk",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "10",
"topic.prefix": "bulk_",
"table.whitelist": "----"
"connection.url": "jdbc:sap://----/",
"connection.user": "-----",
"connection.password": "----------",
"retention.ms":"604800000",
"mode":"bulk",
"table.types":"VIEW",
"poll.interval.ms":"86400000",
"validate.non.null":"false",
}
}
Пример раковины
name=bgconnect-bulk
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=5
#topics=kcbq-quickstart
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
#bufferSize=10000
#maxWriteSize=10000
tableWriteWait=900000
max.poll.records=1000000
########################################### Fill me in! ###########################################
project=--------
topics=-----
datasets=.*=mydata
keyfile=/opt/kafka/Gaccess.json
group.id=-----------
transforms=DropField,InsertField
transforms.DropField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropField.blacklist=fieldtime
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
Кто-нибудь сталкивался с такой же проблемой?