У меня есть неограниченный поток Кафки, отправляющий данные со следующими полями
{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}
Я читаю поток, используя apache beam sdk для kafka
import org.apache.beam.sdk.io.kafka.KafkaIO;
pipeline.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("kafka:9092")
.withTopic("test")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
.updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1"))
.commitOffsetsInFinalize()
.withoutMetadata()))
Поскольку я хочу использовать окно время события (в моем примере это "ts"), я анализирую входящую строку и назначаю поле "ts" входящего потока данных в качестве метки времени.
PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
.apply(ParDo.of(new ReadFromTopic()))
.apply("ParseTemperature", ParDo.of(new ParseTemperature()));
tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));
Функция окна и вычисления применяются, как показано ниже:
PCollection<Output> output = tempCollection.apply(Window
.<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardDays(1))
.accumulatingFiredPanes())
.apply(new ComputeMax());
Я передаю данные во входной поток с задержкой в 5 секунд от текущего времени utc, поскольку в практических сценариях метка времени события обычно раньше, чем метка времени обработки.
Я получаю следующую ошибку:
Невозможно вывести с отметкой времени 2019-01-16T11: 15: 45.560Z. Выход
временные метки должны быть не ранее временной метки текущего входа
(2019-01-16T11: 16: 50.640Z) минус допустимый перекос (0 миллисекунд).
Посмотрите DoFn # getAllowedTimestampSkew () Javadoc для деталей об изменении
разрешенный перекос.
Если я закомментирую строку для AssignTimeStamps , ошибок нет, но, я думаю, он учитывает время обработки.
Как мне убедиться, что мои вычисления и окна основаны на времени события, а не на времени обработки?
Пожалуйста, предоставьте некоторые сведения о том, как справиться с этим сценарием.