Обработка условий гонки при выяснении, когда скрипты на нескольких машинах закончились - PullRequest
0 голосов
/ 19 октября 2018

У меня будет скрипт, работающий на разных машинах, и мне нужно выяснить, когда все экземпляры будут завершены, чтобы я мог запустить одну заключительную функцию (которая будет выполняться последним скриптом, который завершается).

Большую часть времени они начинаются и заканчиваются в разное время, но я бы хотел иметь возможность обрабатывать любой случай, когда несколько финишируют одновременно.Я использую многопоточность для тестирования и не могу найти надежного решения.Вот идеи, которые у меня были:

При попытке добавить к файлу:
Код, подобный приведенному ниже, где я добавляю уникальный символ UTF8 в файл.Когда количество символов совпадает с потоками, а символ совпадает с текущим, это означает, что последняя функция была запущена.К сожалению, последняя функция может прочитать файл до того, как другие закончили писать.

При попытке переименовать (очевидно, файл можно переименовать несколько раз):

def func(file_format, num_threads):
    for index in xrange(num_threads):
        try:
            os.rename(file_format.format(index), file_format.format(index + 1))
            print str(index) + '\n'
            break
        except WindowsError:
            pass

file_format = 'W:/tmp/peter/race_condition_test/unique_id.{}'
num_threads = 6
if __name__ == '__main__':
    with open(file_format.format(0), 'w') as f:
        pass
    for _ in range(num_threads):
        t = Thread(target=partial(func, file_format, 6))
        t.start()

Вывод:

0
0
0
1
1
1

С проверкой папки на наличие записанных файлов:

def func(iteration, folder, file_name, num_threads):
    path = os.path.join(folder, file_name)
    with open(path.format(iteration), 'w') as f:
        pass
    print str(os.listdir(folder)) + '\n'

num_threads = 6
folder = 'W:/tmp/peter/race_condition_test'
file_name = 'unique_id.{}'
if __name__ == '__main__':
    for i in range(num_threads):
        t = Thread(target=partial(func, i, folder, file_name, num_threads))
        t.start()

Вывод:

['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']
['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']
['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']
['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']
['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']
['unique_id.4', 'unique_id.2', 'unique_id.3', 'unique_id.0', 'unique_id.5', 'unique_id.1']

Выполнение окончательного кода только один раз является довольно важным, так как он использует APIвнешняя программа для обновления чего-либо.Не будет ли конец света, если бы мне пришлось использовать последний метод с возможностью запуска более одного раза, но есть ли другие хорошие способы сделать это?

1 Ответ

0 голосов
/ 19 октября 2018
def func(iteration, folder, file_name):
    path = os.path.join(folder, file_name)
    with open(path.format(iteration), 'w') as f:
        pass
    print str(os.listdir(folder)) + '\n'

num_threads = 6
folder = 'W:/tmp/peter/race_condition_test'
file_name = 'unique_id.{}'
if __name__ == '__main__':
    threads = []
    for i in range(num_threads):
        t = Thread(target=partial(func, i, folder, file_name))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()    # you need to join to know when they all finish

Или вы можете использовать ThreadPool в многопроцессорной обработке (не знаю, почему он скрыт в классе Multiprocess, а не в классе Thread, но он есть.

...