Я пытаюсь построить конвейеры для событий с Apache Beam. Я хотел прочитать потоковые данные из GCP PubSub и прочитать связанные метаданные из MySQL, используя идентификаторы в событиях, затем объединить эти два потока и записать в мою базу данных clickhouse.
Но JdbcIO.readall ( ), кажется, не возвращает свой поток. Как вы можете видеть на ClickhousePipeline
, после применения CoGroupByKey.create()
я пытаюсь объединить два PCollection
, но userMetaData
получается пустым, а ParDo
, который прикован сразу после UserMetadataEnricher()
, тоже не выполняется ,
В withRowMapper
на UserMetadataEnricher
я добавил println()
, чтобы проверить, работает ли он, и он работал правильно и распечатал результаты из моей базы данных, однако он не возвращает данные в следующий конвейер. .
Я полагаю, что проблема связана с Windowing
, я проверил, что он работает, когда я тестировал его без окон. Но PubSubIO - это Unbounded PCollection
, поэтому я должен применить окно, чтобы использовать JDBCIO.readall()
, верно? Я понятия не имею, чтобы решить эту проблему. Я надеюсь получить ответ в ближайшее время!
MainPipeline
object MainPipeline {
@JvmStatic
fun run(options: MainPipelineOptions) {
val p = Pipeline.create(options)
val events = p
.apply(
"Read DetailViewEvent PubSub",
PubsubIO.readStrings().fromSubscription(options.inputSubscription)
)
.apply(
"Extract messages",
ParseJsons.of(FoodDetailViewEvent::class.java)
.exceptionsInto(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
)
.exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
)
val validEvents =
events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
val invalidEvents = events.failures()
invalidEvents.apply(FailurePipeline(options))
validEvents.apply(ClickhousePipeline(options))
p.run().waitUntilFinish()
}
@JvmStatic
fun main(args: Array<String>) {
val options = PipelineOptionsFactory
.fromArgs(*args)
.withValidation()
.`as`(MainPipelineOptions::class.java)
run(options)
}
}
ClickhousePipeline
class ClickhousePipeline(private val options: MainPipelineOptions) :
PTransform<PCollection<DetailViewEvent>, PDone>() {
override fun expand(events: PCollection<DetailViewEvent>): PDone {
val windowedEvents = events
.apply(
"Window", Window
.into<DetailViewEvent>(GlobalWindows())
.triggering(
Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
val userIdDetailViewEvents = windowedEvents
.apply(
MapElements.via(object :
SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
return KV.of(input.userInfo.userId, input)
}
})
)
val userMetaData = userIdDetailViewEvents
.apply(
MapElements.via(object :
SimpleFunction<KV<String, DetailViewEvent>, String>() {
override fun apply(input: KV<String, DetailViewEvent>): String {
return input.key!!
}
})
)
.apply(
UserMetadataEnricher(options)
)
.apply(
ParDo.of(
object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
@ProcessElement
fun processElement(
@Element data: UserMetadata,
out: OutputReceiver<KV<String, UserMetadata>>
) {
println("User:: ${data}") // Not printed!!
out.output(KV.of(data.userId, data))
}
})
)
val sourceTag = object : TupleTag<DetailViewEvent>() {}
val userMetadataTag = object : TupleTag<UserMetadata>() {}
val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
.and(userMetadataTag, userMetaData)
.apply(CoGroupByKey.create())
val enrichedData = joinedPipeline.apply(
ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
@ProcessElement
fun processElement(
@Element data: KV<String, CoGbkResult>,
out: OutputReceiver<ClickHouseModel>
) {
val name = data.key
val source = data.value.getAll(sourceTag)
val userMetadataSource = data.value.getAll(userMetadataTag)
println("==========================")
for (metadata in userMetadataSource.iterator()) {
println("Metadata:: $metadata") // This is always empty
}
for (event in source.iterator()) {
println("Event:: $event")
}
println("==========================")
val sourceEvent = source.iterator().next()
if (userMetadataSource.iterator().hasNext()) {
val userMetadataEvent = userMetadataSource.iterator().next()
out.output(
ClickHouseModel(
eventType = sourceEvent.eventType,
userMetadata = userMetadataEvent
)
)
}
}
})
)
val clickhouseData = enrichedData.apply(
ParDo.of(object : DoFn<ClickHouseModel, Row>() {
@ProcessElement
fun processElement(context: ProcessContext) {
val data = context.element()
context.output(
data.toSchema()
)
}
})
)
return clickhouseData
.setRowSchema(ClickHouseModel.schemaType())
.apply(
ClickHouseIO.write(
"jdbc:clickhouse://127.0.0.1:8123/test?password=example",
"clickhouse_test"
)
)
}
}
UserMetadataEnricher
class UserMetadataEnricher(private val options: MainPipelineOptions) :
PTransform<PCollection<String>, PCollection<UserMetadata>>() {
@Throws(Exception::class)
override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
return events
.apply(
JdbcIO.readAll<String, UserMetadata>()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
)
.withUsername("root")
.withPassword("example")
)
.withQuery("select id,name,gender from user where id = ?")
.withParameterSetter { id: String, preparedStatement: PreparedStatement ->
preparedStatement.setString(1, id)
}
.withCoder(
SerializableCoder.of(
UserMetadata::class.java
)
)
.withRowMapper
{
println("RowMapper:: ${it.getString(1)}") // printed!!
UserMetadata(
it.getString(1),
it.getString(2),
it.getString(3)
)
}
)
}
}
output
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
Обновление 1 (GlobalWindow для FixedWindow)
с использованием AfterProcessing
Я изменил настройки моего окна и добавил печать в SimpleFunction
, который назначен для userIdDetailViewEvents.
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(1)
)
)
)
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
And It печатает:
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
Использование AfterWatermark
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(
Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
)
)
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
Вывод
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
Я думаю, что использование AfterWatermark является правильным, но он где-то висит ... Я думаю, это JdbcIO