Как выполнить асинхронные обратные вызовы с использованием Netty с Avro - PullRequest
3 голосов
/ 31 января 2012

Я пытаюсь реализовать асинхронные вызовы Avro, используя его реализацию NettyServer. После поиска исходного кода я нашел пример использования NettyServer из TestNettyServerWithCallbacks.java

Когда я запускаю несколько тестов, я понимаю, что NettyServer никогда не вызывает метод hello (Callback), вместо этого он продолжает вызывать синхронный метод hello (). Клиентская программа выводит «Hello», но в результате я ожидаю «Hello-ASYNC». Я действительно понятия не имею, что происходит.

Я надеюсь, что кто-то может пролить свет на меня и, возможно, указать на ошибку Ниже приведены коды, которые я использую для выполнения простого асинхронного теста avro.

AvroClient.java - код клиента.

public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);

            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

AvroNetty.java - Код сервера

public class AvroNetty {
    public static void main(String[] args) {
        Index indexImpl = new AsyncIndexImpl();
        Chat chatImpl = new ChatImpl();

        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}

ChatImpl.java

public class ChatImpl implements Chat.Callback {
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");    
    }

    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");    
    }
}

Этот интерфейс автоматически генерируется avro-tool Chat.java

@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

Вот схема Avro

{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types" : [],

  "messages": {
      "hello": { 
                    "request": [],
                    "response": "string"
      }
  }
}

1 Ответ

1 голос
/ 03 июля 2013

Реализация NettyServer на самом деле не реализует стиль Async вообще. Это недостаток в библиотеке. Вместо этого вам нужно указать обработчик асинхронного выполнения, а не пытаться объединять службы в цепочку с помощью обратных вызовов. Вот что я использую для настройки моего NettyServer, чтобы учесть это:

ExecutorService es = Executors.newCachedThreadPool();
OrderedMemoryAwareThreadPoolExecutor executor = new OrderedMemoryAwareThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 0, 0);
ExecutionHandler executionHandler = new ExecutionHandler(executor);
final NettyServer server = new NettyServer(responder, addr, new NioServerSocketChannelFactory(es, es), executionHandler);
...