DEV Community

How Do I Split Up a Large Query in Sidekiq and Rails?

Ian Jones (theianjones) ・ Originally published at stackoverflow.com ・ 4 min read

I posted this question on Stack Overflow and thought I would get the dev.to community's perspective as well.

When we (egghead.io) release a course, we send an email blast to all of our users who have opted into commercial communication. When this happens, our server starts returning 500s because of timed-out requests. We use a Sidekiq worker (BroadcastMessageSendWorker) to create the jobs that send the email we have written. We use Sidekiq Pro so we can use the bulk processing feature.

ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'

class BroadcastMessageSendWorker
  include Sidekiq::Worker

  def perform(message_guid)
    ActiveRecord::Base.connection_pool.with_connection do
      message = BroadcastMessage.find(message_guid)

      message.with_lock do
        return unless message.pending?

        message.pickup!

        if message.contacts.count == 0
          message.finish!
          return
        end

        batch = Sidekiq::Batch.new
        batch.on(:complete, self.class, 'guid' => message_guid)
        batch.jobs do

          # We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
          # will start blowing up. Instead, use an in-memory `seen` index
          seen = Set.new({})

          message.contacts.select(:id).find_in_batches do |contact_batch|
            args = contact_batch.pluck(:id).map do |contact_id| 
              next unless seen.add?(contact_id) # add? returns nil if the object is already in the set

              [message_guid, contact_id]
            end

            Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
          end
        end

        message.update(batch_id: batch.bid)
      end
    end
  end

  def on_complete(_, options)
    message = BroadcastMessage.find(options['guid'])
    message.finish! if message.sending?
  end
end

We are building an in-memory set to be sure that we don't send the same email to a user twice. ScoutAPM is telling us that the message.contacts.select(:id) line is taking a long time (contacts joins our users table, so this is somewhat expected).

I analyzed this query:

Subquery Scan on contacts  (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
  Filter: (NOT (hashed SubPlan 1))
  ->  CTE Scan on base_contacts  (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
        CTE base_contacts
          ->  Gather  (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
                Workers Planned: 2
                Workers Launched: 2
                ->  Parallel Hash Left Join  (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
                      Hash Cond: (contacts_1.user_id = users.id)
                      Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
                      Rows Removed by Filter: 12924
                      ->  Parallel Seq Scan on contacts contacts_1  (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
                            Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
                            Rows Removed by Filter: 108423
                            SubPlan 2
                              ->  Seq Scan on mailkick_opt_outs mailkick_opt_outs_1  (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
                                    Filter: (active AND (list IS NULL))
                                    Rows Removed by Filter: 19576
                            SubPlan 3
                              ->  Nested Loop  (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
                                    ->  Seq Scan on broadcast_messages  (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
                                          Filter: (signature = 'broadcast_message_signature'::text)
                                          Rows Removed by Filter: 63
                                    ->  Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages  (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
                                          Index Cond: (broadcast_message_id = broadcast_messages.id)
                                          Filter: ((user_type)::text = 'ClassType'::text)
                      ->  Parallel Hash  (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
                            Buckets: 131072  Batches: 8  Memory Usage: 3168kB
                            ->  Parallel Seq Scan on users  (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
  SubPlan 1
    ->  Seq Scan on mailkick_opt_outs  (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
          Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
          Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms

The Parallel Seq Scan is taking a lot of time, but I don't know how to speed it up.

My first thought is to split this worker's query into ranges of IDs and run them at different times to reduce the load on the database. So instead of querying message.contacts, I would query message.contacts.where('id >= 1 AND id <= 10000'), then message.contacts.where('id >= 10001 AND id <= 20000'), etc., until we reach the max id.
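A minimal sketch of that range-splitting idea in plain Ruby, with no Rails involved. The helper name `id_windows` and the window size are hypothetical; in the worker, each window could be passed to something like `message.contacts.where(id: window)`, or its bounds used as the `start:` and `finish:` options of `find_in_batches`.

```ruby
# Split the inclusive ID span min_id..max_id into consecutive, non-overlapping
# windows of at most `size` IDs each. `id_windows` is a hypothetical helper,
# not part of Rails or Sidekiq.
def id_windows(min_id, max_id, size)
  raise ArgumentError, 'size must be positive' unless size.positive?

  (min_id..max_id).step(size).map do |start|
    # Clamp the last window so it never runs past max_id.
    start..[start + size - 1, max_id].min
  end
end

windows = id_windows(1, 25_000, 10_000)
# Every ID falls in exactly one window, boundaries included.
windows.each { |w| puts "#{w.first}..#{w.last}" }
```

Using inclusive ranges avoids the easy mistake of skipping boundary IDs when the windows are written out by hand with `>` and `<`.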

This feels naive. How do I either speed this query up or spread it out over time?

I also thought of adding a multi-column index on users.can_contact and users.managed_subscription_id but haven't tried that yet.

Discussion (2)

rhymes

Hi Ian, interesting conundrum. The query plan is much more complicated than it looks from the worker's code, so I guess that .contacts isn't that straightforward.

At first glance there's a lot going on in that worker.

Let's see if we can tackle it a piece at a time:

  1. the global lock
message.with_lock do

This sets an exclusive lock on the row (SELECT ... FOR UPDATE) for the duration of the transaction opened by .with_lock. So it blocks every other transaction that tries to update or lock that row until it finishes (plain reads are not blocked in PostgreSQL), and it also has to wait if another transaction already holds a lock on the row.

I'm not sure you need to lock the row for so long, as the state machine methods (let's call them like this :D) seem to be clustered at the top. Could it make sense to only lock the row around those methods and prepare the batch outside the lock?

Basically, by the time you start preparing the batches you're done with message, so I think you can release the lock by then.
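A rough sketch of that narrower lock, runnable without Rails. `FakeMessage` and `claim` are hypothetical stand-ins: the Mutex only imitates the row lock, and the sketch assumes the real `with_lock` returns the value of its block.

```ruby
# Stand-in for the BroadcastMessage record; this is NOT ActiveRecord.
# `with_lock` is backed by a Mutex purely to imitate the row lock.
class FakeMessage
  attr_reader :state

  def initialize
    @state = :pending
    @mutex = Mutex.new
  end

  def with_lock
    @mutex.synchronize { yield }
  end

  def pending?
    @state == :pending
  end

  def pickup!
    @state = :sending
  end
end

# Hypothetical helper: hold the lock only for the state check and transition,
# and report whether this caller is the one that picked the message up.
def claim(message)
  message.with_lock do
    next false unless message.pending?

    message.pickup!
    true
  end
end

message = FakeMessage.new
if claim(message)
  # The lock is already released here; the slow batch preparation
  # (find_in_batches, push_bulk) would run without holding it.
  puts 'claimed: prepare the batch outside the lock'
end
puts claim(message) # a second worker finds the message already picked up
```

The state transition is what guards against two workers sending the same broadcast, so the batch preparation itself does not need the lock.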

  2. the count
if message.contacts.count == 0

This issues a SELECT COUNT(*), which seems fine, but it can probably be optimized by using if message.contacts.none?, which issues a SELECT ... LIMIT 1. You may want to time both, as I'm uncertain about the schema.

  3. the batching
seen = Set.new({})

message.contacts.select(:id).find_in_batches do |contact_batch|
 args = contact_batch.pluck(:id).map do |contact_id| 
   next unless seen.add?(contact_id) # add? returns nil if the object is already in the set

   [message_guid, contact_id]
 end

 Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
end

Here you're loading contacts in batches, using an in-memory set to make sure you haven't seen the contact_id already. I have a question: why can they be duplicated in the first place if the contact ids come from message.contacts?

I had immediately thought of:

message.contacts.select(:id).distinct.find_in_batches

but you said that blows up with find_in_batches after 1000 records.

Another issue with that set is that you're keeping all the IDs in memory for the entire duration of the job. According to the plan, you have tens of thousands of them.

I then re-read the code and thought about a different approach to the whole thing (aside from the long locking).

  1. job #1 counts the total and splits the size evenly distributed (basically its goal would be to create a bunch of start and finish parameters to give to find_in_batches). It then creates a bunch of jobs #2
  2. job #2 takes the parameters and does the selecting and preparation of the bulks for Sidekiq
  3. job #3 is what actually sends the emails

This, as you say, would reduce the running time by splitting the work into multiple concurrent jobs that each run a smaller SELECT.

You still have to solve the problem of the duplication. But there might be a solution if you use Redis and its atomic transactions: redis.io/topics/transactions

The other idea here, untested, is to have job #1 split the ids and queue job #2 N times (once for each pair of start and finish offsets). Job #2 doesn't directly call the "send email" jobs but prepares data in Redis. When the batch of jobs #2 has completed, you fire off the batches of job #3. When job #3 starts, it checks with Redis whether it actually has to send the email (a "no" is Redis basically telling it "someone else already sent an email to this contact id").
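To make the "check with Redis before sending" step concrete, here is a small sketch that runs without Redis. `FakeSentRegistry` and `deliver_once` are hypothetical names; the registry imitates in-process what Redis's SADD command reports (whether the member was newly added), so a real version would call Redis instead.

```ruby
require 'set'

# In-process stand-in for a Redis set; this is NOT the redis gem.
# `add?` mirrors what SADD reports: true only for the first caller
# that adds a given member, false for everyone after.
class FakeSentRegistry
  def initialize
    @members = Set.new
    @mutex = Mutex.new
  end

  def add?(member)
    @mutex.synchronize { !@members.add?(member).nil? }
  end
end

# Hypothetical body of job #3: send only if this job is the first to
# claim the (message, contact) pair.
def deliver_once(registry, message_guid, contact_id, outbox)
  return false unless registry.add?("#{message_guid}:#{contact_id}")

  outbox << contact_id # stand-in for actually sending the email
  true
end

registry = FakeSentRegistry.new
outbox = []
# Contact ids 2 and 1 show up twice, as duplicates from the query would.
[1, 2, 2, 3, 1].each { |id| deliver_once(registry, 'guid-1', id, outbox) }
puts outbox.inspect
```

Because the check and the insert happen in one atomic step, two concurrent jobs cannot both decide to send to the same contact, and no job has to hold the full set of IDs in its own memory.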

Let me know if any of this makes sense :D

rhymes

Just found this Sidekiq wiki page called Really Complex Workflows with Batches.