Applying backpressure with SizedQueue
As a follow-up to yesterday's Queue article, let's talk about SizedQueue. It's basically Queue, but with a maximum size.
That might sound like a small distinction, but it's a very useful one. A plain Queue lets the producer keep adding work for as long as memory allows. A SizedQueue makes the producer wait when the queue is full. In other words: it gives us backpressure.
NUMBER_OF_WORKERS = 3
QUEUE_SIZE = 5

jobs = SizedQueue.new(QUEUE_SIZE)

producer = Thread.new do
  20.times do |i|
    jobs << i # blocks while the queue already holds QUEUE_SIZE jobs
    puts "Queued job #{i} (#{jobs.length}/#{jobs.max} waiting)"
  end
  jobs.close # no more jobs are coming
end

workers = NUMBER_OF_WORKERS.times.map do |thread_number|
  Thread.new do
    # pop returns nil once the queue is closed and empty, which ends the loop
    while (job = jobs.pop)
      sleep 0.05
      puts "Worker #{thread_number} processed job #{job}"
    end
  end
end

producer.join
workers.each(&:join)
# Example output (the exact interleaving varies between runs):
# Queued job 0 (1/5 waiting)
# Queued job 1 (2/5 waiting)
# Queued job 2 (3/5 waiting)
# ...
# Queued job 7 (5/5 waiting)
# Worker 0 processed job 0
# Queued job 8 (5/5 waiting)
# ...
# Worker 2 processed job 19
With a regular Queue, pushing an object always succeeds immediately. With SizedQueue, a push blocks when there are already SizedQueue#max items waiting in the queue. Once a worker calls pop, space opens up and the producer continues.
Choosing a SizedQueue is a way of saying that we don't just care about how many worker threads we have. We also care about how much work is allowed to pile up in front of them, e.g. because of memory constraints.
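To see the blocking behaviour in isolation, here is a minimal sketch with a queue of size 1. The names (queue, events, producer) are made up for illustration. It also uses the optional second argument of push: passing true asks for a non-blocking push, which raises ThreadError instead of waiting when the queue is full.

```ruby
queue = SizedQueue.new(1)
queue << :first # fits, the queue is now full

# A non-blocking push (second argument true) raises instead of waiting.
begin
  queue.push(:second, true)
  rejected = false
rescue ThreadError
  rejected = true
end

events = []
producer = Thread.new do
  queue << :second # blocks until a consumer makes room
  events << :pushed_second
end

sleep 0.1 # give the producer a moment to run into the full queue
events << :about_to_pop
first = queue.pop # frees a slot, which wakes the producer
producer.join

p rejected # => true
p first    # => :first
p events   # => [:about_to_pop, :pushed_second]
```

The producer can only record :pushed_second after the main thread has popped an item, so the order of the two events does not depend on the sleep.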
We can change SizedQueue#max at runtime if we have reason to dynamically adjust the amount of backpressure we want to apply:
queue = SizedQueue.new(10)
queue.max
# => 10
queue.max = 25
queue.max
# => 25
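Raising max also has an effect on producers that are already blocked on a full queue: they are woken up and can continue. The sketch below uses made-up values to show this and is not a recommended tuning strategy. The two-second join timeout is only there so the script cannot hang if that behaviour differs on your Ruby version.

```ruby
queue = SizedQueue.new(1)
queue << :a # the queue is full

producer = Thread.new { queue << :b } # blocks, there is no room for :b

sleep 0.1 # give the producer a moment to block
queue.max = 2 # raising the limit makes room for one more item

finished = !producer.join(2).nil? # join returns nil if the timeout expires
p finished     # => true
p queue.length # => 2
```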
History
SizedQueue was released in Ruby 1.2 on Christmas 1998.