опрос с помощью delayed_job - PullRequest
       30

опрос с помощью delayed_job

32 голосов
/ 07 апреля 2011

У меня есть процесс, который обычно занимает несколько секунд, поэтому я пытаюсь использовать delayed_job для асинхронной обработки. Сама работа отлично работает, мой вопрос в том, как провести опрос, чтобы узнать, выполнено ли оно.

Я могу получить идентификатор из delayed_job, просто присвоив его переменной:

job = Available.delay.dosomething (: var => 1234)

+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| id   | priority | attempts | handler    | last_error | run_at      | locked_at | failed_at | locked_by | created_at | updated_at  |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| 4037 | 0        | 0        | --- !ru... |            | 2011-04-... |           |           |           | 2011-04... | 2011-04-... |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+

Но как только он завершает задание, он удаляет его, и поиск завершенной записи возвращает ошибку:

@job=Delayed::Job.find(4037)

ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037

@job= Delayed::Job.exists?(params[:id])

Должен ли я изменить это и, возможно, отложить удаление полных записей? Я не уверен, как еще я могу получить уведомление о его статусе. Или опрос мертвых записей в качестве доказательства завершения нормально? Кто-нибудь еще сталкивался с чем-то похожим?

Ответы [ 6 ]

45 голосов
/ 15 апреля 2011

Давайте начнем с API.Я хотел бы иметь что-то вроде следующего.

@available.working? # => true or false, so we know it's running
@available.finished? # => true or false, so we know it's finished (already ran)

Теперь давайте напишем работу.

class AwesomeJob < Struct.new(:options)

  def perform
    do_something_with(options[:var])
  end

end

Пока все хорошо.У нас есть работа.Теперь давайте напишем логику, которая ставит ее в очередь.Поскольку Доступен - модель, ответственная за эту работу, давайте научим ее, как начать эту работу.

class Available < ActiveRecord::Base

  def start_working!
    Delayed::Job.enqueue(AwesomeJob.new(options))
  end

  def working?
    # not sure what to put here yet
  end

  def finished?
    # not sure what to put here yet
  end

end

Итак, как мы узнаем, работает эта работа или нет?Есть несколько способов, но в рельсах кажется правильным, что когда моя модель что-то создает, она обычно ассоциируется с этим.Как мы общаемся?Использование идентификаторов в базе данных.Давайте добавим job_id к доступной модели.

Пока мы на ней, как мы узнаем, что задание не работает, потому что оно уже закончено или потому что оно еще не началось?Одним из способов является проверка того, что на самом деле сделала работа.Если он создал файл, проверьте, существует ли файл.Если он вычислил значение, проверьте, что результат записан.Однако некоторые задания проверить не так просто, так как может не быть четкого поддающегося проверке результата их работы.Для такого случая вы можете использовать флаг или метку времени в вашей модели.Предполагая, что это наш случай, давайте добавим job_finished_at отметку времени, чтобы отличить еще не выполненное задание от уже выполненного .

class AddJobIdToAvailable < ActiveRecord::Migration
  def self.up
    add_column :available, :job_id, :integer
    add_column :available, :job_finished_at, :datetime
  end

  def self.down
    remove_column :available, :job_id
    remove_column :available, :job_finished_at
  end
end

Хорошо.Так что теперь давайте на самом деле свяжем Available с его работой, как только мы поставим задачу в очередь, изменив метод start_working!.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options))
  update_attribute(:job_id, job.id)
end

Отлично.К этому моменту я мог написать belongs_to :job, но нам это на самом деле не нужно.

Так что теперь мы знаем, как написать метод working?, так просто.

def working?
  job_id.present?
end

Но как мы помечаем работу как выполненную?Никто не знает, что работа закончена лучше, чем сама работа.Итак, давайте передадим available_id на работу (как один из вариантов) и используем его на работе.Для этого нам нужно изменить метод start_working! для передачи идентификатора.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id))
  update_attribute(:job_id, job.id)
end

И мы должны добавить логику в работу, чтобы обновить нашу job_finished_at метку времени, когда она будет завершена.

class AwesomeJob < Struct.new(:options)

  def perform
    available = Available.find(options[:available_id])
    do_something_with(options[:var])

    # Depending on whether you consider an error'ed job to be finished
    # you may want to put this under an ensure. This way the job
    # will be deemed finished even if it error'ed out.
    available.update_attribute(:job_finished_at, Time.current)
  end

end

С этим кодом мы знаем, как написать наш finished? метод.

def finished?
  job_finished_at.present?
end

