Я разработчик Ruffus. Я не уверен, что полностью понимаю, что вы пытаетесь сделать, но здесь говорится:
Ожидание заданий, которые занимают различное количество времени для завершения следующего этапа вашего конвейера, - это именно то, о чем говорит Руффус, так что, надеюсь, это просто.
Первый вопрос: знаете ли вы, какие файлы создаются заранее, то есть до запуска конвейера? Давайте начнем с предположения, что вы делаете.
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
Давайте напишем фиктивную функцию, которая создает файл при каждом его вызове. В Ruffus любые имена входных и выходных файлов содержатся в первых двух параметрах соответственно. У нас нет имени входного файла, поэтому вызовы наших функций должны выглядеть следующим образом:
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
Определение create_file будет выглядеть так:
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
Каждый из этих файлов будет создан за 3 отдельных вызова create_file. Они могут выполняться параллельно, если хотите.
pipeline_run([create_file], multiprocess = 5)
Теперь, чтобы объединить файлы. Декоратор "@Merge" действительно настроен именно для этого. Нам просто нужно связать его с предыдущей функцией:
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
Это вызовет merge_file, только когда все файлы будут готовы после трех вызовов create_file ().
Весь код выглядит следующим образом:
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
И вот результат:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file