Сначала я объясню ситуацию и логику, которую я пытаюсь реализовать:
У меня есть несколько потоков, каждый из которых помещает результат своей работы, некоторый объект с именем Result
в очередь QueueToSend
My NettyClient
запускается в потоке и принимает Result
от QueueToSend
каждую 1 миллисекунду и должен подключиться к серверу и отправить сообщение, которое создается из Result
.
Мне также нужно, чтобы эти соединения были асинхронными. Поэтому мне нужно, чтобы список Result
был известен NettyHandler
, чтобы отправить правильное сообщение, обработать правильный результат и затем снова отправить ответ.
Итак, я инициализирую NettyClient
bootstrap
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
и устанавливает конвейер один раз при запуске приложения.
Затем каждую миллисекунду я беру Result
объект из QueueToSend
и подключаюсь к серверу
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
Я решил использовать статический ConcurrentHashMap
для сохранения каждого объекта результата, взятого из QueueToSend
, связанного с каналом.
Первая проблема возникает в NettyHandler
в методе channelConnected, когда я пытаюсь получить Result
объект, связанный с каналом, из ResultConcurrentHashMap
.
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel channel = ctx.getPipeline.getChannel();
Result result = ResultConcurrentHashMap.get(channel.getId());
}
Но иногда result
равно нулю (1 из 50), даже если бы оно было в ResultConcurrentHashMap
. Я думаю, это происходит потому, что событие channelConnected
происходит до того, как NettyClient
запускает этот код:
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
Может быть, он не появится, если я запускаю NettyServer
и NettyClient
не на локальном хосте, но удаленно, для установления соединения потребуется больше времени. Но мне нужно решение этой проблемы.
Другая проблема заключается в том, что я отправляю сообщения каждую 1 миллисекунду асинхронно и полагаю, что сообщения могут смешиваться, и сервер не может их правильно прочитать. Если я буду запускать их один за другим, все будет в порядке:
future.getChannel().getCloseFuture().awaitUninterruptibly();
Но мне нужна асинхронная отправка и обработка правильных результатов, связанных с каналом и отправкой ответов.
Что я должен реализовать?