Kafka Connect останавливается после 300 тыс. Записей - PullRequest
0 голосов
/ 06 июня 2018

Я пытаюсь утопить мой стол mysql вasticsearch.В моей таблице более миллиона записей.Проблема в том, что мойasticsearch больше не получает записи после того, как вставлено 300 тысяч записей.Я знаю, что когда я запускал его впервые, он запускал все записи.Это когда я попытался сделать это снова после удаления индекса ES, это произошло.Я попытался сбросить поле update_ts на новую метку времени.Я пробовал значение смещения в раковине.Кажется, ничего не работает.

Вот мой исходный файл

{
        "name": "items3",
        "config": {
                "_comment": "The JDBC connector class. Don't change this if you want to use the JDBC Source.",
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

                "_comment": "How to serialise the value of keys - here use the Confluent Avro serialiser. Note that the JDBC Source Connector always returns null for the key ",
                "key.converter": "io.confluent.connect.avro.AvroConverter",

                "_comment": "Since we're using Avro serialisation, we need to specify the Confluent schema registry at which the created schema is to be stored. NB Schema Registry and Avro serialiser are both part of Confluent Open Source.",
                "key.converter.schema.registry.url": "http://localhost:8081",

                "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://localhost:8081",


                "_comment": " --- JDBC-specific configuration below here  --- ",
                "_comment": "JDBC connection URL. This will vary by RDBMS. Consult your manufacturer's handbook for more information",
                "connection.url": "jdbc:mysql://localhost:3306/db?user=user&password=password",

                "_comment": "Which table(s) to include",
                "table.whitelist": "items",

                "_comment": "Pull all rows based on an timestamp column. You can also do bulk or incrementing column-based extracts. For more information, see http://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode",
                "mode": "timestamp+incrementing",
  "incrementing.column.name": "id",
  "timestamp.column.name": "update_ts",


                "_comment": "If the column is not defined as NOT NULL, tell the connector to ignore this  ",
                "validate.non.null": "true",

                "_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
                "topic.prefix": "kafka-",
                "auto.offset.reset" : "earliest"
        }
}

А вот мой приемник

{
  "name": "items-sink",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",


    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",
     "_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
    "type.name": "items",

    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "kafka-items",
        "auto.offset.reset" : "earliest",
    "_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source)  you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.",
    "key.ignore": "true"
  }
}

, как вы видите, я пытаюсь выполнить auto.offset.reset как можно раньше, поэтомуесли он как-то отслеживает мои записи, он начнется заново, но все напрасно.

1 Ответ

0 голосов
/ 09 декабря 2018

"auto.offset.reset" : "earliest" можно использовать только внутри файла connect-distributed.properties, но не в конфигурациях коннектора JSON

И в этом файле, поскольку это пользовательская конфигурация, он называется consumer.auto.offset.reset.

* 1007.* Кроме того, группа потребителей сопоставляется с полем name конфигурации соединителя, поэтому, если это не будет изменено, вы продолжите потреблять с того места, где предыдущий с тем же именем был прерван, до сброса смещений группы или доимя измененоПо умолчанию имя группы connect-${connector_name}
...