Удаленный вызов forward () ProcessorContext вызывает NPE - PullRequest
0 голосов
/ 06 января 2020

Я использую Apache Kafka Streams Processor API.

Я создаю удаленный объект, который расширяет UnicastRemoteObject внутри StateStore Процессора.

Когда метод init(ProcessorContext) объекта Процессор (который расширяет AbstractProcessor) моей топологии называется, я помещаю ссылку ProcessorContext внутри этого удаленного объекта.

public void init(ProcessorContext processorContext){
  super.init(processorContext);
  this.myStore = (MyCustomStore<String, String>)context().getStateStore("mystore");
  this.myStore.getRemoteObject().setProcessorContext(processorContext);
}

Если я получаю доступ к ProcessorContext только удаленно и использую forward Метод отправки записей на процессор приемника работает нормально. И если я получаю доступ к ProcessorContext только локально для отправки записей с forward, он тоже работает нормально.

Если я получаю доступ к ProcessorContext как локально, так и удаленно для использования forward, у меня есть NPE , Я использовал синхронизированный метод, в котором вызывается foward, потому что я думал, что два разных потока пытаются получить доступ к ProcessorContext одновременно, но NPE настаивает.

Мне кажется, что каким-то образом один и тот же поток вызывает вперед одновременно и локально, и удаленно, и это вызывает NPE.

Сообщение:

java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at StateMessageBean.sendMessage(StateMessageBean.java:109)
        at StateMessageBean.subroundEnded(StateMessageBean.java:87)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:361)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:283)
        at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:260)
        at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:161)
        at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:227)
        at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:179)
        at com.sun.proxy.$Proxy25.subroundEnded(Unknown Source)
        at CoordinatorProcessor.collectIncreaseOfC(CoordinatorProcessor.java:332)
        at CoordinatorProcessor.process(CoordinatorProcessor.java:119)
        at CoordinatorProcessor.process(CoordinatorProcessor.java:25)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

Два метода, которые я вызываю удаленно и локально:

//remote method
@Override
public void subroundEnded()
{
    PhiMessage newPhiMessage = new PhiMessage();
    newPhiMessage.setSiteId(this.siteId);
    newPhiMessage.setCurrentRound(this.currentRound);
    newPhiMessage.setCurrentSubround(this.currentSubround);
    newPhiMessage.setPhi(this.phi);

    byte[] byteNewPhiMessage = SerializationUtils.serialize(newPhiMessage);
    this.processorContext.forward("PhiMessage",  byteNewPhiMessage);
}

//local method
public  void sendIncrease()
{

  IncreaseMessage increaseMessage = new IncreaseMessage();
  increaseMessage.setCurrentRound(this.currentRound);
  increaseMessage.setCurrentSubround(this.currentSubround);
  increaseMessage.setIncrease(this.increase);
  byte[] byteIncreaseMessage = SerializationUtils.serialize(increaseMessage);
  this.processorContext.forward("IncreaseMessage",  byteIncreaseMessage);

}
...