Собственный клиент - исключение сериализации при выполнении непрерывного запроса - PullRequest
0 голосов
/ 31 мая 2018

Я пытаюсь настроить простое доказательство концепции Java <-> # C / .NET с использованием Apache Geode, в частности, проверяя функциональность непрерывного запроса с помощью собственного клиента .NET.Использование обычного запроса отлично работает в .NET, проблема возникает только у непрерывного запроса.Я сталкиваюсь с проблемой, когда вызываю метод Execute () для объекта непрерывного запроса.Я получаю конкретную ошибку:

При обработке ответа получен необработанный тип сообщения 26, возможное несоответствие сериализации

Я храню только простые строки в области кэша, поэтому я 'Я немного удивлен, что у меня проблемы с сериализацией.Я попытался включить сериализацию PDX с обеих сторон (и работать без нее), похоже, это не имеет значения.Любые идеи?

Вот мой код для обеих сторон:

Java

Запускает сервер, помещает некоторые данные, а затем продолжает обновлять данныезапись в кэше.

public class GeodePoc {

    public static void main(String[] args) throws Exception {

        ServerLauncher serverLauncher = new ServerLauncher.Builder().setMemberName("server1")
                .setServerBindAddress("localhost").setServerPort(10334).set("start-locator", "localhost[20341]")
                .set(ConfigurationProperties.LOG_LEVEL, "trace")
                .setPdxReadSerialized(true)
                .set(ConfigurationProperties.CACHE_XML_FILE, "cache.xml").build();

        serverLauncher.start();
        Cache c = CacheFactory.getAnyInstance();
        Region<String, String> r = c.getRegion("example_region");
        r.put("test1", "value1");
        r.put("test2", "value2");

        System.out.println("Cache server successfully started");

        int i = 0;
        while (true) {
            r.put("test1", "value" + i);
            System.out.println(r.get("test1"));
            Thread.sleep(3000);
            i++;
        }

    }

}

Server cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
    version="1.0">

    <cache-server bind-address="localhost" port="40404"
        max-connections="100" />
    <pdx>
        <pdx-serializer>
            <class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
            <parameter name="classes">
                <string>java.lang.String</string>
            </parameter>
        </pdx-serializer>
    </pdx>

    <region name="example_region">
        <region-attributes refid="REPLICATE"  />
    </region>


</cache>

.NET Client

public static void GeodeTest()
        {
            Properties<string, string> props = Properties<string, string>.Create();
            props.Insert("cache-xml-file", "<path-to-cache.xml>");

            CacheFactory cacheFactory = new CacheFactory(props)
                .SetPdxReadSerialized(true).SetPdxIgnoreUnreadFields(true)
                .Set("log-level", "info");
            Cache cache = cacheFactory.Create();
            cache.TypeRegistry.PdxSerializer = new ReflectionBasedAutoSerializer();

            IRegion<string, string> region = cache.GetRegion<string, string>("example_region");

            Console.WriteLine(region.Get("test2", null));
            PoolManager pManager = cache.GetPoolManager();
            Pool pool = pManager.Find("serverPool");

            QueryService qs = pool.GetQueryService();

            // Regular query example (works)
            Query<string> q = qs.NewQuery<string>("select * from /example_region");
            ISelectResults<string> results = q.Execute();
            Console.WriteLine("Finished query");
            foreach (string result in results)
            {
                Console.WriteLine(result);
            }

            // Continuous Query (does not work)
            CqAttributesFactory<string, object> cqAttribsFactory = new CqAttributesFactory<string, object>();
            ICqListener<string, object> listener = new CacheListener<string, object>();
            cqAttribsFactory.InitCqListeners(new ICqListener<string, object>[] { listener });
            cqAttribsFactory.AddCqListener(listener);
            CqAttributes<string, object> cqAttribs = cqAttribsFactory.Create();

            CqQuery<string, object> cquery = qs.NewCq<string, object>("select * from /example_region", cqAttribs, false);
            Console.WriteLine(cquery.GetState());
            Console.WriteLine(cquery.QueryString);

            Console.WriteLine(">>> Cache query example started.");
            cquery.Execute();

            Console.WriteLine();
            Console.WriteLine(">>> Example finished, press any key to exit ...");
            Console.ReadKey();
        }

.NET Cache Listener

public class CacheListener<TKey, TResult> : ICqListener<TKey, TResult>
    {
        public virtual void OnEvent(CqEvent<TKey, TResult> ev)
        {
            object val = ev.getNewValue() as object;
            TKey key = ev.getKey();
            CqOperation opType = ev.getQueryOperation();
            string opStr = "DESTROY";
            if (opType == CqOperation.OP_TYPE_CREATE)
                opStr = "CREATE";
            else if (opType == CqOperation.OP_TYPE_UPDATE)
                opStr = "UPDATE";
            Console.WriteLine("MyCqListener::OnEvent called with key {0}, op {1}.", key, opStr);
        }
        public virtual void OnError(CqEvent<TKey, TResult> ev)
        {
            Console.WriteLine("MyCqListener::OnError called");
        }
        public virtual void Close()
        {
            Console.WriteLine("MyCqListener::close called");
        }
    }

.NET Client cache.xml

<client-cache
    xmlns="http://geode.apache.org/schema/cache"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
    version="1.0">

  <pool name="serverPool" subscription-enabled="true">
    <locator host="localhost" port="20341"/>
  </pool>

  <region name="example_region">
    <region-attributes refid="CACHING_PROXY" pool-name="serverPool" />
  </region>
</client-cache>

1 Ответ

0 голосов
/ 04 июня 2018

Это оказалось просто упущением с моей стороны.Чтобы непрерывный запрос работал, вы должны включить зависимость geode-cq на стороне Java.Я этого не делал, и это вызвало исключение.

...