Запустите другой DelayedJob из DelayedJob и получите обновления прогресса - PullRequest
0 голосов
/ 28 апреля 2020

Я использую DelayedJob с DelayedJobProgress в проекте Rails 5.2.

У меня уже есть несколько пользовательских заданий, которые выполняются некоторое время (от 1 до 10 минут), и для обновления пользовательского прогресса я использую канал ActionCable, чтобы отправить его sh в браузер. Пример:

class JobStatusChannel < ApplicationCable::Channel
  def subscribed
    stream_for Delayed::Job.find(params[:id])
  end
end

module JobStatus
  private
  ##
  # Updates job status and sends broadcasts to subscribers
  def update_job_status(message: nil, progress: nil)
    @job.update_columns({message: message, progress_current: progress}.compact)

    obj = {
        message: message,
        progress_current: progress,
      }.compact
    JobStatusChannel.broadcast_to(@job, obj)
  end
end

class UpdateObjectJob <  Struct.new(:id, :mode)
  include JobStatus
  def enqueue(job)
    job.record            = UpdateObject.find(id) 
    job.identifier        = SecureRandom.hex 
    job.progress_max      = 100
    job.progress_current  = 0
    job.message = "Enqueued (#{mode})."
  end

  def before(job)
    @job  = job
    @data = job.record
  end

  def perform
    update_job_status(message: "Started updating Object", progress: 10)
    ## Work to do ###
  end
end

Теперь я пытаюсь построить «пакетный процесс» вокруг этих заданий, поэтому у меня есть задание, которое должно запускать (дочерние) задания и сообщать пользователю об общем прогрессе.

В настоящее время это работает более или менее так (проще говоря, я убрал получение дочерних элементов, их зависимостей, обработку ошибок и т. Д. c):

class BatchRunnerJob < Struct.new(:id)
### SNIP
  def perform
    # we need an inline worker for this
    w = Delayed::Worker.new
    ### get child jobs that should run now and run them one after another
    active_child_jobs.each_with_index do |j,i|
      update_job_status(message:  "Processing ##{j.id} #{j.name} (#{finished_jobs_count+i+1}/#{child_jobs.length})...")

      # enqueue in far future, so that our workers don't fetch it
      dj = Delayed::Job.enqueue j.job_class.constantize.new(*j.params), run_at: 100.years.from_now
      # run with worker inline
      r = w.run(dj)
      update_job_status(message: "Finished: ##{j.id} #{j.name}")
    end
  end
end

Это нормально для коротких заданий, но проблема с этим заключается в том, что я не могу передать пользователю sh обновления выполнения дочерних заданий. Поэтому выполнение пакетного задания останавливается, пока выполняется дочернее задание, и я хотел бы обновить родительское задание по мере его выполнения.

Я также думал о запуске дочернего задания другим (фоновым) рабочим и продолжайте опрашивать его прогресс в пакетном задании. Pro: Я мог бы запускать дочерние процессы параллельно. Con: Мне нужно найти интервал опроса, достаточно длинный для производительности и достаточно короткий, чтобы сократить время обработки коротких заданий вместо ожидания продолжения следующего опроса.

Так что я надеюсь, что кто-то придумает лучшую идею, чем эта, поскольку я думаю, что опрос - это решение прошлого;)

Возможно ли подписаться на канал ребенка изнутри родитель и установить некоторый обратный вызов, который обновляет прогресс родителя, когда ребенок передает обновление?

Основная проблема для меня - связь между дочерним и родительским заданием и пересылка этой информации пользователю.

Спасибо за любой совет

...