Я пытаюсь настроить простое доказательство концепции Java <-> # C / .NET с использованием Apache Geode, в частности, проверяя функциональность непрерывного запроса с помощью собственного клиента .NET.Использование обычного запроса отлично работает в .NET, проблема возникает только у непрерывного запроса.Я сталкиваюсь с проблемой, когда вызываю метод Execute () для объекта непрерывного запроса.Я получаю конкретную ошибку:
При обработке ответа получен необработанный тип сообщения 26, возможное несоответствие сериализации
Я храню только простые строки в области кэша, поэтому я 'Я немного удивлен, что у меня проблемы с сериализацией.Я попытался включить сериализацию PDX с обеих сторон (и работать без нее), похоже, это не имеет значения.Любые идеи?
Вот мой код для обеих сторон:
Java
Запускает сервер, помещает некоторые данные, а затем продолжает обновлять данныезапись в кэше.
public class GeodePoc {
public static void main(String[] args) throws Exception {
ServerLauncher serverLauncher = new ServerLauncher.Builder().setMemberName("server1")
.setServerBindAddress("localhost").setServerPort(10334).set("start-locator", "localhost[20341]")
.set(ConfigurationProperties.LOG_LEVEL, "trace")
.setPdxReadSerialized(true)
.set(ConfigurationProperties.CACHE_XML_FILE, "cache.xml").build();
serverLauncher.start();
Cache c = CacheFactory.getAnyInstance();
Region<String, String> r = c.getRegion("example_region");
r.put("test1", "value1");
r.put("test2", "value2");
System.out.println("Cache server successfully started");
int i = 0;
while (true) {
r.put("test1", "value" + i);
System.out.println(r.get("test1"));
Thread.sleep(3000);
i++;
}
}
}
Server cache.xml
<?xml version="1.0" encoding="UTF-8"?>
<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
version="1.0">
<cache-server bind-address="localhost" port="40404"
max-connections="100" />
<pdx>
<pdx-serializer>
<class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
<parameter name="classes">
<string>java.lang.String</string>
</parameter>
</pdx-serializer>
</pdx>
<region name="example_region">
<region-attributes refid="REPLICATE" />
</region>
</cache>
.NET Client
public static void GeodeTest()
{
Properties<string, string> props = Properties<string, string>.Create();
props.Insert("cache-xml-file", "<path-to-cache.xml>");
CacheFactory cacheFactory = new CacheFactory(props)
.SetPdxReadSerialized(true).SetPdxIgnoreUnreadFields(true)
.Set("log-level", "info");
Cache cache = cacheFactory.Create();
cache.TypeRegistry.PdxSerializer = new ReflectionBasedAutoSerializer();
IRegion<string, string> region = cache.GetRegion<string, string>("example_region");
Console.WriteLine(region.Get("test2", null));
PoolManager pManager = cache.GetPoolManager();
Pool pool = pManager.Find("serverPool");
QueryService qs = pool.GetQueryService();
// Regular query example (works)
Query<string> q = qs.NewQuery<string>("select * from /example_region");
ISelectResults<string> results = q.Execute();
Console.WriteLine("Finished query");
foreach (string result in results)
{
Console.WriteLine(result);
}
// Continuous Query (does not work)
CqAttributesFactory<string, object> cqAttribsFactory = new CqAttributesFactory<string, object>();
ICqListener<string, object> listener = new CacheListener<string, object>();
cqAttribsFactory.InitCqListeners(new ICqListener<string, object>[] { listener });
cqAttribsFactory.AddCqListener(listener);
CqAttributes<string, object> cqAttribs = cqAttribsFactory.Create();
CqQuery<string, object> cquery = qs.NewCq<string, object>("select * from /example_region", cqAttribs, false);
Console.WriteLine(cquery.GetState());
Console.WriteLine(cquery.QueryString);
Console.WriteLine(">>> Cache query example started.");
cquery.Execute();
Console.WriteLine();
Console.WriteLine(">>> Example finished, press any key to exit ...");
Console.ReadKey();
}
.NET Cache Listener
public class CacheListener<TKey, TResult> : ICqListener<TKey, TResult>
{
public virtual void OnEvent(CqEvent<TKey, TResult> ev)
{
object val = ev.getNewValue() as object;
TKey key = ev.getKey();
CqOperation opType = ev.getQueryOperation();
string opStr = "DESTROY";
if (opType == CqOperation.OP_TYPE_CREATE)
opStr = "CREATE";
else if (opType == CqOperation.OP_TYPE_UPDATE)
opStr = "UPDATE";
Console.WriteLine("MyCqListener::OnEvent called with key {0}, op {1}.", key, opStr);
}
public virtual void OnError(CqEvent<TKey, TResult> ev)
{
Console.WriteLine("MyCqListener::OnError called");
}
public virtual void Close()
{
Console.WriteLine("MyCqListener::close called");
}
}
.NET Client cache.xml
<client-cache
xmlns="http://geode.apache.org/schema/cache"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
version="1.0">
<pool name="serverPool" subscription-enabled="true">
<locator host="localhost" port="20341"/>
</pool>
<region name="example_region">
<region-attributes refid="CACHING_PROXY" pool-name="serverPool" />
</region>
</client-cache>