Как разделить большой запрос в Sidekiq Worker? - PullRequest
2 голосов
/ 18 марта 2020

Когда мы выпускаем курс, мы рассылаем электронное письмо всем нашим пользователям, которые выбрали коммерческое общение. Когда это происходит, наш сервер начинает возвращать 500 из-за истечения времени ожидания запросов. Мы используем сотрудника sidekiq (BroadcastMessageSendWorker) для создания рабочих мест, которые будут отправлять написанное нами электронное письмо. Мы используем Sidekiq Pro, чтобы мы могли использовать функцию массовой обработки.

ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'
class BroadcastMessageSendWorker
  include Sidekiq::Worker

  def perform(message_guid)
    ActiveRecord::Base.connection_pool.with_connection do
      message = BroadcastMessage.find(message_guid)

      message.with_lock do
        return unless message.pending?

        message.pickup!

        if message.contacts.count == 0
          message.finish!
          return
        end

        batch = Sidekiq::Batch.new
        batch.on(:complete, self.class, 'guid' => message_guid)
        batch.jobs do

          # We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
          # will start blowing up. Instead, use an in-memory `seen` index
          seen = Set.new({})

          message.contacts.select(:id).find_in_batches do |contact_batch|
            args = contact_batch.pluck(:id).map do |contact_id| 
              next unless seen.add?(contact_id) # add? returns nil if the object is already in the set

              [message_guid, contact_id]
            end

            Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
          end
        end

        message.update(batch_id: batch.bid)
      end
    end
  end

  def on_complete(_, options)
    message = BroadcastMessage.find(options['guid'])
    message.finish! if message.sending?
  end
end

Мы создаем набор в памяти, чтобы быть уверенным, что мы не отправляем 2 одинаковых электронных письма пользователю. ScoutAPM говорит нам, что строка message.contacts.select(:id) занимает много времени (контакты присоединяются к нашей таблице пользователей, так что это несколько ожидаемо).

Я проанализировал этот запрос:

Subquery Scan on contacts  (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
  Filter: (NOT (hashed SubPlan 1))
  ->  CTE Scan on base_contacts  (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
        CTE base_contacts
          ->  Gather  (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
                Workers Planned: 2
                Workers Launched: 2
                ->  Parallel Hash Left Join  (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
                      Hash Cond: (contacts_1.user_id = users.id)
                      Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
                      Rows Removed by Filter: 12924
                      ->  Parallel Seq Scan on contacts contacts_1  (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
                            Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
                            Rows Removed by Filter: 108423
                            SubPlan 2
                              ->  Seq Scan on mailkick_opt_outs mailkick_opt_outs_1  (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
                                    Filter: (active AND (list IS NULL))
                                    Rows Removed by Filter: 19576
                            SubPlan 3
                              ->  Nested Loop  (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
                                    ->  Seq Scan on broadcast_messages  (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
                                          Filter: (signature = 'broadcast_message_signature'::text)
                                          Rows Removed by Filter: 63
                                    ->  Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages  (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
                                          Index Cond: (broadcast_message_id = broadcast_messages.id)
                                          Filter: ((user_type)::text = 'ClassType'::text)
                      ->  Parallel Hash  (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
                            Buckets: 131072  Batches: 8  Memory Usage: 3168kB
                            ->  Parallel Seq Scan on users  (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
  SubPlan 1
    ->  Seq Scan on mailkick_opt_outs  (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
          Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
          Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms

Parallel Seq Scan занимает много времени, но я не знаю, как его ускорить.

Сначала я подумал, что нужно разделить этого работника на разные диапазоны идентификаторов и запрашивать базу данных в разное время, чтобы уменьшить нагрузку на базу данных. Поэтому вместо запроса message.contacts я бы запросил message.contacts.where('id > 1 && id < 10000'), а затем message.contacts.where('id > 10001 && id < 20000') et c, пока мы не достигнем максимального идентификатора.

Это кажется наивным. Как мне ускорить этот запрос или распределить его по времени?

Я также думал о добавлении многостолбцового индекса на users.managed_subscription_id и users.managed_subscription_id, но еще не пробовал.

...