Unverified Commit f2c8968e authored by Chris Hobbs's avatar Chris Hobbs

Fail workers when their backlog grows over 100 items

Also fixes a performance bug by using sequential send instead of Channel.select
in a loop.
parent dbdefd00
Pipeline #349 failed with stage
in 21 seconds
......@@ -17,6 +17,7 @@ class PubRelay::SubscriptionManager::DeliverWorker
@stats : Stats,
@subscription_manager : SubscriptionManager
)
@mailbox = Channel::Buffered(Delivery).new(100)
end
getter(client : HTTP::Client) do
......
......@@ -147,16 +147,24 @@ class PubRelay::SubscriptionManager
counter = new_counter
delivery = DeliverWorker::Delivery.new(deliver.message, deliver.source_domain, counter, accept: false)
select_actions = @subscribed_workers.map(&.mailbox.send_select_action(delivery))
spawn do
until select_actions.empty?
index, _ = Channel.select(select_actions)
select_actions.delete_at(index)
@subscribed_workers.each do |worker|
# TODO: checking then sending is a race condition with threads
if worker.mailbox.full?
# backlog too high!
fail_worker(worker)
else
worker.send(delivery)
end
end
end
def fail_worker(worker)
@subscribed_workers.delete(worker)
transition_state(worker.domain, :failed)
worker.stop
end
@deliver_counter = 0
private def new_counter : Int32
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment