Я пытаюсь запечь очень простой конвейер, который читает поток событий из Kafka (KafkaIO.read
) и записывает те же самые события в HDFS, объединяя каждое событие в один час (час читается из поля метки времени событие, а не время обработки).
Нельзя делать предположений относительно отметки времени событий (они могут охватывать несколько дней, даже если они находятся в режиме реального времени в 99% времени), и нет абсолютно никакой информации о порядке событий. Моя первая попытка - создать конвейер, работающий за время обработки .
Мой конвейер выглядит так:
val kafkaReader = KafkaIO.read[String, String]()
.withBootstrapServers(options.getKafkaBootstrapServers)
.withTopic(options.getKafkaInputTopic)
.withKeyDeserializer(classOf[StringDeserializer])
.withValueDeserializer(classOf[StringDeserializer])
.updateConsumerProperties(
ImmutableMap.of("receive.buffer.bytes", Integer.valueOf(16 * 1024 * 1024))
)
.commitOffsetsInFinalize()
.withoutMetadata()
val keyed = p.apply(kafkaReader)
.apply(Values.create[String]())
.apply(new WindowedByWatermark(options.getBatchSize))
.apply(ParDo.of[String, CustomEvent](new CustomEvent))
val outfolder = FileSystems.matchNewResource(options.getHdfsOutputPath, true)
keyed.apply(
"write to HDFS",
FileIO.writeDynamic[Integer, CustomEvent]()
.by(new SerializableFunction[CustomEvent, Integer] {
override def apply(input: CustomEvent): Integer = {
new Instant(event.eventTime * 1000L).toDateTime.withMinuteOfHour(0).withSecondOfMinute(0)
(eventZeroHoured.getMillis / 1000).toInt
}
})
.via(Contextful.fn(new SerializableFunction[CustomEvent, String] {
override def apply(input: CustomEvent): String = {
convertEventToStr(input)
}
}), TextIO.sink())
.withNaming(new SerializableFunction[Integer, FileNaming] {
override def apply(bucket: Integer): FileNaming = {
new BucketedFileNaming(outfolder, bucket, withTiming = true)
}
})
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getHdfsOutputPath)
.withTempDirectory("hdfs://tlap/tmp/gulptmp")
.withNumShards(1)
.withCompression(Compression.GZIP)
)
А это мой WindowedByWatermark :
class WindowedByWatermark(bucketSize: Int = 5000000) extends PTransform[PCollection[String], PCollection[String]] {
val window: Window[String] = Window
.into[String](FixedWindows.of(Duration.standardMinutes(10)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(bucketSize))
)
.withAllowedLateness(Duration.standardMinutes(30))
.discardingFiredPanes()
override def expand(input: PCollection[String]): PCollection[String] = {
input.apply("window", window)
}
}
Трубопровод работает безупречно, но страдает от невероятно высокого противодавления из-за фазы записи ( groupby , вызванный writeDynamic
). Большинство событий происходит в режиме реального времени, следовательно, они принадлежат одному и тому же часу. Я также попытался создать данные, используя часы и минуты, без особой помощи.
После нескольких дней боли я решил повторить то же самое с Flink, используя bucketingSink
, и производительность отличная.
val stream = env
.addSource(new FlinkKafkaConsumer011[String](options.kafkaInputTopic, new SimpleStringSchema(), properties))
.addSink(bucketingSink(options.hdfsOutputPath, options.batchSize))
Согласно моему анализу (даже с использованием JMX), потоки в Beam ожидают во время фазы записи в HDFS (и это приводит к тому, что конвейер приостанавливает получение данных из Kafka).
Поэтому у меня есть следующие вопросы:
- Можно ли оттолкнуть ведро, как это делает
bucketingSink
в Beam?
- Есть ли более разумный способ добиться того же в Beam?