Я новичок в hazelcast-jet, и мой вариант использования заключается в чтении из источника и фильтра Kafka после проверки его значения в hazelcastIMDG.
Я получаю и загружаю карту IMDG даже до создания конвейера.См. Ниже
IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
Utility.populatePoliciesMap(policyMap);
передача policyMap в качестве параметра в методе buildPipeline.
я создал конвейер, как показано ниже
StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
.addTimestamps()
.flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
.filter(hash -> policyMap.get(hash)!=null)
.window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
.groupingKey(wholeItem())
.aggregate(counting())
.map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));
, но с этим я получаю исключение ниже
Исключение в потоке "main" java.lang.IllegalArgumentException: "filterFn" должен быть сериализуемым в com.hazelcast.jet.impl.util.Util.checkSerializable (Util.java:301) в com.hazelcast.jet.impl..buildPipeline (HazelcastJetIngetstResultHandler.java:120) при com.visa.rls.handler.HazelcastJetIngetstResultHandler.run (HazelcastJetIngetstResultHandler.java:84) при com.visa.rls.handler.HazelcastJetIngetstResultHandler.main (HazelcastJetIngetstResultHandler.java:58) Вызванный:java.io.NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) в java.io.ObjectOutputStream.writeArray (ObjectOutputStream.java:1378) в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1174) в точке java.put.Oject.Oject: 1548) в java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) в java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) в java.io.ObjectOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutputStream.wavaOutput11ream.javaOject111.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) в com.hazelcast.jet.impl.util.Util.checkSerializable (Util.java:299) ... еще 5