Sidekiq очередь для указанного объекта - PullRequest
0 голосов
/ 31 августа 2018

Я использую этот рабочий для процесса

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
    end
end

В пользовательской модели у меня after_commit обратный вызов

def update_mautic_contact
    CreateOrUpdateContactWorker.perform_async(id, previous_changes.keys, ship_address_changed || false)
end

Проблема в том, что пользователь обновляется дважды одновременно, потому что до create_or_update_contact требуется некоторое время. Как я могу ограничить темы только для указанного пользователя? Что каждая задача будет выполняться одна за другой для указания user_id.

Ответы [ 2 ]

0 голосов
/ 04 сентября 2018

Я понял это с Redis, но без каких-либо драгоценных камней. Я использовал условие перед выполнением работника:

def update_mautic_contact
    if Rails.current.get("CreateOrUpdateContactWorkerIsRunning_#{id}")
        Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
        CreateOrUpdateContactWorker.perform_in(1.minutes, id, changed_fields)
    else
        Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
        CreateOrUpdateContactWorker.perform_async(id, changed_fields)
    end
end

и внутри работника:

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
        Redis.current.del("CreateOrUpdateContactWorkerIsRunning_#{user_id}")
    end
end
0 голосов
/ 31 августа 2018

Я не знаю, есть ли у вас redis как часть вашей инфраструктуры, но то, что вы описываете, является условием гонки. Для ее решения вам понадобится мьютекс / блокировка вашего критического пути create_or_update_contact.

Состояние гонки здесь происходит между двумя асинхронными рабочими / процессами, поэтому вы не можете просто использовать простой ruby ​​mutex / lock. Вам нужен распределенный мьютекс, который использует хранилище / хранитель центрального замка. Это: https://github.com/kenn/redis-mutex должно сделать это за вас, но вам понадобится redis база данных.

В основном ваш код будет выглядеть примерно так:

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        RedisMutex.with_lock("#{user_id}_create_or_update_contact") do
            ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
        end
    end
end

Таким образом, если у вас есть 2 пользовательских обновления для user_id = 1 одновременно, первый, кто получит блокировку / мьютекс с именем 1_create_or_update_contact, выполнится первым и заблокирует другой вызов до его завершения, а затем начнется второй вызов.

Это решит вашу проблему :) Я думаю, redis необходим, полезен и полезен. Я не могу придумать ни одного из моих проектов rails без необходимости использовать redis.

...