Я использую Apache Kafka Streams Processor API.
Я создаю удаленный объект, который расширяет UnicastRemoteObject внутри StateStore Процессора.
Когда метод init(ProcessorContext)
объекта Процессор (который расширяет AbstractProcessor) моей топологии называется, я помещаю ссылку ProcessorContext внутри этого удаленного объекта.
public void init(ProcessorContext processorContext){
super.init(processorContext);
this.myStore = (MyCustomStore<String, String>)context().getStateStore("mystore");
this.myStore.getRemoteObject().setProcessorContext(processorContext);
}
Если я получаю доступ к ProcessorContext только удаленно и использую forward
Метод отправки записей на процессор приемника работает нормально. И если я получаю доступ к ProcessorContext только локально для отправки записей с forward
, он тоже работает нормально.
Если я получаю доступ к ProcessorContext как локально, так и удаленно для использования forward
, у меня есть NPE , Я использовал синхронизированный метод, в котором вызывается foward
, потому что я думал, что два разных потока пытаются получить доступ к ProcessorContext одновременно, но NPE настаивает.
Мне кажется, что каким-то образом один и тот же поток вызывает вперед одновременно и локально, и удаленно, и это вызывает NPE.
Сообщение:
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at StateMessageBean.sendMessage(StateMessageBean.java:109)
at StateMessageBean.subroundEnded(StateMessageBean.java:87)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:361)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:283)
at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:260)
at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:161)
at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:227)
at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:179)
at com.sun.proxy.$Proxy25.subroundEnded(Unknown Source)
at CoordinatorProcessor.collectIncreaseOfC(CoordinatorProcessor.java:332)
at CoordinatorProcessor.process(CoordinatorProcessor.java:119)
at CoordinatorProcessor.process(CoordinatorProcessor.java:25)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Два метода, которые я вызываю удаленно и локально:
//remote method
@Override
public void subroundEnded()
{
PhiMessage newPhiMessage = new PhiMessage();
newPhiMessage.setSiteId(this.siteId);
newPhiMessage.setCurrentRound(this.currentRound);
newPhiMessage.setCurrentSubround(this.currentSubround);
newPhiMessage.setPhi(this.phi);
byte[] byteNewPhiMessage = SerializationUtils.serialize(newPhiMessage);
this.processorContext.forward("PhiMessage", byteNewPhiMessage);
}
//local method
public void sendIncrease()
{
IncreaseMessage increaseMessage = new IncreaseMessage();
increaseMessage.setCurrentRound(this.currentRound);
increaseMessage.setCurrentSubround(this.currentSubround);
increaseMessage.setIncrease(this.increase);
byte[] byteIncreaseMessage = SerializationUtils.serialize(increaseMessage);
this.processorContext.forward("IncreaseMessage", byteIncreaseMessage);
}