Получение и исключение NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl - PullRequest
1 голос
/ 05 апреля 2019

Я новичок в hazelcast-jet, и мой вариант использования заключается в чтении из источника и фильтра Kafka после проверки его значения в hazelcastIMDG.

Я получаю и загружаю карту IMDG даже до создания конвейера.См. Ниже

 IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
            Utility.populatePoliciesMap(policyMap);

передача policyMap в качестве параметра в методе buildPipeline.

я создал конвейер, как показано ниже

StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
                .addTimestamps()
                .flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
                .filter(hash -> policyMap.get(hash)!=null)
                .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
                .groupingKey(wholeItem())
                .aggregate(counting())
               .map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
        timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));

, но с этим я получаю исключение ниже

Исключение в потоке "main" java.lang.IllegalArgumentException: "filterFn" должен быть сериализуемым в com.hazelcast.jet.impl.util.Util.checkSerializable (Util.java:301) в com.hazelcast.jet.impl..buildPipeline (HazelcastJetIngetstResultHandler.java:120) при com.visa.rls.handler.HazelcastJetIngetstResultHandler.run (HazelcastJetIngetstResultHandler.java:84) при com.visa.rls.handler.HazelcastJetIngetstResultHandler.main (HazelcastJetIngetstResultHandler.java:58) Вызванный:java.io.NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) в java.io.ObjectOutputStream.writeArray (ObjectOutputStream.java:1378) в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1174) в точке java.put.Oject.Oject: 1548) в java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) в java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) в java.io.ObjectOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutput11ream.javaOject111.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) в com.hazelcast.jet.impl.util.Util.checkSerializable (Util.java:299) ... еще 5

1 Ответ

0 голосов
/ 05 апреля 2019

Вы используете policyMap внутри функции фильтра, но IMap не сериализуем. Это захватывается лямбда-выражением. Вы должны получить экземпляр IMap на каждом удаленном члене, для которого вы можете использовать filterUsingContext вместо filter:

.filterUsingContext(
    ContextFactory.withCreateFn(jetInstance -> jetInstance.getMap(POLICY_MAP_NAME)),
    (policyMap, hash) -> policyMap.get(hash) != null
)
...