Я боролся с проблемой использования kafka connect и приемника S3.
Первая структура:
{
Partition: number
Offset: number
Key: string
Message: json string
Timestamp: timestamp
}
Обычно при публикации в Kafka метка времени должна быть установлена производителем. К сожалению, бывают случаи, когда этого не произошло. Это означает, что временная метка иногда может быть null
Чтобы извлечь эту метку времени, для соединителя было установлено следующее значение:
"timestamp.extractor":"Record"
.
Теперь всегда очевидно, что само поле Message
также всегда содержит отметку времени.
Message
:
{
timestamp: "2019-04-02T06:27:02.667Z"
metadata: {
creationTimestamp: "1554186422667"
}
}
Однако вопрос в том, что сейчас я хотел бы использовать это поле для timestamp.extractor
Я думал, что этого будет достаточно, но, похоже, это не работает:
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
Это также приводит к NullPointer.
Любые идеи относительно того, как использовать временную метку из самой полезной нагрузки сообщения kafka вместо поля временной метки по умолчанию, которое установлено для kafka v0.10 +
EDIT:
Полная конфигурация:
{ "name": "<name>",
"config": {
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"4",
"topics":"<topic>",
"flush.size":"100",
"s3.bucket.name":"<bucket name>",
"s3.region": "<region>",
"s3.part.size":"<partition size>",
"rotate.schedule.interval.ms":"86400000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"locale":"ENGLISH",
"timezone":"UTC",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
"max.poll.interval.ms": "600000",
"request.timeout.ms": "610000",
"heartbeat.interval.ms": "6000",
"session.timeout.ms": "20000",
"s3.acl.canned":"bucket-owner-full-control"
}
}
РЕДАКТИРОВАТЬ 2:
Структура полезного сообщения Kafka:
{
"reference": "",
"clientId": "",
"gid": "",
"timestamp": "2019-03-19T15:27:55.526Z",
}
РЕДАКТИРОВАТЬ 3:
{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}
Итак, я попытался выполнить преобразование объекта, но, похоже, я снова застрял в этом. Шаблон кажется неверным. Оглядываясь в Интернете, кажется, что это действительный SimpleDatePattern. Кажется, жалуются на 'T'
. Также обновлена схема сообщения.