Используйте ByteArrayFormat с TimeBasedPartitioner, который извлекается с использованием RecordField - PullRequest
1 голос
/ 14 мая 2019

Я пытаюсь использовать TimeBasedPartitioner, который извлекает, используя RecordField со следующей конфигурацией:

{
    "name": "s3-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "topics1.topics2",
    "s3.region": "us-east-1",
    "s3.bucket.name": "bucket",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": "900000",
    "flush.size": "1000000",
    "schema.compatibility": "NONE",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "partition.duration.ms": "900000",
    "locale": "en",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter.schemas.enabled": false,
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter.schemas.enabled": false,
    "interal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enabled": false,
    "interal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enabled": false,
}

Я продолжаю получать следующую ошибку, и я не нахожу много, что объясняет, что происходитна.Я посмотрел на исходный код, и оказалось, что запись не относится к типу Struct или Map, поэтому мне интересно, есть ли проблема с использованием ByteArrayFormat?

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
    at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:294)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:199)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)

Я смогзапишите, используя стандартный разделитель.

...