Distributing work to threads via Queue
As a follow-up to yesterday's Thread.new article, let's talk about a thread-safe way of distributing work to threads using Queue.
```ruby
NUMBER_OF_WORKERS = 5
STOP_SIGNAL = :stop

jobs = Queue.new

# The producer adds 100 jobs, pausing briefly to simulate jobs arriving over time
producer = Thread.new do
  100.times do |i|
    sleep 0.001
    jobs << i
  end
end

workers = NUMBER_OF_WORKERS.times.map do |thread_number|
  Thread.new do
    loop do
      job = jobs.pop # blocks until a job is available
      # Use :stop to signal the worker to stop processing jobs
      break if job == STOP_SIGNAL
      puts "Worker #{thread_number} processed job #{job}"
    end
  end
end

# Wait for the producer to finish adding jobs to the queue
producer.join

# Push one stop signal per worker. Because Queue is FIFO, every real job
# is popped before any of the stop signals.
NUMBER_OF_WORKERS.times { jobs << STOP_SIGNAL }

# Wait for all workers to finish processing
workers.each(&:join)

# Example output (which worker gets which job, and the line order, vary between runs):
# Worker 0 processed job 0
# Worker 1 processed job 1
# Worker 2 processed job 2
# ...
# Worker 4 processed job 99
```
What's nice about this approach is that the producer and the workers are decoupled. The producer can add jobs to the queue at its own pace, while the workers process them concurrently without needing to know how many jobs there are or when they will arrive. Queue#pop blocks a worker thread until a job is available, so there is no busy waiting, and because Queue does its own locking we don't need a Mutex or ConditionVariable of our own. The sentinel value (:stop), pushed once per worker, lets us shut the worker threads down gracefully once all jobs have been processed.
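To check the claim that each job is handed to exactly one worker, here is a small sketch that collects results in a second Queue instead of printing them. The `results` queue and the squaring that stands in for real work are illustrative assumptions, not part of the original example, and the separate producer thread is dropped for brevity (jobs are pushed from the main thread).

```ruby
worker_count = 5
stop_signal = :stop

jobs = Queue.new
results = Queue.new # hypothetical: collects [worker_number, job, result] triples

workers = worker_count.times.map do |worker_number|
  Thread.new do
    loop do
      job = jobs.pop              # blocks until a job (or a stop signal) arrives
      break if job == stop_signal # each worker consumes exactly one stop signal
      results << [worker_number, job, job * job] # squaring stands in for real work
    end
  end
end

# Jobs are pushed from the main thread here instead of a separate producer
100.times { |i| jobs << i }
worker_count.times { jobs << stop_signal }
workers.each(&:join)

# Every worker has exited, so nothing else touches results; drain it
collected = []
collected << results.pop until results.empty?

processed_jobs = collected.map { |_worker, job, _result| job }.sort
puts processed_jobs == (0...100).to_a # => true (each job was handled exactly once)
p collected.map(&:first).tally        # jobs per worker; the split varies between runs
```

The per-worker tally is the only nondeterministic part: Queue guarantees that each item is popped by exactly one thread, not which thread that is.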
The sentinel value might seem a bit hacky, but it's a common pattern for signaling threads to stop. Another way of telling the workers that there are no more jobs is Queue#close. Once a queue is closed, pushing to it raises ClosedQueueError, and pop stops blocking: it keeps returning the remaining items until the queue is drained, then returns nil. Threads already blocked in pop are woken up and receive nil, which can then be used as the signal to stop processing:
```ruby
queue = Queue.new

worker = Thread.new do
  loop do
    job = queue.pop
    if job
      puts job
    else
      puts "No more jobs, exiting thread"
      break
    end
  end
end

queue << "Job 1"
queue << "Job 2"
queue.close
worker.join

# Output:
# Job 1
# Job 2
# No more jobs, exiting thread
```
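Because close wakes every blocked worker at once, it can replace the one-stop-signal-per-worker loop from the first example. Below is a sketch of that worker pool shut down with a single close, assuming jobs are never nil or false (the `while` condition would treat either as "no more work"). The `processed` queue is an illustrative addition used only for counting.

```ruby
worker_count = 5
jobs = Queue.new
processed = Queue.new # illustrative: records which jobs were handled

workers = worker_count.times.map do
  Thread.new do
    # pop returns nil only once the queue is closed and drained, so this
    # loop ends after the last real job (assumes jobs are never nil/false)
    while (job = jobs.pop)
      processed << job
    end
  end
end

10.times { |i| jobs << i }
jobs.close           # one call instead of worker_count stop signals
workers.each(&:join)

puts processed.size  # => 10

# A closed queue rejects new work
begin
  jobs << 10
rescue ClosedQueueError => e
  push_error = e
end
puts "push after close raised #{push_error.class}" # => push after close raised ClosedQueueError
```

The final push shows the other half of close: producers can no longer add work, so a closed queue really does mean "no more jobs".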
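Queue#empty? and Queue#closed? can make the exit condition more explicit, but they need to be checked after pop, not before it. Checking them first is a check-then-act race: the queue can still be open (and empty) when the worker checks, so the worker blocks in pop, and when the queue is then closed, pop returns nil anyway (and the loop would print an empty line for it before exiting). Checking after pop handles that case:

```ruby
queue = Queue.new

worker = Thread.new do
  loop do
    job = queue.pop
    # pop only returns nil for a closed, empty queue (or if nil was pushed),
    # so confirm with closed? and empty? before exiting
    if job.nil? && queue.closed? && queue.empty?
      puts "No more jobs, exiting thread"
      break
    end
    puts job
  end
end

queue << "Job 1"
queue << "Job 2"
queue.close
worker.join

# Output:
# Job 1
# Job 2
# No more jobs, exiting thread
```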
History
Queue has been part of Ruby since its inception.
It has evolved since then, but mostly by becoming faster, more integrated and a little more convenient rather than by changing its role:
- Ruby 1.8.6 gained the faster C-backed ext/thread implementation.
- Ruby 2.3 made Queue, SizedQueue and related synchronization classes built-ins, and added Queue#close, which gives producers a way to tell blocked consumers that no more work is coming.
- Ruby 3.1 made Queue.new accept an enumerable of initial contents.
- Ruby 3.2 added Queue#pop(timeout: seconds), so a worker can wait for a bounded amount of time instead of choosing between blocking forever and polling.
The core idea has stayed stable throughout: Queue is not just "a thread-safe array". It is a synchronization point between producers and consumers. It can block, wake and close in ways that a plain array cannot.
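The Ruby 3.1 and 3.2 additions listed above can be sketched together. This assumes Ruby 3.2 or later; the 0.1 second timeout and the policy of exiting once the queue stays idle are illustrative choices, not recommendations. Note that pop(timeout:) returns nil on timeout, so, as with close, a nil job would be indistinguishable from an idle queue.

```ruby
# Requires Ruby 3.2+ for pop(timeout:); Queue.new(enumerable) needs 3.1+
queue = Queue.new([1, 2, 3]) # pre-filled with three jobs

results = []
worker = Thread.new do
  loop do
    # Wait at most 0.1 s; pop returns nil if nothing arrives in time
    job = queue.pop(timeout: 0.1)
    break if job.nil? # illustrative policy: give up once the queue stays idle
    results << job * 10
  end
end
worker.join

p results # => [10, 20, 30]
```

A long-running worker would more likely use the timeout to check a shutdown flag or do housekeeping and then go back to waiting, rather than exiting on the first idle period.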