Как использовать Kafka для MySQL, используя Beam в Scala - PullRequest
0 голосов
/ 08 октября 2019

Я хочу использовать данные из потока Kafka и записать их в MySQL, используя Apache Beam. Отображается ошибка Cannot resolve overloaded method 'apply' на .apply(JdbcIO.write[KV[Integer, String]]()...

Вот мой полный код:

def main(cmdlineArgs: Array[String]): Unit = {
    val options = PipelineOptionsFactory.create()

    val pipeline = Pipeline.create(options)

    pipeline
        .apply(KafkaIO.read[Long, String]()
          .withBootstrapServers("kafka-stag.my-domain.com:31090")
          .withTopic("topic-saya")
          .withKeyDeserializer(classOf[LongDeserializer])
          .withValueDeserializer(classOf[StringDeserializer])
          .updateConsumerProperties(
            ImmutableMap.of("auto.offset.reset", "earliest".asInstanceOf[Object])
          )
          .withMaxNumRecords(5)
          .withoutMetadata()
        )
          .apply(MapElements.into(TypeDescriptors.strings()))
          .apply(JdbcIO.write[KV[Integer, String]]()
            .withDataSourceConfiguration(DataSourceConfiguration.create(
              "com.mysql.jdbc.Driver",
              "jdbc:mysql://localhsot:3306/mydb")
            .withUsername("root")
            .withPassword("secret"))
            .withStatement("insert into Person values(?, ?)")
            .withPreparedStatementSetter(
                (element: KV[Integer, String], query: PreparedStatement) => {
                  query.setInt(1, element.getKey())
                  query.setString(2, element.getValue())
              })
          )


    pipeline.run().waitUntilFinish()
}

Что мне сделать, чтобы решить эту проблему?

1 Ответ

2 голосов
/ 08 октября 2019

Я считаю полезным разбивать длинные цепочки методов при отладке scala-кода.

В этом случае у вас есть три apply(...), соединенные вместе:

// Heads up: you'll want to specify java.lang.Long when working
// with Java code and generics... this has bitten me *many* times.

val x: PCollection[KV[java.lang.Long, String]] = pipeline
    .apply(KafkaIO.read[java.lang.Long, String]()
        .withThisAndThatConfig(...))

// Probably not what you're looking for...
// MapElements requires via() to modify the element.

val y: PCollection[String] = x
    .apply(MapElements.into(TypeDescriptors.strings()))

// Error happens below, the JdbcIO.write is expecting to be applied
// to a PCollection[KV[Integer, String]], which is not what it's getting.

val z: PDone = y
    .apply(JdbcIO.write[KV[Integer, String]]()
            .withThisAndThatConfig(...))

Если я понимаючто вы ищете, вы, вероятно, захотите поближе взглянуть на середину apply и убедиться, что она излучает правильные элементы для JdbcIO. С этими двумя языками нужно потрудиться, но, скорее всего, вы получите что-то вроде:

val keystoInt: SerializableFunction[KV[java.lang.Long, String], KV[Integer, String]] =
    (input: KV[java.lang.Long, String]) => KV.of(input.getKey.intValue, input.getValue)
val y: PCollection[KV[Integer, String]] = x
    .apply(MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.integers, TypeDescriptors.strings))
        .via(keysToInt))
...