Apache Beam: ошибка при назначении времени события с использованием метки времени - PullRequest
0 голосов
/ 16 января 2019

У меня есть неограниченный поток Кафки, отправляющий данные со следующими полями

{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}

Я читаю поток, используя apache beam sdk для kafka

import org.apache.beam.sdk.io.kafka.KafkaIO;
pipeline.apply(KafkaIO.<Long, String>read()
                    .withBootstrapServers("kafka:9092")
                    .withTopic("test")
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true")) 
                    .updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1"))
                    .commitOffsetsInFinalize()
                    .withoutMetadata()))

Поскольку я хочу использовать окно время события (в моем примере это "ts"), я анализирую входящую строку и назначаю поле "ts" входящего потока данных в качестве метки времени.

PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
                    .apply(ParDo.of(new ReadFromTopic()))
                    .apply("ParseTemperature", ParDo.of(new ParseTemperature()));

tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));  

Функция окна и вычисления применяются, как показано ниже:

PCollection<Output> output = tempCollection.apply(Window
                .<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.standardDays(1))
                .accumulatingFiredPanes())
                .apply(new ComputeMax());

Я передаю данные во входной поток с задержкой в ​​5 секунд от текущего времени utc, поскольку в практических сценариях метка времени события обычно раньше, чем метка времени обработки.

Я получаю следующую ошибку:

Невозможно вывести с отметкой времени 2019-01-16T11: 15: 45.560Z. Выход временные метки должны быть не ранее временной метки текущего входа (2019-01-16T11: 16: 50.640Z) минус допустимый перекос (0 миллисекунд). Посмотрите DoFn # getAllowedTimestampSkew () Javadoc для деталей об изменении разрешенный перекос.

Если я закомментирую строку для AssignTimeStamps , ошибок нет, но, я думаю, он учитывает время обработки.

Как мне убедиться, что мои вычисления и окна основаны на времени события, а не на времени обработки?

Пожалуйста, предоставьте некоторые сведения о том, как справиться с этим сценарием.

Ответы [ 2 ]

0 голосов
/ 25 января 2019

Чтобы иметь возможность использовать пользовательскую метку времени, сначала необходимо реализовать CustomTimestampPolicy, расширив TimestampPolicy<KeyT,ValueT>

Например:

public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {


protected Instant currentWatermark;

public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
    currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}


@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
    currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return currentWatermark;
}

@Override
public Instant getWatermark(PartitionContext ctx) {
    return currentWatermark;
}

}

Затем вынеобходимо передать свой пользовательский TimestampPolicy, когда вы настраиваете источник KafkaIO с использованием функционального интерфейса TimestampPolicyFactory

KafkaIO.<String, Foo>read().withBootstrapServers("http://localhost:9092")
                .withTopic("foo")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) //if you use avro
                .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
                .updateConsumerProperties(kafkaProperties))

Эта строка отвечает за создание нового timestampPolicy, прохождение связанного раздела и предыдущий контрольный пункт водяного знака, см. документация

withTimestampPolicyFactory(tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
0 голосов
/ 17 января 2019

У вас была возможность попробовать это с использованием политики отметок времени, извините, я не пробовал это самостоятельно, но я полагаю, что с 2.9.0 вы должны смотреть на использование политики вместе с чтением KafkaIO.

https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-

...