DEV Community

loading...

[Sidekiq] How To: Interrupt a running job when Sidekiq worker process is shutting down

pikachuexe profile image PikachuEXE Updated on ・3 min read

I have a Sidekiq worker class that run for a duration which can be quite long sometimes.
Jobs and job states are persisted in database records.

class Worker
  include Sidekiq::Worker

  def perform
    prepare_job_record

    do_work_that_might_be_long

    mark_job_completed
  end
end
Enter fullscreen mode Exit fullscreen mode

Sometimes worker processes are restarted due to code release.
The job record has an attribute for job state and it's "stuck" on "started" when the the job is interrupted due to worker process terminated.

1st attempt

def perform
  # work...
rescue Sidekiq::Shutdown
  mark_job_interrupted
end
Enter fullscreen mode Exit fullscreen mode

Failed~

2nd attempt
I read the source code of Sidekiq to see if I can patch it.
But after reading it I think that would be too complex and dangerous, so I didn't even try.
But if you are interested, you can take a look on how it shutdowns the running threads:

The Solution
Let's look at events
https://github.com/mperham/sidekiq/wiki/Deployment#events

On free version this also works but the fetch is delayed (something like 5s? Not sure).
Sidekiq Pro users have super_fetch and the callback will be triggered much quicker it seems.

So I wrote a singleton class in initializer:


module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config

  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end

Enter fullscreen mode Exit fullscreen mode

Nothing special. A singleton class instead of a global variable.

What takes time to figure out is the next part: Raise specific error in worker after Sidekiq fired quiet event.

Since I read the Sidekiq source code earlier, I think I should use a thread.
But I think I am a noob in threading and should try to find a gem that does the threading for me, so I found concurrent-ruby.

In JS there is setInterval, in concurrent-ruby there is Concurrent::TimerTask.
Note that any error raised inside the block passed into the task object will not raise any error (handled by the gem).
So this will NOT raise error:

sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
  execution_interval: 1,
) do |task|
  task.shutdown if true
  raise CustomError if true
end
sidekiq_state_checking_task.execute
Enter fullscreen mode Exit fullscreen mode

Feel free to try it in ruby console.

I studied the doc for Concurrent::TimerTask and tried several solutions and finally got this:

class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long

    mark_job_completed

  rescue CustomShutdown => e
    mark_job_aborted
    reenqueue_the_job_sometimes
  end
end
Enter fullscreen mode Exit fullscreen mode

The point is to use observer to handle the result from the block (the task). Remember to shutdown the task or it might keep running (even error raised in main thread).

TL;DR

Put the following code in initializer:


module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config

  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end

Enter fullscreen mode Exit fullscreen mode

Put the following code in your worker:

class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long

    mark_job_completed

  rescue CustomShutdown => e
    mark_job_aborted
    reenqueue_the_job_sometimes
  end
end
Enter fullscreen mode Exit fullscreen mode

Feel free to let me know what you think about this method.
Or write an article about your method and I will be happy to add a link to your article!

Discussion (0)

pic
Editor guide