Распределенная Джулия: параллельная карта (pmap) с тайм-аутом / лимитом времени для каждой задачи карты для завершения - PullRequest
1 голос
/ 11 февраля 2020

Мой проект включает параллельное вычисление карты с использованием функции Distributed pmap Джулии.

Отображение заданного элемента может занять несколько секунд , или это может занять по существу навсегда. Я хочу, чтобы тайм-аут или лимит времени для выполнения отдельной задачи / вычисления на карте.

Если задание на карте заканчивается вовремя, отлично, вернуть результат вычисления. Если задание не выполнено к сроку, прекратите вычисление, когда оно будет достигнуто, и верните некоторое значение или сообщение, указывающее, что истекло время ожидания.

Ниже приведен минимальный пример. Сначала импортируются модули, а затем запускаются рабочие процессы:

num_procs = 1
using Distributed
if num_procs > 1
    # The main process (no calling addprocs) can be used for `pmap`:
    addprocs(num_procs-1)
end

Затем задача сопоставления определяется для всех рабочих процессов. Задание сопоставления должно прерваться через 1 секунду:

@everywhere import Random
@everywhere begin
    """
    Compute stuff for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapses, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=1)

        # < Insert some sort of timeout code? >

        # This block of code simulates a long computation.
        # (pretend the computation time is unknown)
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
        end

        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end

Функция, которая выполняет параллельное отображение (pmap), определена в основном процессе. Каждое задание карты в произвольном порядке занимает до 2 секунд, но должно пройти через 1 секунду.

function myParallelMapping(num_tasks = 20, max_runtime=2)    
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end

print(myParallelMapping())

Как должна быть реализована эта параллельная карта с ограничением по времени?

Ответы [ 2 ]

1 голос
/ 11 февраля 2020

Вы можете поместить что-то подобное в ваше pmap тело

pmap(runtimes) do runtime
  t0 = time()
  task = @async waitForTimeUnlessTimeout(runtime)
  while !istaskdone(task) && time()-t0 < time_limit
      sleep(1)
  end
  istaskdone(task) && (return fetch(task))
  error("time over")
end

Также обратите внимание, что (runtime)->waitForTimeUnlessTimeout(runtime) - это то же самое, что и waitForTimeUnlessTimeout.

0 голосов
/ 11 февраля 2020

После очень полезного ответа @Fredrik Bagge, вот полный рабочий пример реализации с некоторыми дополнительными пояснениями.

num_procs = 8
using Distributed
if num_procs > 1
    addprocs(num_procs-1)
end

@everywhere import Random
@everywhere begin
    function waitForTime(wait_time)
         # This code block simulates a long computation.
         # Pretend the computation time is unknown.
        t0 = time()
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
            yield() # CRITICAL to release computation to check if task is done.
            # If you comment out #yield(), you will see timeout doesn't work!
        end

        return round(wait_time, digits=2)
    end
end

function myParallelMapping(num_tasks = 16, max_runtime=2, time_limit=1)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # parallel compute the mapping tasks. See "do block" in 
    # the Julia documentation, it's just syntactic sugar.
    return pmap(runtimes) do runtime
                  t0 = time()
                  task = @async waitForTime(runtime)
                  while !istaskdone(task) && time()-t0 < time_limit
                      # releases computation to waitForTime
                      sleep(0.1)
                      # nothing past here will run until waitForTime calls yield()
                      # *and* 0.1 seconds have passed.
                  end
                  # equal to if istaskdone(task); return fetch(task); end
                  istaskdone(task) && (return fetch(task))
                  return "TimeOut"
                  # `return error("TimeOut")` halts pmap unless pmap is
                  #  given an error handler argument. See pmap documentation.
              end
end

Вывод:

julia> print(myParallelMapping())

       Any["TimeOut", "TimeOut", 0.33, 0.35, 0.56, 0.41, 0.08, 0.14, 0.72, 
           "TimeOut", "TimeOut", "TimeOut", 0.52, "TimeOut", 0.33, "TimeOut"]

Обратите внимание, что есть две задачи за процесс в этом примере. Исходная задача («проверка времени») проверяет каждые 0,1 секунды, завершила ли другая задача вычисление. Другая задача (созданная с помощью @async) - это что-то вычислять, периодически вызывая yield(), чтобы передать управление контролеру времени; если он не вызывает yield(), проверка времени не может произойти.

...