Совместное использование памяти ctype между процессами Python, используя адрес - PullRequest
0 голосов
/ 03 февраля 2019

Я пытаюсь отправить динамический массив через несколько процессов в Python.Моим первым решением была отправка данных напрямую через очередь / канал многопроцессорного класса.Проблема в том, что он ограничен пропускной способностью соединения Ethernet.Поэтому я пытаюсь использовать массив ctype и передать только адрес объекта.Когда я пытаюсь получить доступ к массиву из второго процесса (A.raw или A.value), процесс завершается без каких-либо исключений.У кого-нибудь есть идея, что происходит?Возможно некоторые проблемы с блокировкой и т. Д.

from multiprocessing import Process,Queue
from ctypes import c_char,addressof

from time import sleep
import os




class ProcessIn(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        Array=[]
        while True:
            N=100000
            A=(c_char*N)()
            A.value=b'\x01'
            Address=addressof(A)
            Array.append(A)
            print(os.getpid(),'putted',Address)
            self.QueueI.put((Address,N))
            sleep(2)





class ProcessOut(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        while True:
            print(os.getpid(),'step 1')
            Address,N=self.QueueI.get()
            print(os.getpid(),'step 2',Address)
            A=(c_char*N).from_address(Address)      
            print(os.getpid(),'step 3')
            Value=A.raw         #This will fail 
            print(os.getpid(),'step 4',Value)   
            sleep(1)

if __name__ == '__main__':
    QueueI=Queue()

    In=ProcessIn(QueueI)
    Out=ProcessOut(QueueI)
    print(os.getpid(),'main')
    In.start()
    Out.start()
    input('press key to finish\n')

1 Ответ

0 голосов
/ 04 февраля 2019

Хорошо, я понял - используя mmap с тегом:

from multiprocessing import Process,Queue
from ctypes import c_char,addressof
import pyarrow as pa
import numpy as np
from time import sleep
import os
import datetime
import mmap
from sys import getsizeof



class ProcessIn(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):  
        i=0
        while True:
            N=np.random.randint(10,14)*100000
            data = b'abcdefghijklmnopqrstuvwxyz'        
            Tag='Data_'+str(i)
            buf = mmap.mmap(-1, N*len(data),tagname=Tag)            
            buf[0]=i
            NN=N*len(data)
            # print(buf[0:10])

            print(os.getpid(),'putted',Tag,NN)
            if self.QueueI.qsize()==0:
                self.QueueI.put((Tag,NN,datetime.datetime.now()))           


            i+=1
            sleep(1)


class ProcessOut(Process):
    def __init__(self,QueueI):
        super().__init__(daemon=True)
        self.QueueI=QueueI

    def run(self):
        while True:
            # print(os.getpid(),'step 1')
            Tag,N,start=self.QueueI.get()           
            buf =  mmap.mmap(-1, N,tagname=Tag)

            print('got',buf[0:10],Tag)

            # data=buf.read()
            dt=(datetime.datetime.now()-start).total_seconds()
            if dt!=0:
                print(os.getpid(),N/dt/1024**2,'MBs',dt*1000,'ms',N/1024**2,'MB',N) 
            else:
                print(os.getpid(),np.nan,'MBs',dt*1000,'ms',N/1024**2,'MB',N)   
            buf=None

if __name__ == '__main__':
    QueueI=Queue()

    In=ProcessIn(QueueI)
    Out=ProcessOut(QueueI)
    print(os.getpid(),'main')
    In.start()
    Out.start()
    input('press key to finish\n')
...