У меня следующая топология, я не могу создать кодер для последнего шага, который KV[K, CoCombinedResult]
, JLong
и JDouble
относится к java.lang.Long
и java.lang.Double
соответственно.Я не уверен, с чего начать отладку этого.При прохождении информация о типе, по-видимому, отсутствует для комбинированного результата.
class SF[T] extends SimpleFunction[T, T] {
override def apply(t: T): T = t
}
class TestPCollectionAssertion extends JUnitSuite {
@(Rule@getter)
val pipeline: TestPipeline = TestPipeline.create()
@Test
def testCombiner(): Unit = {
val tag2 = new TupleTag[JLong](){}
val tag3 = new TupleTag[JDouble](){}
val combiner: GlobalCombineFn[JLong, _, CombineFns.CoCombineResult] = CombineFns
.compose()
.`with`(new SF[JLong](), Count.combineFn[JLong](), tag2)
.`with`(new SF[JLong](), Mean.of[JLong](), tag3)
val result = pipeline
.apply(Create.of(KV.of("Hello", 1L.asInstanceOf[JLong]), KV.of("World", 1L.asInstanceOf[JLong])))
.apply(Combine.perKey[String, JLong, CoCombineResult](combiner))
.apply(ParDo.of(new ExtractTags(tag2, tag3)))
}
}
Обновление: явная установка кодера для шагов объединения, следующих за исключением Cannot infer coder for type parameter OutputT
.setCoder(KvCoder.of(
StringUtf8Coder.of(),
combiner.getDefaultOutputCoder(pipeline.getCoderRegistry, VarLongCoder.of())))```