Раковина Kafka Connect S3 - как использовать временную метку из самого сообщения [экстрактор временной метки] - PullRequest
0 голосов
/ 28 марта 2019

Я боролся с проблемой использования kafka connect и приемника S3.

Первая структура:

{
   Partition: number
   Offset: number
   Key: string
   Message: json string
   Timestamp: timestamp
}

Обычно при публикации в Kafka метка времени должна быть установлена ​​производителем. К сожалению, бывают случаи, когда этого не произошло. Это означает, что временная метка иногда может быть null

Чтобы извлечь эту метку времени, для соединителя было установлено следующее значение: "timestamp.extractor":"Record".

Теперь всегда очевидно, что само поле Message также всегда содержит отметку времени.

Message:

{
   timestamp: "2019-04-02T06:27:02.667Z"
   metadata: {
     creationTimestamp: "1554186422667"
   }
}

Однако вопрос в том, что сейчас я хотел бы использовать это поле для timestamp.extractor

Я думал, что этого будет достаточно, но, похоже, это не работает:

"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",

Это также приводит к NullPointer.

Любые идеи относительно того, как использовать временную метку из самой полезной нагрузки сообщения kafka вместо поля временной метки по умолчанию, которое установлено для kafka v0.10 +

EDIT: Полная конфигурация:

{ "name": "<name>",
  "config": {
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"4",
    "topics":"<topic>",
    "flush.size":"100",
    "s3.bucket.name":"<bucket name>",
    "s3.region": "<region>",
    "s3.part.size":"<partition size>",
    "rotate.schedule.interval.ms":"86400000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "locale":"ENGLISH",
    "timezone":"UTC",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor":"RecordField",
    "timestamp.field":"message.timestamp",
    "max.poll.interval.ms": "600000",
    "request.timeout.ms": "610000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "20000",
    "s3.acl.canned":"bucket-owner-full-control"
  }
}

РЕДАКТИРОВАТЬ 2: Структура полезного сообщения Kafka:

{
  "reference": "",
  "clientId": "",
  "gid": "",
  "timestamp": "2019-03-19T15:27:55.526Z",
}

РЕДАКТИРОВАТЬ 3:

{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}

Итак, я попытался выполнить преобразование объекта, но, похоже, я снова застрял в этом. Шаблон кажется неверным. Оглядываясь в Интернете, кажется, что это действительный SimpleDatePattern. Кажется, жалуются на 'T'. Также обновлена ​​схема сообщения.

Ответы [ 2 ]

0 голосов
/ 29 марта 2019

Если данные являются строкой, то Connect попытается проанализировать как миллисекунды - исходный код здесь .

В любом случае message.timestamp предполагает, что данные выглядят как { "message" : { "timestamp": ... } }, поэтому просто timestamp будет правильным. А наличие вложенных полей в любом случае было невозможно, поэтому вы можете уточнить, какая у вас версия Connect.

Я не совсем уверен, как вы получите instanceof Date для оценки истинности при использовании JSON Converter, и даже если вы установили schema.enable = true, то и в коде вы можете видеть, что есть только условия для схемы типы чисел и строк, но все равно предполагается, что это миллисекунды.

Вы можете попробовать использовать преобразование TimestampConverter для преобразования строки даты.

0 голосов
/ 28 марта 2019

На основании схемы, которой вы поделились, вы должны установить:

    "timestamp.extractor":"RecordField",
    "timestamp.field":"timestamp",

т.е. нет message префикса к имени поля метки времени.

...