Проблемы управления памятью Pypy CFFI - PullRequest
0 голосов
/ 08 февраля 2019

T попытался ускорить мой код Pypy, добавив некоторые функции c.Проблема в том, что использование памяти постоянно увеличивается!Я видел несколько постов на эту тему и попытался сделать простой тест, чтобы проиллюстрировать это.В моем тесте ниже я могу освободить память в соответствии с этим постом ( Проблемы управления памятью Python CFFI ), но мой код вылетает примерно так: free (): недопустимый следующий размер (нормальный).Кажется, я дважды освобождаю память ...

Может кто-нибудь помочь мне решить мою проблему?

import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()


Create_TypeStructure = """
typedef struct _STRUCT1
    {
        int Data1;
        int Data2;
        int Data3;
        double Data4;
        double Data5;
        double Data6;
        double Data7;
        double Data8;

    }STRUCT1, *PSTRUCT1;
"""

Create_DataStructure = """
PSTRUCT1 CreateDataStructure()
 {
     PSTRUCT1 pStruct1 = ( PSTRUCT1 ) malloc( sizeof( STRUCT1 )*6000 );
     SetDummyValues(&pStruct1);
     if(pStruct1 != NULL) printf("SECOND TEST: ptr is not null/n");
     else printf("SECOND TEST: ptr is null/n");
     return pStruct1;
 }
"""

Set_DummyValues = """
void SetDummyValues( PSTRUCT1 ptr )
    {
        ptr = NULL;
    }
"""

Free_DataStructure = """
 void FreeDataStructure(PSTRUCT1 ptr) 
    {
        free(ptr);
    }  
"""


Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p) 
    {

        int tmp=-1;
        int numline = 5999;

        while (tmp < numline) 
        {
        tmp++ ;
        {
            p[tmp].Data1 = 1000000;
            p[tmp].Data2 = 1000000;
            p[tmp].Data3 = 1000000;
            p[tmp].Data4 = 2125585.265;
            p[tmp].Data5 = 2125585.265;
            p[tmp].Data6 = 2125585.265;
            p[tmp].Data7 = 2125585.265;
            p[tmp].Data8 = 2125585.265;
        }

        }

        return p;
    }


"""

ffibuilder.cdef(Create_TypeStructure) #declare strucutres
ffibuilder.cdef('PSTRUCT1 CreateDataStructure();') #declare function 
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('void SetDummyValues(PSTRUCT1 pStruct1);') #declare function

ffibuilder.set_source("c", Create_TypeStructure + Set_DummyValues + Create_DataStructure + Free_DataStructure + Some_Function )
ffibuilder.compile(verbose=True)
from c import ffi, lib


def worker_process(channel):
    # Import modules
    import gc
    from time import sleep
    import execnet
    from c import ffi, lib
    # task processor, sits on each CPU
    channel.send("ready")

    for x in channel:
        if x is None:  # we can shutdown
                break
        data_list = {}
        for i in range(15000):  
            # Create a pointer to the data structure and tell the garbage collector how to destroy it
            gc_c_pDataStructure = ffi.gc( lib.CreateDataStructure(), lib.FreeDataStructure )
            lib.SomeCFunction( gc_c_pDataStructure )  # Populate the data structure
            data_list[i]= gc_c_pDataStructure # Store some data 
            #lib.FreeDataStructure(data_list[i])

        for i in range(15000):
            lib.FreeDataStructure(data_list[i])

        #sleep(15)
        channel.send("Ok!")


numberOfTasks = 500 # simulations to launch
workerCount = 1 # CPUs
group = execnet.Group()
for i in range(workerCount):  # CPUs
    group.makegateway()



# execute taskprocessor everywhere
mch = group.remote_exec(worker_process)

# get a queue that gives us results
q = mch.make_receive_queue(endmarker="Stop")


tasks = range(numberOfTasks)  # a list of tasks, here just integers


terminated = 0

while 1:
    channel, item = q.get()
    if item == "Stop":
        terminated += 1
        print "Terminated task on channel %s" % channel.gateway.id
    if terminated == len(mch):
        print "Got all results, Finish!"
        break
        continue
    if item != "ready":
        print "%s: Terminated: %s" % (channel.gateway.id, item)
    if not tasks:
        print "No tasks remain, sending termination request to all"
        mch.send_each(None)
        tasks = -1
    if tasks and tasks != -1:
        task = tasks.pop()
        channel.send(task)
        print "Sent task %r to channel %s" % (task, channel.gateway.id)

group.terminate()

1 Ответ

0 голосов
/ 11 февраля 2019

Мое решение: заставить сборщик мусора Pypy работать и убрать ручное освобождение.Также не нужно устанавливать указатель на NULL.

import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()
import __pypy__


Create_TypeStructure = """
typedef struct _STRUCT1
    {
        int Data1;
        int Data2;
        int Data3;
        double Data4;
        double Data5;
        double Data6;
        double Data7;
        double Data8;

    }STRUCT1, *PSTRUCT1;
