как назначить и проверить время события сообщения в Apache Beam - PullRequest
0 голосов
/ 09 октября 2018

Мой источник - KafkaIO.read(), и теперь я хочу использовать ParDo, чтобы декодировать сообщения, полученные от kafka, и использовать одно поле сообщения в качестве времени события этого сообщения.Как мне это сделать?Я не нашел ни одного примера, как это сделать.

1 Ответ

0 голосов
/ 25 января 2019

Сначала Вам нужно реализовать CustomTimestampPolicy, расширив TimestampPolicy<KeyT,ValueT>

Например:

public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {


protected Instant currentWatermark;

public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
    currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}


@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
    currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return currentWatermark;
}

@Override
public Instant getWatermark(PartitionContext ctx) {
    return currentWatermark;
}

}

Затем вам нужно передать свой пользовательский TimestampPolicy, когдаВы настраиваете свой источник KafkaIO, используя функциональный интерфейс TimestampPolicyFactory

KafkaIO.<String, Foo>read().withBootstrapServers("http://localhost:9092")
                .withTopic("foo")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) //if you use avro
                .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
                .updateConsumerProperties(kafkaProperties))

Эта строка отвечает за создание новой метки времениPolicy, прохождения связанного раздела и предыдущего контрольного пункта с водяным знаком, см. документацию

withTimestampPolicyFactory(tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
...