Beam JdbcIO.readAll, похоже, не возвращает результаты - PullRequest
0 голосов
/ 16 апреля 2020

Я пытаюсь построить конвейеры для событий с Apache Beam. Я хотел прочитать потоковые данные из GCP PubSub и прочитать связанные метаданные из MySQL, используя идентификаторы в событиях, затем объединить эти два потока и записать в мою базу данных clickhouse.

Но JdbcIO.readall ( ), кажется, не возвращает свой поток. Как вы можете видеть на ClickhousePipeline, после применения CoGroupByKey.create() я пытаюсь объединить два PCollection, но userMetaData получается пустым, а ParDo, который прикован сразу после UserMetadataEnricher(), тоже не выполняется ,

В withRowMapper на UserMetadataEnricher я добавил println(), чтобы проверить, работает ли он, и он работал правильно и распечатал результаты из моей базы данных, однако он не возвращает данные в следующий конвейер. .

Я полагаю, что проблема связана с Windowing, я проверил, что он работает, когда я тестировал его без окон. Но PubSubIO - это Unbounded PCollection, поэтому я должен применить окно, чтобы использовать JDBCIO.readall(), верно? Я понятия не имею, чтобы решить эту проблему. Я надеюсь получить ответ в ближайшее время!

MainPipeline

object MainPipeline {
  @JvmStatic
  fun run(options: MainPipelineOptions) {
    val p = Pipeline.create(options)

    val events = p
      .apply(
        "Read DetailViewEvent PubSub",
        PubsubIO.readStrings().fromSubscription(options.inputSubscription)
      )
      .apply(
        "Extract messages",
        ParseJsons.of(FoodDetailViewEvent::class.java)
          .exceptionsInto(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
          )
          .exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
      )

    val validEvents =
      events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
    val invalidEvents = events.failures()

    invalidEvents.apply(FailurePipeline(options))
    validEvents.apply(ClickhousePipeline(options))

    p.run().waitUntilFinish()
  }

  @JvmStatic
  fun main(args: Array<String>) {
    val options = PipelineOptionsFactory
      .fromArgs(*args)
      .withValidation()
      .`as`(MainPipelineOptions::class.java)

    run(options)
  }
}

ClickhousePipeline

class ClickhousePipeline(private val options: MainPipelineOptions) :
  PTransform<PCollection<DetailViewEvent>, PDone>() {

  override fun expand(events: PCollection<DetailViewEvent>): PDone {
    val windowedEvents = events
      .apply(
        "Window", Window
          .into<DetailViewEvent>(GlobalWindows())
          .triggering(
            Repeatedly
              .forever(
                AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(5))
              )
          )
          .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
      )

    val userIdDetailViewEvents = windowedEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
          override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
            return KV.of(input.userInfo.userId, input)
          }
        })
      )

    val userMetaData = userIdDetailViewEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<KV<String, DetailViewEvent>, String>() {
          override fun apply(input: KV<String, DetailViewEvent>): String {
            return input.key!!
          }
        })
      )
      .apply(
        UserMetadataEnricher(options)
      )
      .apply(
        ParDo.of(
          object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
            @ProcessElement
            fun processElement(
              @Element data: UserMetadata,
              out: OutputReceiver<KV<String, UserMetadata>>
            ) {
              println("User:: ${data}") // Not printed!!
              out.output(KV.of(data.userId, data))
            }
          })
      )

    val sourceTag = object : TupleTag<DetailViewEvent>() {}
    val userMetadataTag = object : TupleTag<UserMetadata>() {}

    val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
      KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
        .and(userMetadataTag, userMetaData)
        .apply(CoGroupByKey.create())

    val enrichedData = joinedPipeline.apply(
      ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
        @ProcessElement
        fun processElement(
          @Element data: KV<String, CoGbkResult>,
          out: OutputReceiver<ClickHouseModel>
        ) {

          val name = data.key
          val source = data.value.getAll(sourceTag)
          val userMetadataSource = data.value.getAll(userMetadataTag)

          println("==========================")
          for (metadata in userMetadataSource.iterator()) {
            println("Metadata:: $metadata") // This is always empty
          }

          for (event in source.iterator()) {
            println("Event:: $event")
          }
          println("==========================")

          val sourceEvent = source.iterator().next()
          if (userMetadataSource.iterator().hasNext()) {
            val userMetadataEvent = userMetadataSource.iterator().next()
            out.output(
              ClickHouseModel(
                eventType = sourceEvent.eventType,
                userMetadata = userMetadataEvent
              )
            )

          }
        }
      })
    )

    val clickhouseData = enrichedData.apply(
      ParDo.of(object : DoFn<ClickHouseModel, Row>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
          val data = context.element()
          context.output(
            data.toSchema()
          )
        }
      })
    )

    return clickhouseData
      .setRowSchema(ClickHouseModel.schemaType())
      .apply(
        ClickHouseIO.write(
          "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
          "clickhouse_test"
        )
      )
  }
}

UserMetadataEnricher

class UserMetadataEnricher(private val options: MainPipelineOptions) :
  PTransform<PCollection<String>, PCollection<UserMetadata>>() {

  @Throws(Exception::class)
  override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
    return events
      .apply(
        JdbcIO.readAll<String, UserMetadata>()
          .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
              "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
            )
              .withUsername("root")
              .withPassword("example")
          )
          .withQuery("select id,name,gender from user where id = ?")
          .withParameterSetter { id: String, preparedStatement: PreparedStatement ->
            preparedStatement.setString(1, id)
          }
          .withCoder(
            SerializableCoder.of(
              UserMetadata::class.java
            )
          )
          .withRowMapper
          {
            println("RowMapper:: ${it.getString(1)}") // printed!!
            UserMetadata(
              it.getString(1),
              it.getString(2),
              it.getString(3)
            )
          }
      )
  }
}


output

RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Обновление 1 (GlobalWindow для FixedWindow)

с использованием AfterProcessing

Я изменил настройки моего окна и добавил печать в SimpleFunction, который назначен для userIdDetailViewEvents.

Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
          .triggering(
            Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                Duration.standardSeconds(1)
              )
            )
          )
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes()
      )

And It печатает:

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Использование AfterWatermark

        Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
          .triggering(
            Repeatedly.forever(
              AfterWatermark.pastEndOfWindow()
            )
          )
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes()

Вывод

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01

Я думаю, что использование AfterWatermark является правильным, но он где-то висит ... Я думаю, это JdbcIO

1 Ответ

2 голосов
/ 16 апреля 2020

GlobalWindow никогда не закрывается, поэтому он не подходит для неограниченного источника данных, такого как pubsub.

Я бы рекомендовал использовать FixedWindow(<Some time range>) вместо этого. Вы можете узнать больше о windows здесь https://beam.apache.org/documentation/programming-guide/#windowing

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...