Получая вдохновение от ответа @ Стига и от этого ответа , я сериализовал объект всякий раз, когда передавал его между болтами вместо моих объектов. Так что теперь я посылаю массив байтов, как это в моих болтах:
val messages = input.asInstanceOf[TupleImpl].get("Request").asInstanceOf[Array[Byte]].getObj[List[myObject]]
val objMapper = new ObjectMapper()
messages.foreach(message => collector.emit(new Values(objMapper.writeValueAsBytes(message))))
Редактировать 1:
Другой возможный способ исправить это выглядит (не пробовал, я решил посылать байты) - написать класс сериализатора для объекта, который вы передаете от одного болта к другому, как описано здесь . Ниже приведен пример сериализатора по этой ссылке:
public class StockAvroSerializer extends Serializer<Stock> {
private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
private Schema SCHEMA = Stock.getClassSchema();
public void write(Kryo kryo, Output output, Stock object) {
DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
writer.write(object, encoder);
encoder.flush();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
IOUtils.closeQuietly(out);
byte[] outBytes = out.toByteArray();
output.writeInt(outBytes.length, true);
output.write(outBytes);
}
public Stock read(Kryo kryo, Input input, Class<Stock> type) {
byte[] value = input.getBuffer();
SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
Stock record = null;
try {
record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
} catch (IOException e) {
LOG.error(e.toString(), e);
}
return record;
}
}
Редактировать 2:
Здесь я нашел, почему ObjectNode не может быть сериализован:
JsonNode
не знает, как сериализовать себя, используя только информацию, доступную при сериализации: нет никакого ObjectMapper или JsonGenerator для использования; последний является компонентом, который он должен сам сериализовать (и содержимое, если оно есть). Он не может и должен пытаться создать экземпляр (как они должны быть настроены?); и статические синглтоны, как правило, вызывают проблемы в больших системах (одна часть пытается их настроить, а другая - иначе)
Но это довольно старое сообщение, в новой версии, я думаю, должен быть какой-то механизм, чтобы сделать его сериализуемым.