Я пытаюсь использовать neo4j, используя драйвер python. Реализована программа, которая часто обменивается данными между neo4j и Python, и каждая из итераций программы независима. Программа работает отлично, когда не используется параллельная обработка. Затем я пытаюсь реализовать это с помощью параллельной обработки в python, где я распараллеливаю эти независимые итерации. У меня есть 24 ядра машины. Так что я могу запустить довольно много процессов. Даже при параллельном выполнении программа выполняется до тех пор, пока число процессов не станет равным 5. Для любого числа, превышающего 90%, я получаю следующую ошибку.
---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 828, in close
self.sync()
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 793, in sync
self.session.sync()
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 538, in sync
detail_count, _ = self._connection.sync()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 526, in sync
self.send()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 388, in send
self._send()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 408, in _send
self.socket.sendall(data)
File "/usr/local/lib/python3.7/ssl.py", line 1015, in sendall
v = self.send(byte_view[count:])
File "/usr/local/lib/python3.7/ssl.py", line 984, in send
return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "<ipython-input-4-616f793afd51>", line 9, in func
d2=run_query(streaming_query)
File "<ipython-input-2-01a2f4205218>", line 6, in run_query
result = session.read_transaction(lambda tx: tx.run(streaming_query))
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 710, in read_transaction
return self._run_transaction(READ_ACCESS, unit_of_work, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 686, in _run_transaction
tx.close()
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 835, in close
self.session.commit_transaction()
File "/usr/local/lib/python3.7/site-packages/neo4j/__init__.py", line 630, in commit_transaction
self._connection.sync()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 526, in sync
self.send()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 388, in send
self._send()
File "/usr/local/lib/python3.7/site-packages/neobolt/direct.py", line 408, in _send
self.socket.sendall(data)
File "/usr/local/lib/python3.7/ssl.py", line 1015, in sendall
v = self.send(byte_view[count:])
File "/usr/local/lib/python3.7/ssl.py", line 984, in send
return self._sslobj.write(data)
BrokenPipeError: [Errno 32] Broken pipe
"""
The above exception was the direct cause of the following exception:
BrokenPipeError Traceback (most recent call last)
<ipython-input-5-da15b33c8ad4> in <module>
7 pool = multiprocessing.Pool(processes=num_processes)
8 start = time.time()
----> 9 result = pool.map(func, chunks)
10 end = time.time()
11 print(end-start)
/usr/local/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
288 in a list that is returned.
289 '''
--> 290 return self._map_async(func, iterable, mapstar, chunksize).get()
291
292 def starmap(self, func, iterable, chunksize=None):
/usr/local/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
681 return self._value
682 else:
--> 683 raise self._value
684
685 def _set(self, i, obj):
BrokenPipeError: [Errno 32] Broken pipe
Кроме того, я получаю следующие предупреждения
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to read from defunct connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687))
Failed to write data to connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687)); ("32; 'Broken pipe'")
Transaction failed and will be retried in 1.1551515321361832s (Failed to write to closed connection Address(host='127.0.0.1', port=7687) (Address(host='127.0.0.1', port=7687)))
Журнал отладки сервера neo4j выглядит следующим образом:
2020-04-09 13:07:26.033+0000 INFO [o.n.l.i.StoreLogService] Rotated internal log file
2020-04-09 13:08:16.724+0000 ERROR [o.n.b.t.p.HouseKeeper] Fatal error occurred when handling a client connection: [id: 0xdb5b2521, L:/127.0.0.1:7687 ! R:/127.0.0.1:58086] javax.net.ssl.SSLException: bad record MAC
io.netty.handler.codec.DecoderException: javax.net.ssl.SSLException: bad record MAC
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:472)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLException: bad record MAC
at sun.security.ssl.Alerts.getSSLException(Alerts.java:208)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1709)
at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:970)
at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:896)
at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:766)
at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
at io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:295)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1301)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
... 17 more
Caused by: javax.crypto.BadPaddingException: bad record MAC
at sun.security.ssl.EngineInputRecord.decrypt(EngineInputRecord.java:238)
at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:963)
... 26 more
Несколько моментов, которые я хотел бы упомянуть
- Попытка использовать один драйвер для Вся программа и настройка сеанса по мере необходимости
- Я пытался использовать драйвер для каждого процесса, но проблема все еще не решена.
- Звучит странно, но я пытался настроить драйвер как и когда требуется вызов БД, и он закрывается сразу после извлечения данных. Здесь я не сталкиваюсь с ошибкой прерванной трубы, но после превышения лимита Соединение отклоняется.
Я хочу знать, каков идеальный способ установки драйвера. Также, пожалуйста, помогите мне в решении этих проблем.
Спасибо