Могу ли я установить / отменить кодировщик по умолчанию в Scio? - PullRequest
0 голосов
/ 10 июня 2019

Я бы хотел последовательно применять пользовательский RicherIndicatorCoder для моего класса дел RicherIndicator. Более того, если я не смогу предоставить новый кодер для Tuples или KVs, содержащий RicherIndicator, тогда я хотел бы получить ошибку времени компиляции или выполнения, а не использовать субоптимальный кодер.

Однако Scio, похоже, не соблюдает аннотацию @DefaultCoder:

@DefaultCoder(classOf[RicherIndicatorCoder]) // Ignored
case class RicherIndicator (
  policy: Policy,
  indicator: Indicator
)

Кроме того, Scio не отдает приоритет пользовательским кодерам, зарегистрированным в CoderRegistry, вместо этого используется собственный кодер по умолчанию:

val registry = sc.pipeline.getCoderRegistry
registry.registerCoderForClass(classOf[RicherIndicator], RicherIndicatorCoder.of) // Not used

Поэтому я должен использовать setCoder(RicherIndicatorCoder.of) везде, где появляется SCollection этого типа, и тщательно прочесывать конвейер на случай, если есть составные типы, которые включают RicherIndicator.

Есть ли способ установить мой пользовательский кодер по умолчанию или отключить возврат к стандартному кодеру на основе Magnolia или Kryo?

1 Ответ

0 голосов
/ 13 июня 2019

Java-аннотация не работает в Scala.Вы можете обернуть свой Beam Coder как неявный Scio Coder следующим образом:

implicit val richIndicateCoder = Coder.beam(RicherIndicatorCoder)

Он должен быть выбран, пока неявный находится в области действия.

...