И все готово.Теперь мы можем просто опросить @available.working? и @available.finished?. Кроме того, вы получаете удобство в определении того, какое именно задание было создано для вашего Доступного, отметив @available.job_id.Вы можете легко превратить это в реальную ассоциацию, сказав belongs_to :job.

14 голосов
/ 19 апреля 2011

В итоге я использовал комбинацию Delayed_Job с обратным вызовом after (job), который заполняет объект memcached тем же идентификатором, что и у созданного задания.Таким образом, я минимизирую количество обращений к базе данных, запрашивая статус задания, вместо этого опрашивая объект memcached.И он содержит весь объект, который мне нужен из выполненной работы, поэтому у меня даже нет запроса туда и обратно.Я получил идею из статьи ребят из github, которые сделали почти то же самое.

https://github.com/blog/467-smart-js-polling

и использовал плагин jquery для опроса, который опрашивает реже и отказывается после определенного числа попыток

https://github.com/jeremyw/jquery-smart-poll

Кажется, отлично работает.

 def after(job)
    prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a
    Rails.cache.fetch(job.id) do
      bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices)
    end
  end
13 голосов
/ 16 апреля 2011

Я думаю, что лучшим способом было бы использовать обратные вызовы, доступные в delayed_job.Это:: успех,: ошибка и: после.так что вы можете поместить некоторый код в вашу модель с помощью:after метод там.ИМХО, это намного лучше, чем опросить какое-то значение, которое может быть даже специфичным для бэкэнда (например, ActiveRecord против Mongoid).

5 голосов
/ 19 апреля 2011

Самый простой способ выполнить это - изменить действие опроса на что-то похожее на следующее:

def poll
  @job = Delayed::Job.find_by_id(params[:job_id])

  if @job.nil?
    # The job has completed and is no longer in the database.
  else
    if @job.last_error.nil?
      # The job is still in the queue and has not been run.
    else
      # The job has encountered an error.
    end
  end
end

Почему это работает?Когда Delayed::Job запускает задание из очереди, оно удаляет его из базы данных , в случае успеха .Если задание не выполнено, запись остается в очереди для последующего повторного запуска, а для атрибута last_error устанавливается обнаруженная ошибка.Используя две вышеупомянутые функциональные возможности, вы можете проверить наличие удаленных записей, чтобы увидеть, были ли они успешными.

Преимущества вышеуказанного метода:

  • Вы получаете эффект опроса, который вы искали в исходном сообщении
  • Используя простую логическую ветвь, вы можете предоставить обратную связь пользователю, если при обработке задания возникнет ошибка

Вы можете включить эту функциональность в метод модели, выполнив что-то вроде следующего:

# Include this in your initializers somewhere
class Queue < Delayed::Job
  def self.status(id)
    self.find_by_id(id).nil? ? "success" : (job.last_error.nil? ? "queued" : "failure")
  end
end

# Use this method in your poll method like so:
def poll
    status = Queue.status(params[:id])
    if status == "success"
      # Success, notify the user!
    elsif status == "failure"
      # Failure, notify the user!
    end
end
1 голос
/ 15 апреля 2011

Таблица delayed_jobs в вашем приложении предназначена только для отображения состояния запущенных и поставленных в очередь заданий.Это не постоянная таблица, и она действительно должна быть как можно меньше по соображениям производительности.Вот почему задания удаляются сразу после завершения.

Вместо этого вы должны добавить поле к вашей модели Available, которое означает, что задание выполнено.Поскольку меня обычно интересует, сколько времени занимает обработка задания, я добавляю поля start_time и end_time.Тогда мой dosomething метод будет выглядеть примерно так:

def self.dosomething(model_id)

 model = Model.find(model_id)

  begin
    model.start!

    # do some long work ...

    rescue Exception => e
      # ...
    ensure
      model.finish!
  end
end

Начало!и закончить!Методы просто записывают текущее время и сохраняют модель.Тогда у меня был бы метод completed?, который ваш AJAX может опрашивать, чтобы увидеть, завершено ли задание.

def completed?
  return true if start_time and end_time
  return false
end

Есть много способов сделать это, но я считаю этот метод простым и хорошо работает для меня.

1 голос
/ 13 апреля 2011

Я бы посоветовал, что если важно получить уведомление о том, что задание выполнено, то написать пользовательский объект задания и очередь , что , а не полагаться на задание по умолчанию, которое ставится в очередь при вызове * 1003. *. Создайте объект что-то вроде:

class DoSomethingAvailableJob

  attr_accessor options

  def initialize(options = {})
    @options = options
  end

  def perform
    Available.dosomething(@options)
    # Do some sort of notification here
    # ...
  end
end

и поставьте в очередь:

Delayed::Job.enqueue DoSomethingAvailableJob.new(:var => 1234)
...