Флинк пишет в ArangoDB, происходит ArangoDBException: java.io.IOException: Достигнут конец потока - PullRequest
0 голосов
/ 24 мая 2019

Когда я использую flink для записи данных пользовательских событий в ArangoDB, происходит com.arangodb.ArangoDBException: java.io.IOException: достигнут конец потока. Но это нормально, когда я сохраняю данные в локальной среде ArangoDB, исключение - это выброс в продукт-среду, различие, которое я могу придумать, заключается в том, что локальный объем данных намного меньше, чем продукт-среда, считываемая из kafka в режиме реального времени. Поэтому я изменяю исходный код на файл hdfs, который представляет собой небольшие данные, и использую тот же логический код для записи в ArangoDB, он также выдает то же исключение, так что у кого-нибудь есть какое-то предложение?

Полный стек исключений выглядит следующим образом:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.arangodb.ArangoDBException: java.util.concurrent.ExecutionException: com.arangodb.ArangoDBException: java.io.IOException: Reached the end of the stream.
    at com.arangodb.internal.velocystream.internal.VstConnectionSync.write(VstConnectionSync.java:100)
    at com.arangodb.internal.velocystream.VstCommunicationSync.send(VstCommunicationSync.java:140)
    at com.arangodb.internal.velocystream.VstCommunicationSync.execute(VstCommunicationSync.java:126)
    at com.arangodb.internal.velocystream.VstCommunicationSync.execute(VstCommunicationSync.java:42)
    at com.arangodb.internal.velocystream.VstCommunication.execute(VstCommunication.java:132)
    at com.arangodb.internal.velocystream.VstProtocol.execute(VstProtocol.java:47)
    at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:71)
    at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:53)
    at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:49)
    at com.arangodb.internal.ArangoCollectionImpl.getDocument(ArangoCollectionImpl.java:134)
    at com.arangodb.internal.ArangoCollectionImpl.getDocument(ArangoCollectionImpl.java:126)
    at com.ximalaya.recsys.flink.graphdb.ArangoDBMicroBatchSink.insertEdgeDocument(ArangoDBMicroBatchSink.java:178)
    at com.ximalaya.recsys.flink.graphdb.ArangoDBMicroBatchSink.invoke(ArangoDBMicroBatchSink.java:122)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: com.arangodb.ArangoDBException: java.io.IOException: Reached the end of the stream.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at com.arangodb.internal.velocystream.internal.VstConnectionSync.write(VstConnectionSync.java:98)
    ... 21 more
Caused by: com.arangodb.ArangoDBException: java.io.IOException: Reached the end of the stream.
    at com.arangodb.internal.velocystream.internal.MessageStore.get(MessageStore.java:73)
    at com.arangodb.internal.velocystream.internal.VstConnectionSync$1.call(VstConnectionSync.java:92)
    at com.arangodb.internal.velocystream.internal.VstConnectionSync$1.call(VstConnectionSync.java:89)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at com.arangodb.internal.velocystream.internal.MessageStore.clear(MessageStore.java:96)
    at com.arangodb.internal.velocystream.internal.VstConnection$1.call(VstConnection.java:150)
    at com.arangodb.internal.velocystream.internal.VstConnection$1.call(VstConnection.java:124)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Reached the end of the stream.
    at com.arangodb.internal.velocystream.internal.VstConnection.readBytesIntoBuffer(VstConnection.java:262)
    at com.arangodb.internal.velocystream.internal.VstConnection.readBytes(VstConnection.java:254)
    at com.arangodb.internal.velocystream.internal.VstConnection.readChunk(VstConnection.java:231)
    at com.arangodb.internal.velocystream.internal.VstConnection$1.call(VstConnection.java:141)
    ... 5 more
...