Failed to unmarshal discovery data for component: CONTINUOUS_PROC with more than one ContinuousQuery listener in Ignite
April 18, 2020

I am experimenting with Ignite (v2.8.0) and ContinuousQuery on my local machine. My setup consists of one server node and two clients.

The problem is that once the server node is running and the first client has connected and is listening for updates through a ContinuousQuery, every subsequent client fails with the following exception:

[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC

I also noticed that it keeps failing until the query cursor is closed in the first client. Once it is closed, one more client can connect.

Server node code:

public class IgniteServerCacheBootstrap {

    static final Logger logger = LoggerFactory.getLogger(IgniteServerCacheBootstrap.class);

    public static void main(String[] args) throws IgniteCheckedException, InterruptedException {

        IgniteConfiguration serverConfig = new IgniteConfiguration()
                .setGridLogger(new Log4J2Logger("log4j2.xml"));

        Ignite server = Ignition.start(serverConfig);
        Thread.currentThread().join();
    }

}

Client node code (mostly taken from the Ignite examples). I am trying to run two such nodes in parallel:

public class IgniteCacheClient implements Serializable {

    Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);

    private IgniteCache igniteCache;

    public IgniteCacheClient() throws IgniteCheckedException {
        IgniteConfiguration clientConfig = new IgniteConfiguration()
                .setGridLogger(new Log4J2Logger("log4j2.xml"))
                .setClientMode(true);

        Ignite client = Ignition.getOrStart(clientConfig);
        igniteCache = client.getOrCreateCache("MY_CACHE");
    }

    public void run() throws InterruptedException {

        // Create new continuous query.
        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
            @Override
            public boolean apply(Integer key, String val) {
                return key > 10;
            }
        }));

        // Callback that is called locally when update notifications are received.
        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
                    logger.info("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
            }
        });

        // This filter will be evaluated remotely on all nodes.
        // Entry that pass this filter will be sent to the caller.
        qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
            @Override
            public CacheEntryEventFilter<Integer, String> create() {
                return new CacheEntryEventFilter<Integer, String>() {
                    @Override
                    public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                        return e.getKey() > 10;
                    }
                };
            }
        });

        // Execute query.
        QueryCursor<Cache.Entry<Integer, String>> cur = igniteCache.query(qry);

        // Iterate through existing data.
        for (Cache.Entry<Integer, String> e : cur)
            logger.info("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');

        Thread.currentThread().join();
    }
}

Full exception:

[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC
org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: sun.misc.Launcher$AppClassLoader@18b4aac2
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:132) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:93) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.util.IgniteUtils.unmarshalZip(IgniteUtils.java:10248) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalData(DiscoveryDataPacket.java:340) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalGridData(DiscoveryDataPacket.java:155) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.onExchange(TcpDiscoverySpi.java:2069) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processNodeAddFinishedMessage(ClientImpl.java:2219) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processDiscoveryMessage(ClientImpl.java:2088) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.body(ClientImpl.java:1930) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$1.body(ClientImpl.java:302) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:61) [ignite-core-2.8.0.jar:2.8.0]
Caused by: java.io.InvalidObjectException: Failed to find cache for name: MY_CACHE
    at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2376) ~[ignite-core-2.8.0.jar:2.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
    ... 11 more
Caused by: java.lang.IllegalStateException: Failed to find cache for name: MY_CACHE
    at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2371) ~[ignite-core-2.8.0.jar:2.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
    ... 11 more

1 Answer

April 20, 2020

"Failed to find cache for name: MY_CACHE"

I think your private IgniteCache igniteCache field is being carried along with your lambdas. Please try converting all of your anonymous inner classes into named static nested classes. I mean the predicates, listeners, and factories.
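To illustrate the mechanism behind this suggestion, here is a minimal sketch that uses only JDK serialization, not Ignite itself. All names in it (CaptureDemo, Client, KeyFilter, ThresholdFilter, serializes) are hypothetical and chosen for the example. An anonymous inner class created in an instance method keeps a hidden reference to the enclosing object, so serializing the filter also tries to serialize that object and its fields. A static nested class carries only its own fields. Client is deliberately left non-Serializable here so that the captured reference shows up as a failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    /** Hypothetical stand-in for a serializable remote filter. */
    interface KeyFilter extends Serializable {
        boolean accept(int key);
    }

    /** Stand-in for the client class; NOT Serializable on purpose. */
    static class Client {
        private final int threshold;

        Client(int threshold) {
            this.threshold = threshold;
        }

        KeyFilter anonymousFilter() {
            // Anonymous inner class: it reads this.threshold, so it holds
            // a hidden reference to the enclosing Client instance.
            return new KeyFilter() {
                @Override
                public boolean accept(int key) {
                    return key > threshold;
                }
            };
        }

        KeyFilter staticFilter() {
            // Static nested class: the value is copied in, nothing is captured.
            return new ThresholdFilter(threshold);
        }
    }

    /** Named static nested class that carries only its own state. */
    static class ThresholdFilter implements KeyFilter {
        private final int threshold;

        ThresholdFilter(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean accept(int key) {
            return key > threshold;
        }
    }

    /** Returns true if the object can be written with JDK serialization. */
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Client client = new Client(10);
        System.out.println("anonymous: " + serializes(client.anonymousFilter())); // prints "anonymous: false"
        System.out.println("static:    " + serializes(client.staticFilter()));    // prints "static:    true"
    }
}
```

In the question, IgniteCacheClient is Serializable, so the capture would not fail on the sending side. If the diagnosis above is right, the enclosing instance and its igniteCache field travel with the query instead, which would match the IgniteCacheProxyImpl.readExternal frames in the stack trace on the node that tries to deserialize them.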

...