I have a Sidekiq worker class whose jobs can sometimes run for quite a long time.
Jobs and job states are persisted in database records.
class Worker
  include Sidekiq::Worker

  def perform
    prepare_job_record
    do_work_that_might_be_long
    mark_job_completed
  end
end
Sometimes worker processes are restarted due to a code release.
The job record has an attribute for the job state, and it gets "stuck" on "started" when the job is interrupted because the worker process is terminated.
1st attempt
def perform
  # work...
rescue Sidekiq::Shutdown
  mark_job_interrupted
end
Failed~
2nd attempt
I read the source code of Sidekiq to see if I could patch it.
But after reading it I think that would be too complex and dangerous, so I didn't even try.
If you are interested, you can take a look at how it shuts down the running threads:
- Sidekiq::Manager#stop (https://github.com/mperham/sidekiq/blob/v6.1.2/lib/sidekiq/manager.rb#L61): Do some work to stop processors processing new jobs, run callbacks...
- Pausing to allow workers to finish... (The waiting time can be controlled and it is described in https://github.com/mperham/sidekiq/wiki/Deployment#overview)
- Sidekiq::Processor#kill (https://github.com/mperham/sidekiq/blob/v6.1.2/lib/sidekiq/processor.rb#L49)
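To make the "wait, then kill" sequence above more concrete, here is a minimal plain-Ruby simulation. It is not Sidekiq code: `FakeShutdown` and `run_with_deadline` are hypothetical names I made up, and the only thing borrowed from Sidekiq is the idea of raising an `Interrupt` subclass into a thread that is still busy after the timeout.

```ruby
# Plain-Ruby simulation of "pause, then kill". This is NOT Sidekiq's code.
# FakeShutdown is a hypothetical stand-in for Sidekiq::Shutdown.
class FakeShutdown < Interrupt; end

# Runs `job` in a thread, waits up to `timeout` seconds for it to finish,
# then raises FakeShutdown inside the thread if it is still running.
def run_with_deadline(timeout:, &job)
  outcome = nil
  thread = Thread.new do
    begin
      job.call
      outcome = :completed
    rescue FakeShutdown
      outcome = :killed
    end
  end

  # Pause to allow the job to finish on its own.
  unless thread.join(timeout)
    # The deadline passed, so interrupt the thread from outside.
    thread.raise(FakeShutdown)
    thread.join
  end
  outcome
end

short_job = run_with_deadline(timeout: 1) { sleep 0.05 }
long_job  = run_with_deadline(timeout: 0.1) { sleep 5 }
puts "short job: #{short_job}, long job: #{long_job}"
```

The short job finishes inside the waiting window and is left alone, while the long job gets the exception raised into it from another thread.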
The Solution
Let's look at Sidekiq's lifecycle events: https://github.com/mperham/sidekiq/wiki/Deployment#events
This also works on the free version, but the fetch is delayed (something like 5s? Not sure).
Sidekiq Pro users have super_fetch, and the callback seems to be triggered much more quickly.
So I wrote a singleton class in an initializer:
require "singleton"

module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    # Called once from the `:quiet` lifecycle callback below
    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config
  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end
Nothing special. A singleton class instead of a global variable.
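As a quick sanity check of the idea, here is a small stdlib-only sketch showing that a flag flipped on one thread is visible from another through the singleton. `QuietFlag` is a hypothetical stand-in for the class above, kept outside the `Sidekiq` namespace so it runs without the gem.

```ruby
require "singleton"

# Hypothetical stand-in for the lifecycle class, so this runs without Sidekiq.
class QuietFlag
  include Singleton

  def initialize
    @quiet = false
  end

  def quiet!
    @quiet = true
  end

  def quiet?
    @quiet
  end
end

before = QuietFlag.instance.quiet?

# Simulates the `:quiet` callback firing on some other thread.
Thread.new { QuietFlag.instance.quiet! }.join

after = QuietFlag.instance.quiet?
puts "before: #{before}, after: #{after}"
```

Every thread in the process gets the same instance, which is all the worker needs in order to poll the flag.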
What took time to figure out was the next part: raising a specific error in the worker after Sidekiq fires the quiet event.
Since I had read the Sidekiq source code earlier, I thought I should use a thread.
But I am a noob at threading, so I looked for a gem that does the threading for me and found concurrent-ruby.
In JS there is setInterval; in concurrent-ruby there is Concurrent::TimerTask.
Note that an error raised inside the block passed to the task object is handled by the gem and does not propagate to the caller.
So this will NOT raise an error:
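require "concurrent"

# Defined here so the snippet can be pasted into a console as-is
class CustomError < StandardError; end

sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
  execution_interval: 1,
) do |task|
  task.shutdown if true
  raise CustomError if true
end
sidekiq_state_checking_task.execute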
Feel free to try it in a Ruby console.
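If you don't have the gem at hand, the same behaviour can be illustrated with plain threads. The sketch below is only an analogy, not concurrent-ruby's implementation: `run_in_background` is a hypothetical helper that catches whatever the block raises and hands it to an observer as `(time, result, error)`, which is similar in spirit to how a TimerTask observer gets notified.

```ruby
# Hypothetical helper (not part of concurrent-ruby): runs `work` once on a
# background thread and reports the outcome to `observer`.
def run_in_background(observer, &work)
  Thread.new do
    result = error = nil
    begin
      result = work.call
    rescue StandardError => e
      error = e # captured here, so the calling thread never sees it
    end
    observer.call(Time.now, result, error)
  end
end

class CustomError < StandardError; end

seen = []
observer = ->(_time, result, error) { seen << [result, error&.class] }

# The first block raises, the second returns normally. Neither one raises
# in the calling thread; both outcomes arrive through the observer.
run_in_background(observer) { raise CustomError }.join
run_in_background(observer) { :ok }.join

puts seen.inspect
```

The calling thread only learns about the error because the observer was told about it, which is why reacting to the task's result has to happen in an observer too.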
I studied the docs for Concurrent::TimerTask, tried several solutions, and finally got this:
class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long
    mark_job_completed
  rescue CustomShutdown
    mark_job_aborted
    reenqueue_the_job_sometimes
  ensure
    # Also stop the task when the job finishes normally, so it cannot
    # raise into this thread later while it runs another job
    sidekiq_state_checking_task&.shutdown
  end
end
The point is to use an observer to handle the result of the block (the task). Remember to shut down the task, or it might keep running (even after an error is raised in the main thread).
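The core mechanism can also be tried without Sidekiq or concurrent-ruby. The sketch below is a simplified stand-in, not the worker itself: a plain local variable `quiet` plays the role of the lifecycle flag, a polling thread plays the role of the timer task plus observer, and `sleep 5` stands in for the long work.

```ruby
class CustomShutdown < Interrupt; end

# Hypothetical flag standing in for Sidekiq::LifecycleCenter.instance.quiet?
quiet = false
state = :started

worker = Thread.new do
  main_thread = Thread.current

  # Watcher: polls the flag and interrupts the worker thread once it is set.
  watcher = Thread.new do
    sleep 0.01 until quiet
    main_thread.raise(CustomShutdown)
  end

  begin
    sleep 5            # stands in for do_work_that_might_be_long
    state = :completed
  rescue CustomShutdown
    state = :aborted   # stands in for mark_job_aborted
  ensure
    watcher.kill       # stop polling whether the job finished or not
  end
end

sleep 0.05
quiet = true           # simulates the :quiet event firing
worker.join
puts "job state: #{state}"
```

One thing to keep in mind with this approach: `Thread#raise` can interrupt the target thread at almost any point, so the rescue path should only do work that is safe to run after a partially finished job.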
TL;DR
Put the following code in an initializer:
require "singleton"

module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    # Called once from the `:quiet` lifecycle callback below
    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config
  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end
Put the following code in your worker:
class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long
    mark_job_completed
  rescue CustomShutdown
    mark_job_aborted
    reenqueue_the_job_sometimes
  ensure
    # Also stop the task when the job finishes normally, so it cannot
    # raise into this thread later while it runs another job
    sidekiq_state_checking_task&.shutdown
  end
end
Feel free to let me know what you think about this method.
Or write an article about your method and I will be happy to add a link to your article!