"""

Create_DataStructure = """
PSTRUCT1 CreateDataStructure()
 {
     PSTRUCT1 pStruct1 = ( PSTRUCT1 ) malloc( sizeof( STRUCT1 )*6000 );
     return pStruct1;
 }
"""


Free_DataStructure = """
 void FreeDataStructure(PSTRUCT1 ptr) 
    {
        free(ptr);
    }  
"""


Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p) 
    {

        int tmp=-1;
        int numline = 5999;

        while (tmp < numline) 
        {
        tmp++ ;
        {
            p[tmp].Data1 = 1000000;
            p[tmp].Data2 = 1000000;
            p[tmp].Data3 = 1000000;
            p[tmp].Data4 = 2125585.265;
            p[tmp].Data5 = 2125585.265;
            p[tmp].Data6 = 2125585.265;
            p[tmp].Data7 = 2125585.265;
            p[tmp].Data8 = 2125585.265;
        }

        }

        return p;
    }


"""

ffibuilder.cdef(Create_TypeStructure) #declare strucutres
ffibuilder.cdef('PSTRUCT1 CreateDataStructure();') #declare function 
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);') #declare function

ffibuilder.set_source("c", Create_TypeStructure + Create_DataStructure + Free_DataStructure + Some_Function )
ffibuilder.compile(verbose=True)
from c import ffi, lib


def worker_process(channel):
    # Import modules
    import gc
    from time import sleep
    import execnet
    from c import ffi, lib
    import __pypy__
    # task processor, sits on each CPU
    channel.send("ready")

    for x in channel:
        if x is None:  # we can shutdown
                break
        data_list = {}
        for i in range(5000):  
            # Create a pointer to the data structure and tell the garbage collector how to destroy it
            gc_c_pDataStructure = ffi.gc( lib.CreateDataStructure(), lib.FreeDataStructure )
            __pypy__.add_memory_pressure(6000)
            lib.SomeCFunction( gc_c_pDataStructure )  # Populate the data structure
            data_list[i]= gc_c_pDataStructure # Store some data 
            #lib.FreeDataStructure(data_list[i])

        #for i in range(5000):
            #lib.FreeDataStructure(data_list[i])

        #sleep(15)
        channel.send("Ok!")


numberOfTasks = 500 # simulations to launch
workerCount = 3 # CPUs
group = execnet.Group()
for i in range(workerCount):  # CPUs
    group.makegateway()



# execute taskprocessor everywhere
mch = group.remote_exec(worker_process)

# get a queue that gives us results
q = mch.make_receive_queue(endmarker="Stop")


tasks = range(numberOfTasks)  # a list of tasks, here just integers


terminated = 0

while 1:
    channel, item = q.get()
    if item == "Stop":
        terminated += 1
        print "Terminated task on channel %s" % channel.gateway.id
        if terminated == len(mch):
            print "Got all results, Finish!"
            break
        continue
    if item != "ready":
        print "%s: Terminated: %s" % (channel.gateway.id, item)
    if not tasks:
        print "No tasks remain, sending termination request to all"
        mch.send_each(None)
        tasks = -1
    if tasks and tasks != -1:
        task = tasks.pop()
        channel.send(task)
        print "Sent task %r to channel %s" % (task, channel.gateway.id)

group.terminate()
...