Beam / Detaflow: невозможно вернуть кодер по умолчанию для Combine.perKey (составлено) - PullRequest
0 голосов
/ 05 февраля 2019

У меня следующая топология, я не могу создать кодер для последнего шага, который KV[K, CoCombinedResult], JLong и JDouble относится к java.lang.Long и java.lang.Double соответственно.Я не уверен, с чего начать отладку этого.При прохождении информация о типе, по-видимому, отсутствует для комбинированного результата.

class SF[T] extends SimpleFunction[T, T] {
  override def apply(t: T): T = t
}
class TestPCollectionAssertion extends JUnitSuite {
  @(Rule@getter)
  val pipeline: TestPipeline = TestPipeline.create()

  @Test
  def testCombiner(): Unit = {
    val tag2 = new TupleTag[JLong](){}
    val tag3 = new TupleTag[JDouble](){}

    val combiner: GlobalCombineFn[JLong, _, CombineFns.CoCombineResult] = CombineFns
      .compose()
      .`with`(new SF[JLong](), Count.combineFn[JLong](), tag2)
      .`with`(new SF[JLong](), Mean.of[JLong](), tag3)

    val result = pipeline
      .apply(Create.of(KV.of("Hello", 1L.asInstanceOf[JLong]), KV.of("World", 1L.asInstanceOf[JLong])))
      .apply(Combine.perKey[String, JLong, CoCombineResult](combiner))
      .apply(ParDo.of(new ExtractTags(tag2, tag3)))
  }
}

Обновление: явная установка кодера для шагов объединения, следующих за исключением Cannot infer coder for type parameter OutputT

.setCoder(KvCoder.of(
        StringUtf8Coder.of(),
        combiner.getDefaultOutputCoder(pipeline.getCoderRegistry, VarLongCoder.of())))```
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...