У меня есть пользовательские объекты: CustomKey
, CustomValue
, которые я предоставил кодеру через Avro: CustomKeyCoder
, CustomValueCoder
.
Поскольку мне нужно сгруппировать по KV [CustomKey, CustomValue], я зарегистрировал KVCoder.of(new CustomKeyCoder, new CustomValueCoder)
.Пользовательские кодеры переносят входящий / исходящий поток в поток входных / выходных данных и используют Avro Datum Writer / Reader.
У меня проблема в декодировании KVCoder, когда мы пытаемся декодировать часть значения KV
Я получаю Forbidden IOException when reading from InputStream
.Как уже отмечалось, ключевая часть декодирования работает правильно, ошибка выдается, когда входной поток передается в значение декодирования.KVCoder повторно использует один и тот же поток ввода для ключа и значения. Я предполагаю, что декодирование ключа считывает весь поток.Почему это происходит?Является ли использование Avro проблемой?
Вот код для демонстрации выше:
//Coder
override def decode(inputStream: InputStream): CustomValue = {
val dataInputStream = new DataInputStream(inputStream)
val id = dataInputStream.readShort
underlying.decode(dataInputStream)
}
//Underlying
override def decode(inputStream: InputStream): CustomValue = {
val decoder = DecoderFactory.get().binaryDecoder(inputStream, null)
val record = datumReader.read(null, decoder)
CustomValue.decode(record)
}