Пользовательский Transfomer Kafka Connect to Elastic Search не может найти схему - PullRequest
0 голосов
/ 12 июня 2019

Я пытаюсь написать преобразователь для нашего экземпляра Kafka Connect, который берет некоторые значения из ключа (строки) и добавляет его к значению (только для объекта json, без схемы).Сам приемник работает нормально, но когда я пытаюсь добавить преобразование, происходит сбой со следующей ошибкой:

[2019-06-12 14:10:20,399] ERROR WorkerSinkTask{id=elasticsearch-sink-dpi-vehicle-topic-journey-dpi-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
org.apache.kafka.connect.errors.DataException: Java class class com.google.gson.JsonObject does not have corresponding schema type.
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
    at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:283)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:268)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Кажется, что ожидается ожидание схемы в записи, даже если она настроена на игнорирование?Не применяется ли свойство schema.ignore к пользовательским преобразователям?

Преобразователь выглядит следующим образом:

package no.ruter.nextgen.connect.transform;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class DpiMqtt<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        //Key looks something like this "unibuss/ruter/101025/itxpt/ota/dpi/journey/json"
        String key = (String) record.key();
        String[] keys = key.split("/");
        String operator = keys[0];
        String vehicleId = keys[2];
        Gson gson = new Gson();
        JsonObject recordValue = gson.toJsonTree(record.value()).getAsJsonObject();

        recordValue.addProperty("operator", operator);
        recordValue.addProperty("vehicleId", vehicleId);

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            null,
            recordValue,
            record.timestamp()
        );
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Конфигурация выглядит следующим образом:

{
  "name": "elasticsearch-sink-dpi-vehicle-topic-journey-dpi",
  "config":
  {
    "name": "elasticsearch-sink-dpi-vehicle-topic-journey-dpi",
    "topics": "data.vehicle-topic.journey-dpi",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "{{{k8s_elasticsearch}}}",
    "connection.username": "{{{k8s_elasticsearch_user}}}",
    "connection.password": "{{{k8s_elasticsearch_pass}}}",
    "tasks.max": "3",
    "key.ignore": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "type.name": "_doc",
    "transforms": "topic, DpiMqtt",
    "transforms.topic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.topic.timestamp.format": "yyyy.MM.dd",
    "transforms.topic.topic.format": "dpi-journey-${timestamp}",
    "transforms.DpiMqtt.type": "no.ruter.nextgen.connect.transform.DpiMqtt",
    "read.timeout.ms": "10000",
    "flush.timeout.ms": "60000",
    "connection.timeout.ms": "10000",
    "behavior.on.malformed.documents": "warn",
    "max.retries": "20"
  }
}

Похожепроблема в строке «return record.newRecord» в преобразователе.Однако в идеале я просто хочу сказать Кафке: «Это объект Json, не пытайтесь сопоставить его со схемой» Как вы можете использовать JsonConverter.

...