многопоточное общение в python3.6 freeze - PullRequest
0 голосов
/ 05 мая 2018

Я пытаюсь разработать многопоточную функцию в Python 3.6 и иногда мой код зависает. из моих тестов я думаю, что проблема в os.write () или os.read (), но я не знаю почему.

вот мой код (я не думаю, что partalTransform () вызывает зависание, но я его понял, чтобы понять код):

def naiveTransform(netData,**kwargs):

        #parralelisable part
        def partialTransform(debut, fin) :
            for i in range(debut, fin) :
                j = 0
                #calcul of all the distances :
                while j < nbrPoint :
                    distance[j] = euclidianDistance(netData[i], netData[j])
                    j += 1

                #construction of the graph :
                j = 0
                del distance[i]
                while j < k :
                    nearest = min(distance, key=distance.get)
                    del distance[nearest]   #if k > 1 we don't want to get always the same point.
                    graph.append([i, nearest])
                    j += 1

            return graph



        k = kwargs.get('k', 1)  # valeur par défault à definir.
        nbrCore = kwargs.get('Core', 1)
        nbrPoint = len(netData)
        nbrPointCore = nbrPoint//nbrCore
        distance = dict()
        graph = []

        #pipes
        r = [-1]*nbrCore
        w = [-1]*nbrCore
        pid = [-1]*nbrCore

        for i in range(nbrCore):
            r[i], w[i] = os.pipe()

            try:
                pid[i] = os.fork()
            except OSError:
                exit("Could not create a child process\n")


            if pid[i] == 0:
                if i < nbrCore-1 :
                    g = partialTransform(i*nbrPointCore, (i+1)*nbrPointCore)
                else :
                    g = partialTransform(i*nbrPointCore, nbrPoint)  #to be sure that there is not a forgoten point.
                print("write in " + str(i))
                import sys
                print(sys.getsizeof(g))
                os.write(w[i], pickle.dumps(g))
                print("exit")
                exit()


        for i in range(nbrCore):
            print("waiting " + str(i))
            finished = os.waitpid(pid[i], 0)
            print("received")
            graph += pickle.loads(os.read(r[i], 250000000))

        return graph

Когда аргумент k больше или равен 5, код останавливается после

print(sys.getsizeof(g))

Для моего примера, когда k = 4, размер равен 33928, а для k = 5 размер равен 43040, поэтому я не думаю, что это проблема? Количество используемых ядер не влияет на зависание.

Я все еще новичок в python, так что это может быть что-то очевидное, но я не нашел подобной проблемы в интернете. Есть ли у вас какие-либо идеи о том, что может привести к замораживанию диссертаций?

1 Ответ

0 голосов
/ 05 мая 2018

Каналы имеют буферы ограниченного размера, и дочерний элемент будет блокировать запись канала, пока родитель не прочитает его. Но родитель ждет выхода ребенка, так что вы повесите. Вы можете избежать ограничения буфера, записав объект во временный файл. Данные будут находиться в кеше файлов операционной системы, когда родительский объект выполнит чтение, поэтому они по-прежнему будут быстрыми.

Во всем этом есть хитрость. Родитель должен убедить libc пересмотреть файл после того, как его запишет потомок, иначе чтение будет просто удовлетворено внутренним кешем нулевой длины. Вы можете сделать это с seek.

import tempfile

def naiveTransform(netData,**kwargs):

        // *** code removed for example ***
        # files
        tmp = [tempfile.TemporaryFile() for _ in range(nbrCore)]
        pid = [-1]*nbrCore

        for i in range(nbrCore):
            try:
                pid[i] = os.fork()
            except OSError:
                exit("Could not create a child process\n")


            if pid[i] == 0:
                if i < nbrCore-1 :
                    g = partialTransform(i*nbrPointCore, (i+1)*nbrPointCore)
                else :
                    g = partialTransform(i*nbrPointCore, nbrPoint)  #to be sure that there is not a forgoten point.
                print("write in " + str(i))
                import sys
                print(sys.getsizeof(g))
                pickle.dump(g, tmp[i])
                tmp[i].close()
                print("exit")
                exit()

        for i in range(nbrCore):
            print("waiting " + str(i))
            finished = os.waitpid(pid[i], 0)
            print("received")
            # seek to get updated file content
            tmp[i].seek(0,2)
            tmp[i].seek(0)
            graph += pickle.load(tmp[i])

        return graph
...