Когда мы выпускаем курс, мы рассылаем электронное письмо всем нашим пользователям, которые выбрали коммерческое общение. Когда это происходит, наш сервер начинает возвращать 500 из-за истечения времени ожидания запросов. Мы используем сотрудника sidekiq (BroadcastMessageSendWorker
) для создания рабочих мест, которые будут отправлять написанное нами электронное письмо. Мы используем Sidekiq Pro, чтобы мы могли использовать функцию массовой обработки.
ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'
class BroadcastMessageSendWorker
include Sidekiq::Worker
def perform(message_guid)
ActiveRecord::Base.connection_pool.with_connection do
message = BroadcastMessage.find(message_guid)
message.with_lock do
return unless message.pending?
message.pickup!
if message.contacts.count == 0
message.finish!
return
end
batch = Sidekiq::Batch.new
batch.on(:complete, self.class, 'guid' => message_guid)
batch.jobs do
# We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
# will start blowing up. Instead, use an in-memory `seen` index
seen = Set.new({})
message.contacts.select(:id).find_in_batches do |contact_batch|
args = contact_batch.pluck(:id).map do |contact_id|
next unless seen.add?(contact_id) # add? returns nil if the object is already in the set
[message_guid, contact_id]
end
Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
end
end
message.update(batch_id: batch.bid)
end
end
end
def on_complete(_, options)
message = BroadcastMessage.find(options['guid'])
message.finish! if message.sending?
end
end
Мы создаем набор в памяти, чтобы быть уверенным, что мы не отправляем 2 одинаковых электронных письма пользователю. ScoutAPM говорит нам, что строка message.contacts.select(:id)
занимает много времени (контакты присоединяются к нашей таблице пользователей, так что это несколько ожидаемо).
Я проанализировал этот запрос:
Subquery Scan on contacts (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
Filter: (NOT (hashed SubPlan 1))
-> CTE Scan on base_contacts (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
CTE base_contacts
-> Gather (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Parallel Hash Left Join (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
Hash Cond: (contacts_1.user_id = users.id)
Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
Rows Removed by Filter: 12924
-> Parallel Seq Scan on contacts contacts_1 (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
Rows Removed by Filter: 108423
SubPlan 2
-> Seq Scan on mailkick_opt_outs mailkick_opt_outs_1 (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
Filter: (active AND (list IS NULL))
Rows Removed by Filter: 19576
SubPlan 3
-> Nested Loop (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
-> Seq Scan on broadcast_messages (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
Filter: (signature = 'broadcast_message_signature'::text)
Rows Removed by Filter: 63
-> Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
Index Cond: (broadcast_message_id = broadcast_messages.id)
Filter: ((user_type)::text = 'ClassType'::text)
-> Parallel Hash (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
Buckets: 131072 Batches: 8 Memory Usage: 3168kB
-> Parallel Seq Scan on users (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
SubPlan 1
-> Seq Scan on mailkick_opt_outs (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms
Parallel Seq Scan
занимает много времени, но я не знаю, как его ускорить.
Сначала я подумал, что нужно разделить этого работника на разные диапазоны идентификаторов и запрашивать базу данных в разное время, чтобы уменьшить нагрузку на базу данных. Поэтому вместо запроса message.contacts
я бы запросил message.contacts.where('id > 1 && id < 10000')
, а затем message.contacts.where('id > 10001 && id < 20000')
et c, пока мы не достигнем максимального идентификатора.
Это кажется наивным. Как мне ускорить этот запрос или распределить его по времени?
Я также думал о добавлении многостолбцового индекса на users.managed_subscription_id
и users.managed_subscription_id
, но еще не пробовал.