Apache Beam / Dataflow: KVCoder портит Inputstream для декодирования - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть пользовательские объекты: CustomKey, CustomValue, которые я предоставил кодеру через Avro: CustomKeyCoder, CustomValueCoder.

Поскольку мне нужно сгруппировать по KV [CustomKey, CustomValue], я зарегистрировал KVCoder.of(new CustomKeyCoder, new CustomValueCoder).Пользовательские кодеры переносят входящий / исходящий поток в поток входных / выходных данных и используют Avro Datum Writer / Reader.

У меня проблема в декодировании KVCoder, когда мы пытаемся декодировать часть значения KVЯ получаю Forbidden IOException when reading from InputStream.Как уже отмечалось, ключевая часть декодирования работает правильно, ошибка выдается, когда входной поток передается в значение декодирования.KVCoder повторно использует один и тот же поток ввода для ключа и значения. Я предполагаю, что декодирование ключа считывает весь поток.Почему это происходит?Является ли использование Avro проблемой?

Вот код для демонстрации выше:

  //Coder
  override def decode(inputStream: InputStream): CustomValue = {
    val dataInputStream = new DataInputStream(inputStream)
    val id = dataInputStream.readShort
    underlying.decode(dataInputStream)
  }

 //Underlying
  override def decode(inputStream: InputStream): CustomValue = {
    val decoder = DecoderFactory.get().binaryDecoder(inputStream, null)
    val record = datumReader.read(null, decoder)
    CustomValue.decode(record)
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...