Получение ошибки NotSerializableException в топологии шторма - PullRequest
0 голосов
/ 04 мая 2018

Storm Версия: 1.2.1, Версия Java: 8

Я пишу топологию шторма в scala и начал получать следующую ошибку при запуске в режиме кластера. Я смог получить то же самое в режиме LocalCluster с помощью конфигурации: conf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, Boolean.box( true)). Ниже приводится след:

2018-05-05 00:49:59,342 ERROR util [Thread-37-disruptor-executor[6 6]-send-queue] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4492.invoke(disruptor.clj:84) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_131]
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more

Как кажется, шторм пытается сериализовать ObjectNode, что не в состоянии сделать и дает NotSerializableException.

Не должно ли ObjectNode быть serializable? Я вижу старое обсуждение этого здесь , но чувствую, что это должно было быть serializable.

Я попытался добавить следующее в конфиге шторма, но не помогло.

conf.registerSerialization(classOf[com.fasterxml.jackson.databind.node.ObjectNode])

Я также попытался добавить conf.setSkipMissingKryoRegistrations(false), но опять не спас.

Что может быть правильным решением для этого?

Ответы [ 2 ]

0 голосов
/ 06 мая 2018

Получая вдохновение от ответа @ Стига и от этого ответа , я сериализовал объект всякий раз, когда передавал его между болтами вместо моих объектов. Так что теперь я посылаю массив байтов, как это в моих болтах:

val messages = input.asInstanceOf[TupleImpl].get("Request").asInstanceOf[Array[Byte]].getObj[List[myObject]]
val objMapper = new ObjectMapper()
messages.foreach(message => collector.emit(new Values(objMapper.writeValueAsBytes(message))))

Редактировать 1:

Другой возможный способ исправить это выглядит (не пробовал, я решил посылать байты) - написать класс сериализатора для объекта, который вы передаете от одного болта к другому, как описано здесь . Ниже приведен пример сериализатора по этой ссылке:

public class StockAvroSerializer extends Serializer<Stock> {

    private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
    private Schema SCHEMA = Stock.getClassSchema();

    public void write(Kryo kryo, Output output, Stock object) {
        DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            writer.write(object, encoder);
            encoder.flush();
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        IOUtils.closeQuietly(out);
        byte[] outBytes = out.toByteArray();
        output.writeInt(outBytes.length, true);
        output.write(outBytes);
    }

    public Stock read(Kryo kryo, Input input, Class<Stock> type) {
        byte[] value = input.getBuffer();
        SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
        Stock record = null;
        try {
            record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        return record;
    }
}

Редактировать 2:

Здесь я нашел, почему ObjectNode не может быть сериализован:

JsonNode не знает, как сериализовать себя, используя только информацию, доступную при сериализации: нет никакого ObjectMapper или JsonGenerator для использования; последний является компонентом, который он должен сам сериализовать (и содержимое, если оно есть). Он не может и должен пытаться создать экземпляр (как они должны быть настроены?); и статические синглтоны, как правило, вызывают проблемы в больших системах (одна часть пытается их настроить, а другая - иначе)

Но это довольно старое сообщение, в новой версии, я думаю, должен быть какой-то механизм, чтобы сделать его сериализуемым.

0 голосов
/ 05 мая 2018

ObjectNode не сериализуем (не реализует интерфейс Serializable).

conf.setSkipMissingKryoRegistrations(false) - настройка по умолчанию. См. https://storm.apache.org/releases/2.0.0-SNAPSHOT/Serialization.html,, в котором описано, что делает это свойство. Я не думаю, что вы хотите изменить это в вашем случае.

Добавление conf.registerSerialization(ObjectNode.class); в конфигурацию топологии должно работать, не знаю, почему это не для вас. Если вы не можете заставить его работать, вы можете обойти это, сериализовав, например, Map или String до выдачи значения.

...