Манипулировать переменными Nextflow вне скриптов - PullRequest
0 голосов
/ 02 октября 2019

У меня есть процесс iterate_list. Процесс iterate_list берет список и что-то делает с каждым элементом в списке. При запуске сценария требуется два входа. Список и элемент, который нужно обработать (который он получает как потребитель из очереди rabbitmq)

В настоящее время я даю скрипту на python весь список, и он перебирает каждый из них, обрабатывая их (как единое целое). большой кусок) и возвращается после завершения. Это нормально, однако, если система перезапускается, она запускается заново.

Мне было интересно, как я могу сделать так, чтобы каждый раз, когда мой скрипт Python обрабатывал один элемент, он возвращал элемент, яудалите его из списка, а затем передайте новый список процессу. Таким образом, в случае перезапуска / сбоя системы nextflow знает, где он остановился, и может продолжить с него.

import groovy.json.JsonSlurper

def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = [] 

items = Channel.from(analysis_config.items.keySet())

for (String item : items) {
    list_of_items_to_process << item
    } 

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    """ 
}

process signal_completion{

    echo true

    input:
    val typing_cur

    script:
    """
    echo "all done!"
    """
}

По сути, процесс "iterate_list" берет один "элемент" из очереди в посреднике сообщений. ,Процесс iterate_list должен выглядеть примерно так:

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    list_of_items_to_process.remove(<output from python script>)
    """
}

И поэтому для каждого из них он должен запуститься, удалить элемент, который он обработал, и перезапустить с новым списком.

initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.

1 Ответ

1 голос
/ 03 октября 2019

Похоже, что вы на самом деле пытаетесь манипулировать глобальным ArrayList из процесса Nextflow. AFAIK, нет способа сделать это точно. Для этого каналы .

Не ясно, нужно ли вам вообще удалять какие-либо элементы из списка элементов для обработки. Nextflow уже может использовать кэшированные результаты, используя опцию -resume. Так почему бы просто не передать полный список и один элемент для обработки?

items = Channel.from(['foo', 'bar', 'baz'])

items.into {
    items_ch1
    items_ch2
}

process iterate_list{

    input:
    val item from items_ch1
    val list_of_items_to_process from items_ch2.collect()

    """
    python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
    """
}

Я могу только догадываться, как ваш Python-скрипт использует свои аргументы, но если ваш список элементов для обработки - это простозаполнитель, то вы можете даже ввести один элемент списка элементов для обработки:

items = Channel.from(['foo', 'bar', 'baz'])

process iterate_list{

    input:
    val item from items

    """
    python3.7 process_list_items.py "${item}" '[${item}]'
    """
}
...