требует, чтобы набор файлов был сделан перед запуском функции в конвейере Ruffus - PullRequest
1 голос
/ 18 марта 2010

Я использую ruffus для написания конвейера. У меня есть функция, которая вызывается параллельно много раз, и она создает несколько файлов. Я хотел бы сделать функцию «combFiles ()», которая вызывается после того, как все эти файлы были сделаны. Поскольку они работают параллельно в кластере, они не будут все вместе заканчиваться. Я написал функцию 'getFilenames ()', которая возвращает набор имен файлов, которые должны быть созданы, но как я могу заставить Объединение файлов () ожидать их появления?

Я попробовал следующее:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

Я также пробовал декоратор:

@merge(getFilenames)

но это тоже не работает. Объединение файлов по-прежнему вызывается по ошибке до создания файлов, заданных в getFilenames. Как я могу сделать объединение файлов условным для тех файлов, которые там находятся?

спасибо.

1 Ответ

2 голосов
/ 26 марта 2010

Я разработчик Ruffus. Я не уверен, что полностью понимаю, что вы пытаетесь сделать, но здесь говорится:

Ожидание заданий, которые занимают различное количество времени для завершения следующего этапа вашего конвейера, - это именно то, о чем говорит Руффус, так что, надеюсь, это просто.

Первый вопрос: знаете ли вы, какие файлы создаются заранее, то есть до запуска конвейера? Давайте начнем с предположения, что вы делаете.

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

Давайте напишем фиктивную функцию, которая создает файл при каждом его вызове. В Ruffus любые имена входных и выходных файлов содержатся в первых двух параметрах соответственно. У нас нет имени входного файла, поэтому вызовы наших функций должны выглядеть следующим образом:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

Определение create_file будет выглядеть так:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

Каждый из этих файлов будет создан за 3 отдельных вызова create_file. Они могут выполняться параллельно, если хотите.

pipeline_run([create_file], multiprocess = 5)

Теперь, чтобы объединить файлы. Декоратор "@Merge" действительно настроен именно для этого. Нам просто нужно связать его с предыдущей функцией:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

Это вызовет merge_file, только когда все файлы будут готовы после трех вызовов create_file ().

Весь код выглядит следующим образом:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

И вот результат:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
...