Kafka JDB C соединитель исходного кода с увеличением выборки cron на 1 час каждую неделю - PullRequest
0 голосов
/ 27 марта 2020

Я сталкиваюсь со странным поведением в конфлюентном коннекторе Kafka JDB C с использованием Bulk mode

Как воспроизвести:

Создайте исходный соединитель и загрузите его, например, в 18:00

Создайте соединитель приемника с помощью transforms.InsertField.timestamp.field или любой другой системы, для которой вы предпочитаете регистрировать время записи

Проблема:

соединитель за первую неделю запишет строки в 18:00, через 1 неделю он начнет писать в 19:00, через 2 недели 20:00 и т. д.

Пример JDB C Соединитель

    {
            "name": "sap-bulk",
            "config": {
                "name": "sap-bulk",
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "tasks.max": "10",
                "topic.prefix": "bulk_",
                "table.whitelist": "----"
                "connection.url": "jdbc:sap://----/",
                "connection.user": "-----",
                "connection.password": "----------",
                "retention.ms":"604800000",
                "mode":"bulk",
                "table.types":"VIEW",
                "poll.interval.ms":"86400000",
                "validate.non.null":"false",
            }
    }

Пример раковины

name=bgconnect-bulk
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=5

#topics=kcbq-quickstart
sanitizeTopics=true

autoCreateTables=true
autoUpdateSchemas=true

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081

#bufferSize=10000
#maxWriteSize=10000
tableWriteWait=900000

max.poll.records=1000000
########################################### Fill me in! ###########################################
project=--------
topics=-----
datasets=.*=mydata
keyfile=/opt/kafka/Gaccess.json
group.id=-----------

transforms=DropField,InsertField
transforms.DropField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropField.blacklist=fieldtime
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value

Кто-нибудь сталкивался с такой же проблемой?

...