How Sidekiq Works

A deep dive into how the famous background processing Ruby library https://github.com/mperham/sidekiq works under the hood.

tiagonbotelho


Overview

Today we'll look at Sidekiq, one of the most famous libraries in the Ruby ecosystem for async job processing. It uses Redis for persistence and queue logic and exposes a simple class-based abstraction for defining workers. Whilst Sidekiq has many interesting features such as delayed queues and powerful middleware extensions, this code tour focuses on the core workflows of enqueuing and processing jobs. The aim is to leave you with a good understanding of how it works end to end.

Architecture Overview

Sidekiq gives you a simple API which you include to build jobs on top of. As the example below shows, the only method you need to implement to get started is "perform", alongside the arguments it will receive whenever it is scheduled. Including Sidekiq::Job also exposes a "perform_async" method at the class level that you can call to enqueue a job:

Sidekiq then splits into two main parts:

1. The logic that enqueues a job onto a Redis queue, with Sidekiq::Job as its entrypoint.
2. The logic that processes a job from Redis: effectively an infinite loop that continuously polls Redis for new jobs and processes them.

Enqueuing a Job

Sidekiq::Job

The entrypoint to scheduling a job in Sidekiq, based on the example above, would be to invoke "PlainOldRuby.perform_async('hard', 2)", which under the hood actually calls Sidekiq::Job.perform_async(*args) as shown below.

Within the same file a Setter instance is initialised with our own example class being passed to it and ultimately set as the instance variable `@klass`.

Afterwards, Setter#perform_async can then be called:

    def perform_async(*args)
      if @opts["sync"] == true
        # We are background processing after all, so we'll ignore this bit
        # and focus on the "async" branch below
        perform_inline(*args)
      else
        @klass.client_push(@opts.merge("args" => args, "class" => @klass))
      end
    end

During client_push, the actual client that connects to Redis is instantiated and pushes the job item onto the provided queue (unsurprisingly called 'default' by default).

Redis Client

Sidekiq's Redis client first normalises the provided job item into a standard format and, if any client middleware is configured (see https://github.com/mperham/sidekiq/blob/main/docs/middleware.md), runs it through that first before pushing the job onto Redis:

    def push(item)
      normed = normalize_item(item)
      payload = middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
        # With the default (empty) Sidekiq::Middleware::Chain, this block
        # doesn't actually do anything and just returns normed
        normed
      end
      if payload
        verify_json(payload) # only simple JSON data types allowed (i.e. no Symbols)
        raw_push([payload])  # the actual push to Redis happens here
        payload["jid"]
      end
    end
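To make the middleware chain less abstract, here is a tiny plain-Ruby sketch (not Sidekiq's actual implementation) of how such a chain composes: each entry wraps the rest of the chain via a block, and an empty chain simply runs the final block untouched.

```ruby
# Hypothetical sketch of a client middleware chain; names are illustrative.
class SketchChain
  def initialize
    @entries = []
  end

  def add(middleware)
    @entries << middleware
  end

  def invoke(job, &block)
    chain = @entries.dup
    traverse = proc do
      if chain.empty?
        block.call # no middleware left: run the actual push
      else
        chain.shift.call(job) { traverse.call } # wrap the rest of the chain
      end
    end
    traverse.call
  end
end

trace = []
chain = SketchChain.new
chain.add(->(job, &nxt) { trace << "before"; nxt.call.tap { trace << "after" } })
chain.invoke({ "class" => "PlainOldRuby" }) { trace << "push"; "payload" }
trace # => ["before", "push", "after"]
```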

Before we dive deeper, it is quite interesting to explore the data structure that actually forms one of our jobs, built by the normalize_item util invoked above.
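As a sketch, a normalized job item is just a plain hash of JSON-safe values. The field names below follow Sidekiq's documented job payload; the jid is generated the same way Sidekiq generates it.

```ruby
require "securerandom"
require "json"

job = {
  "class"      => "PlainOldRuby",        # worker class to instantiate later
  "queue"      => "default",             # destination queue
  "args"       => ["hard", 2],           # arguments handed to #perform
  "retry"      => true,                  # failed runs go to the retry set
  "jid"        => SecureRandom.hex(12),  # unique job id, returned by perform_async
  "created_at" => Time.now.to_f          # "enqueued_at" is stamped at push time
}

puts JSON.generate(job) # this JSON string is what ends up in Redis
```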

Pushing to Redis

With the job payload built, we finally arrive at the point where we are ready to publish it to our Redis queue.

    def raw_push(payloads)
      @redis_pool.with do |conn|
        # We get a connection from the pool of Redis connections made available
        # by the connection_pool gem (https://github.com/mperham/connection_pool),
        # which hands back a connection provided by redis-client
        # (https://github.com/redis-rb/redis-client)
        retryable = true
        begin
          conn.pipelined do |pipeline|
            # Redis pipelining issues multiple commands at once without waiting
            # for each individual response (https://redis.io/docs/manual/pipelining/)
            atomic_push(pipeline, payloads)
          end
          # (rest of the method elided)

The atomic_push described below ensures that the queue you are scheduling the job under actually exists, and then pushes the job payload onto that specific queue in Redis.

    def atomic_push(conn, payloads)
      if payloads.first.key?("at")
        # Only relevant when the job is given a timestamp for "when" to run;
        # we are skipping this branch, but you can infer what it does
        conn.zadd("schedule", payloads.flat_map { |hash|
          at = hash.delete("at").to_s
          [at, Sidekiq.dump_json(hash)]
        })
      else
        queue = payloads.first["queue"]
        now = Time.now.to_f
        to_push = payloads.map { |entry|
          entry["enqueued_at"] = now
          Sidekiq.dump_json(entry)
        }
        conn.sadd("queues", [queue])          # add the queue to the set of queues if missing
        conn.lpush("queue:#{queue}", to_push) # push the JSON payload to the queue key
      end
    end
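The net effect of the else branch can be mimicked with plain Ruby structures standing in for the two Redis structures it touches: a Set for the "queues" set and a Hash of Arrays for the lists. This only illustrates the semantics; no Redis is involved.

```ruby
require "set"
require "json"

queues = Set.new                        # stand-in for the "queues" Redis set
lists  = Hash.new { |h, k| h[k] = [] }  # stand-in for the Redis lists

payload = JSON.generate({
  "class" => "PlainOldRuby", "queue" => "default",
  "args" => ["hard", 2], "enqueued_at" => Time.now.to_f
})

queues.add("default")                    # SADD queues default
lists["queue:default"].unshift(payload)  # LPUSH queue:default <payload>

queues.include?("default")   # => true
lists["queue:default"].size  # => 1
```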

And there we go: our job is now published to our Redis queue, waiting to be processed. But how does it actually get processed? The next section answers precisely that question, so let's dive in!

Dequeuing a Job

In order to process jobs, Sidekiq ships with a separate process that connects to Redis and is constantly on the look-out for new jobs coming in. This Sidekiq worker begins in the `bin/sidekiq` CLI, which brings up a CLI instance in our terminal.

    cli = Sidekiq::CLI.instance
    cli.parse

    # For the purpose of this walkthrough, assume this method is skipped
    integrate_with_systemd

    cli.run # the Sidekiq worker begins to run

The run method itself doesn't contain much of interest for us except the "launch" call, which is what actually starts processing the work.

The Launcher

The Launcher gets initialised with a set of Managers, each managing its own queue of jobs (in our case only the "default" queue exists, but you can imagine how it'd work with more queues added).

    def initialize(config, embedded: false)
      @config = config
      @embedded = embedded
      @managers = config.capsules.values.map do |cap|
        # By default only one "default" manager gets initialised
        Sidekiq::Manager.new(cap)
      end
      # The Poller is slightly counter-intuitive: it watches the "retry" and
      # "schedule" sets for jobs whose timestamp has passed and should be enqueued
      @poller = Sidekiq::Scheduled::Poller.new(@config)
      @done = false
    end

When the launcher starts running, it has three main components:

- A heartbeat that periodically performs multiple checks on how the overall system is doing (are jobs being processed, is the Launcher still running, etc.)
- The poller described above, listening for jobs that need to be retried or have a fixed scheduled timestamp
- The queue managers, which it starts up one by one
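What the poller does can be sketched in plain Ruby, with an array of [score, payload] pairs standing in for the Redis sorted set (an illustration of the semantics, not Sidekiq's code): every entry whose score, a Unix timestamp, is in the past gets moved onto its real queue.

```ruby
require "json"

now = Time.now.to_f
schedule = [
  [now - 60,   JSON.generate({ "queue" => "default", "args" => [1] })], # due
  [now + 3600, JSON.generate({ "queue" => "default", "args" => [2] })]  # not yet
]
queues = Hash.new { |h, k| h[k] = [] }

# Roughly ZRANGEBYSCORE schedule -inf <now>: everything already due...
due, schedule = schedule.partition { |score, _| score <= now }
# ...gets pushed onto its destination queue (LPUSH)
due.each { |_, payload| queues[JSON.parse(payload)["queue"]].unshift(payload) }

queues["default"].length # => 1
schedule.length          # => 1
```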

Managers

Of the three elements of the launcher, we are particularly interested in the Managers, which, for a given queue, start up a fleet of workers (or Processors, as we'll see below), each tasked with, as the name indicates, performing a job unit.

When the Launcher begins to run, as we've seen above, Manager#run gets invoked which, in turn, kick-starts each Processor.

Processor

Since we want each worker working concurrently, we register a new Thread for each of them under the same name (based on the queue being processed).

The processor can then begin to run, processing one job at a time.

The process_one method simply does two things: it fetches the job from the Redis queue and then processes it. We'll explore fetching first.
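The shape of that loop can be sketched with stand-in lambdas (names are illustrative, not Sidekiq's exact code): fetch one unit of work and, if there is one, process it.

```ruby
# Hypothetical shape of the processor loop
def process_one(fetch, handle)
  work = fetch.call
  handle.call(work) if work
end

jobs = ["job-1", "job-2"] # stand-in for the Redis queue
done = []
fetch  = -> { jobs.shift }
handle = ->(job) { done << job }

process_one(fetch, handle) while jobs.any?
done # => ["job-1", "job-2"]
```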

Fetching from the Redis queue

The processor begins by attempting to fetch a single job from the queue

    def fetch
      j = get_one
      if j && @done
        # If a job happens to be pulled at the same time the system gets
        # flagged for termination, we re-queue it back into its original queue
        j.requeue
        nil
      else
        j
      end
    end

Let's then check how Sidekiq::BasicFetch fetches from the queue:

    def retrieve_work
      qs = queues_cmd # returns ["queue:default"]
      ...
      queue, job = redis { |conn| conn.blocking_call(false, "brpop", *qs, TIMEOUT) }
      # BRPOP is the blocking counterpart of Redis' RPOP: it waits (up to
      # TIMEOUT) if the queue is empty, then pops an element from the tail of
      # the list, yielding a FIFO style of queue
      UnitOfWork.new(queue, job, config) if queue
    end
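The FIFO behaviour of LPUSH plus BRPOP can be seen with a plain Array standing in for the Redis list: new jobs enter at the head, BRPOP removes from the tail, so the oldest job comes out first.

```ruby
list = []
list.unshift("job-1") # LPUSH queue:default job-1
list.unshift("job-2") # LPUSH queue:default job-2
list.pop              # BRPOP => "job-1", the oldest job
```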

The raw job is then used to build a "Unit of Work", which we explore below.

Unit of Work

A Unit of Work is then built from the item popped off the Redis queue. It encapsulates a few convenience methods, such as getting the respective queue name and providing the primitive to re-queue the job back onto its original queue.
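A rough sketch of that structure (the real class also carries the capsule config; this stand-in just shows the conveniences it wraps, with a Hash of Arrays standing in for Redis):

```ruby
UnitOfWork = Struct.new(:queue, :job) do
  # "queue:default" -> "default"
  def queue_name
    queue.delete_prefix("queue:")
  end

  # Put the raw payload back at the tail so it's the next one popped
  def requeue(lists)
    lists[queue].push(job)
  end
end

uow = UnitOfWork.new("queue:default", '{"class":"PlainOldRuby","args":["hard",2]}')
uow.queue_name # => "default"
```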

Processing a Unit of Work

We now end up with a unit of work that Sidekiq needs to process. We begin by fetching all the metadata available about the job itself, such as the loose JSON object initially stored in Redis and its queue name.

With all the information ready, we begin processing the job itself. Let's break that down:

    ack = false
    begin
      dispatch(job_hash, queue, jobstr) do |inst|
        config.server_middleware.invoke(inst, job_hash, queue) do
          # server_middleware uses a Middleware Chain that is empty by default,
          # but you can imagine it as a series of pre-processing steps run in
          # sequence, similar to the client middleware we discussed while
          # exploring how to enqueue a job. If you wanted to throttle how many
          # jobs Sidekiq processes per second, this is where you'd hook in.
          execute_job(inst, job_hash["args"])
        end
      end
      ack = true

`dispatch`, aside from setting up logging and retry primitives for the job itself (which are out of scope for this tour), instantiates the job's class itself (in our case "PlainOldRuby").

With our job's class created and all the middleware initialised, we finally reach the final stage, where "execute_job" simply invokes the original "perform" method from our example "PlainOldRuby" job with the provided arguments.
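Stripped of middleware, logging, and retry handling, that final stage boils down to looking the class up by its stored name, instantiating it, and calling #perform with the stored args. A self-contained sketch (the perform body is illustrative):

```ruby
require "json"

class PlainOldRuby
  def perform(how_hard, how_long)
    "worked #{how_hard} for #{how_long}s"
  end
end

jobstr   = '{"class":"PlainOldRuby","args":["hard",2]}'
job_hash = JSON.parse(jobstr)

inst = Object.const_get(job_hash["class"]).new # roughly what dispatch does
inst.perform(*job_hash["args"]) # => "worked hard for 2s"
```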

And we are done, we've successfully processed our job.

References

- Sidekiq's repository - https://github.com/mperham/sidekiq
- Sidekiq's Best Practices - https://github.com/mperham/sidekiq/wiki/Best-Practices
- Redis LPUSH - https://redis.io/commands/lpush/
- Redis BRPOP - https://redis.io/commands/brpop/
- Redis Pipelining - https://redis.io/docs/manual/pipelining/

If you read this far...

Thank you for reading this article; I hope you've found it helpful. I'm one of the co-creators of https://charta.dev (the platform you are reading this article on), and we are trying to make it easier for developers to document their code straight from their editors. If you found it interesting, consider signing up to learn more and start diving deep into your first codebase!
