Я считаю полезным разбивать длинные цепочки методов при отладке scala-кода.
В этом случае у вас есть три apply(...)
, соединенные вместе:
// Heads up: you'll want to specify java.lang.Long when working
// with Java code and generics... this has bitten me *many* times.
val x: PCollection[KV[java.lang.Long, String]] = pipeline
.apply(KafkaIO.read[java.lang.Long, String]()
.withThisAndThatConfig(...))
// Probably not what you're looking for...
// MapElements requires via() to modify the element.
val y: PCollection[String] = x
.apply(MapElements.into(TypeDescriptors.strings()))
// Error happens below, the JdbcIO.write is expecting to be applied
// to a PCollection[KV[Integer, String]], which is not what it's getting.
val z: PDone = y
.apply(JdbcIO.write[KV[Integer, String]]()
.withThisAndThatConfig(...))
Если я понимаючто вы ищете, вы, вероятно, захотите поближе взглянуть на середину apply
и убедиться, что она излучает правильные элементы для JdbcIO. С этими двумя языками нужно потрудиться, но, скорее всего, вы получите что-то вроде:
val keystoInt: SerializableFunction[KV[java.lang.Long, String], KV[Integer, String]] =
(input: KV[java.lang.Long, String]) => KV.of(input.getKey.intValue, input.getValue)
val y: PCollection[KV[Integer, String]] = x
.apply(MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.integers, TypeDescriptors.strings))
.via(keysToInt))