RpcDispatcher # callRemoteMethods не работает при вызове MembershipListener # viewAccepted при использовании ForkChannel - PullRequest
0 голосов
/ 17 июня 2019

RpcDispatcher # callRemoteMethods не работает при вызове MembershipListener # viewAccepted при использовании ForkChannel.

Я пытаюсь обновить версию JGroups, используемую в моем приложении, с 2 до 4.1.0. Наконец. Приложение использует MuxRpcDispatcher, потому что оно использует несколько RpcDispatcher. Пакет mux устарел в версии 4, и мы пытаемся использовать альтернативный fork-стек. Однако при выполнении RpcDispatcher в viewAccepted обработка останавливается.

RpcDispatcher # callRemoteMethods не работает при вызове MembershipListener # viewAccepted при использовании ForkChannel.

channel = new JChannel();
channel.setReceiver(this);

if (channel.getProtocolStack().findProtocol(FORK.class) == null) {
    channel.getProtocolStack().addProtocol(new FORK());
}

forkChannel = new ForkChannel(channel, "fork", "fork");
dispatcher1 = new RpcDispatcher(forkChannel, new Boe1());

channel.connect("test");
forkChannel.connect("test");

Вызов RpcDispatcher в viewAccepted. Обработка останавливается на этом вызове.

@Override
public void viewAccepted(final View new_view) {
    LOGGER.info("viewAccepted:start");
    try {
        final MethodCall call = new MethodCall(Boe1.class.getMethod("boeee"));
        final RequestOptions options = new RequestOptions(ResponseMode.GET_ALL, 0, true, null);
        dispatcher1.callRemoteMethods(null, call, options);
    } catch (final Exception e) {
        e.printStackTrace();
    }
    LOGGER.info("viewAccepted:end");
}

Ниже приведен дамп потока в остановленном состоянии.

"jgroups-10,test,IM9072-10017" #22 prio=5 os_prio=0 tid=0x000000002b4b5800 nid=0x2cf8 waiting on condition [0x000000002c84d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000071b24ccd0> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.jgroups.blocks.GroupRequest.access$7(GroupRequest.java:1)
    at org.jgroups.blocks.GroupRequest$$Lambda$135/1688803174.call(Unknown Source)
    at org.jgroups.blocks.GroupRequest.doAndComplete(GroupRequest.java:274)
    at org.jgroups.blocks.GroupRequest.waitForCompletion(GroupRequest.java:254)
    at org.jgroups.blocks.GroupRequest.waitForCompletion(GroupRequest.java:1)
    at org.jgroups.blocks.Request.execute(Request.java:52)
    at org.jgroups.blocks.MessageDispatcher.cast(MessageDispatcher.java:319)
    at org.jgroups.blocks.MessageDispatcher.castMessage(MessageDispatcher.java:251)
    at org.jgroups.blocks.RpcDispatcher.callRemoteMethods(RpcDispatcher.java:96)
    at ppp.network.JChannelRunner.viewAccepted(JChannelRunner.java:79)
    at org.jgroups.JChannel.invokeCallback(JChannel.java:917)
    at org.jgroups.JChannel.up(JChannel.java:759)
    at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:908)
    at org.jgroups.protocols.FORK.up(FORK.java:131)

Есть ли способ избежать зависаний?

Ответы [ 2 ]

0 голосов
/ 24 июня 2019

Ошибка, которую вы сделали, заключается в том, что вы зарегистрировали прослушиватель представления в JChannel, но не в ForkChannel.

Это означает, что обратный вызов viewAccepted() был вызван, когда у ForkChannel еще не было представления, поэтому вызов метода вернется немедленно, так как в этот момент не было зарегистрировано никакого членства.

Я прокомментировал эту строку и добавил dispatcher1.setMembershipListener (), и теперь пример кода работает. Cheers,

public class JChannelRunner extends ReceiverAdapter implements Closeable {
    public static class Boe1 {
        public void boeee() {System.out.println("boe-1");}
    }

    private final static Logger LOGGER = Logger.getLogger(JChannelRunner.class.getName());

    JChannel channel;
    ForkChannel forkChannel;
    RpcDispatcher dispatcher1;

    protected void start() throws Exception {
        channel = new JChannel();
        // channel.setReceiver(this);

        if (channel.getProtocolStack().findProtocol(FORK.class) == null) {
            channel.getProtocolStack().addProtocol(new FORK());
        }

        forkChannel = new ForkChannel(channel, "fork", "fork");
        dispatcher1 = new RpcDispatcher(forkChannel, new Boe1());
        dispatcher1.setMembershipListener(this);
        channel.connect("test");
        forkChannel.connect("test");
    }

    public JChannelRunner() throws Exception {}

    public static void main(final String[] args) throws IOException, Exception {
        try (JChannelRunner runner = new JChannelRunner()) {
            runner.start();
            Thread.sleep(60 * 1000);
        }
    }

    @Override
    public void close() throws IOException {
        Util.close(dispatcher1, forkChannel, channel);
    }

    @Override
    public void viewAccepted(final View new_view) {
        LOGGER.info("viewAccepted:start");
        try {
            final MethodCall call = new MethodCall(Boe1.class.getMethod("boeee"));
            final RequestOptions options = new RequestOptions(ResponseMode.GET_ALL, 0, true, null);
            dispatcher1.callRemoteMethods(null, call, options);
        } catch (final Exception e) {
            e.printStackTrace();
        }
        LOGGER.info("viewAccepted:end");
    }
}
0 голосов
/ 18 июня 2019

Вы должны никогда не блокировать в обратном вызове, например viewAccepted()!Если у вас абсолютно есть для вызова RPC, либо вызовите его асинхронно (mode = GET_NONE), либо внеполосно (OOB).Вы также можете сделать это в отдельном потоке.

Подробнее см. [1].Приветствия,

[1] http://www.jgroups.org/manual4/index.html#ReceiverAdapter

...