Хорошо, я понял — нужно использовать mmap с тегом (параметр tagname):
from multiprocessing import Process,Queue
from ctypes import c_char,addressof
import pyarrow as pa
import numpy as np
from time import sleep
import os
import datetime
import mmap
from sys import getsizeof
class ProcessIn(Process):
    """Producer process that publishes tagged anonymous memory mappings.

    Once per second it creates a mapping of random size under the tag
    ``Data_<i>``, stamps the first byte with the iteration counter and,
    when the queue reports empty, enqueues ``(tag, size, timestamp)`` so a
    consumer can open the same mapping by tag.

    NOTE(review): the ``tagname`` argument of ``mmap.mmap`` is documented as
    Windows-only, so this class is expected to run on Windows - confirm.
    """

    # Byte pattern whose length scales the size of every mapping.
    _PATTERN = b'abcdefghijklmnopqrstuvwxyz'

    def __init__(self, QueueI):
        """Store the queue used to announce new mappings.

        Args:
            QueueI: queue that receives ``(tag, size, timestamp)`` tuples.
        """
        super().__init__(daemon=True)
        self.QueueI = QueueI

    @staticmethod
    def _marker_byte(i):
        """Reduce the iteration counter to a value that fits in one byte.

        Assigning an int outside ``range(256)`` to an mmap item raises
        ``ValueError``, which would stop the producer at iteration 256.
        """
        return i % 256

    def run(self):
        """Create and announce one mapping per second, forever."""
        i = 0
        while True:
            N = np.random.randint(10, 14) * 100000
            NN = N * len(self._PATTERN)
            Tag = 'Data_' + str(i)
            # Rebinding ``buf`` happens only after the new mapping exists, so
            # the mapping announced in the previous iteration stays referenced
            # during the whole sleep below.
            buf = mmap.mmap(-1, NN, tagname=Tag)
            buf[0] = self._marker_byte(i)
            print(os.getpid(), 'putted', Tag, NN)
            # Announce only when the queue looks empty, so the consumer is
            # never handed a backlog of old tags.
            if self.QueueI.qsize() == 0:
                self.QueueI.put((Tag, NN, datetime.datetime.now()))
            i += 1
            sleep(1)
class ProcessOut(Process):
    """Consumer process that opens announced mappings and reports timing.

    For every ``(tag, size, timestamp)`` tuple taken from the queue it opens
    the mapping with that tag, prints its first ten bytes, and prints the
    delay since the timestamp together with the implied throughput.

    NOTE(review): the ``tagname`` argument of ``mmap.mmap`` is documented as
    Windows-only, so this class is expected to run on Windows - confirm.
    """

    def __init__(self, QueueI):
        """Store the queue from which mapping announcements are read.

        Args:
            QueueI: queue yielding ``(tag, size, timestamp)`` tuples.
        """
        super().__init__(daemon=True)
        self.QueueI = QueueI

    @staticmethod
    def _rate_mbs(size, dt):
        """Return ``size`` bytes over ``dt`` seconds in MiB/s.

        Returns NaN when ``dt`` is zero to avoid a division by zero.
        """
        if dt != 0:
            return size / dt / 1024**2
        return np.nan

    def run(self):
        """Consume announcements forever, printing one report per mapping."""
        while True:
            Tag, N, start = self.QueueI.get()
            # The context manager closes the mapping deterministically, even
            # if printing raises, instead of relying on garbage collection.
            with mmap.mmap(-1, N, tagname=Tag) as buf:
                print('got', buf[0:10], Tag)
                # Measured while the mapping is still open, so closing it is
                # not included in the reported delay.
                dt = (datetime.datetime.now() - start).total_seconds()
            print(os.getpid(), self._rate_mbs(N, dt), 'MBs', dt*1000, 'ms', N/1024**2, 'MB', N)
if __name__ == '__main__':
    # Script entry point: one producer and one consumer share a single queue.
    shared_queue = Queue()
    workers = (ProcessIn(shared_queue), ProcessOut(shared_queue))
    print(os.getpid(), 'main')
    for worker in workers:
        worker.start()
    # Both workers are daemons; the script ends once the user presses Enter.
    input('press key to finish\n')