Я пытаюсь написать преобразователь для нашего экземпляра Kafka Connect, который берет некоторые значения из ключа (строки) и добавляет его к значению (только для объекта json, без схемы).Сам приемник работает нормально, но когда я пытаюсь добавить преобразование, происходит сбой со следующей ошибкой:
[2019-06-12 14:10:20,399] ERROR WorkerSinkTask{id=elasticsearch-sink-dpi-vehicle-topic-journey-dpi-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
org.apache.kafka.connect.errors.DataException: Java class class com.google.gson.JsonObject does not have corresponding schema type.
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:283)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:268)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Кажется, что ожидается ожидание схемы в записи, даже если она настроена на игнорирование?Не применяется ли свойство schema.ignore к пользовательским преобразователям?
Преобразователь выглядит следующим образом:
package no.ruter.nextgen.connect.transform;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
public class DpiMqtt<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
//Key looks something like this "unibuss/ruter/101025/itxpt/ota/dpi/journey/json"
String key = (String) record.key();
String[] keys = key.split("/");
String operator = keys[0];
String vehicleId = keys[2];
Gson gson = new Gson();
JsonObject recordValue = gson.toJsonTree(record.value()).getAsJsonObject();
recordValue.addProperty("operator", operator);
recordValue.addProperty("vehicleId", vehicleId);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null,
recordValue,
record.timestamp()
);
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
Конфигурация выглядит следующим образом:
{
"name": "elasticsearch-sink-dpi-vehicle-topic-journey-dpi",
"config":
{
"name": "elasticsearch-sink-dpi-vehicle-topic-journey-dpi",
"topics": "data.vehicle-topic.journey-dpi",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "{{{k8s_elasticsearch}}}",
"connection.username": "{{{k8s_elasticsearch_user}}}",
"connection.password": "{{{k8s_elasticsearch_pass}}}",
"tasks.max": "3",
"key.ignore": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"schema.ignore": "true",
"type.name": "_doc",
"transforms": "topic, DpiMqtt",
"transforms.topic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.topic.timestamp.format": "yyyy.MM.dd",
"transforms.topic.topic.format": "dpi-journey-${timestamp}",
"transforms.DpiMqtt.type": "no.ruter.nextgen.connect.transform.DpiMqtt",
"read.timeout.ms": "10000",
"flush.timeout.ms": "60000",
"connection.timeout.ms": "10000",
"behavior.on.malformed.documents": "warn",
"max.retries": "20"
}
}
Похожепроблема в строке «return record.newRecord» в преобразователе.Однако в идеале я просто хочу сказать Кафке: «Это объект Json, не пытайтесь сопоставить его со схемой» Как вы можете использовать JsonConverter.