Fix SubscriptionManager crash recovery

parent 279d65a2
......@@ -12,6 +12,21 @@ SPEC_PKEY = OpenSSL::RSA.new(File.read(File.join(__DIR__, "test_actor.pem")))
Spec.before_each { SPEC_REDIS.flushdb }
class ErrorAgent
include Earl::Agent
getter exception : Exception?
def call
raise "Cannot start ErrorAgent"
end
def trap(agent, exception)
raise "Two exceptions logged!" unless @exception.nil?
@exception = exception
end
end
def request(method, resource, headers = nil, body = nil)
request = HTTP::Request.new(method, resource, headers, body)
......
require "../spec_helper"
def with_subscription_manager
subscription_manager = PubRelay::SubscriptionManager.new(
relay_domain: "example.com",
private_key: SPEC_PKEY,
redis: SPEC_REDIS,
stats: PubRelay::Stats.new
)
error_agent = ErrorAgent.new
subscription_manager.spawn(link: error_agent)
yield subscription_manager
sleep 10.milliseconds
raise error_agent.exception.not_nil! if subscription_manager.crashed?
subscription_manager.running?.should be_true
subscription_manager.stop
end
private alias SubMan = PubRelay::SubscriptionManager
describe PubRelay::SubscriptionManager do
it "recycles gracefully" do
with_subscription_manager do |manager|
# Send a subscription to spawn a worker
manager.send SubMan::Subscription.new(
domain: "example.com",
inbox_url: URI.parse("https://example.com/inbox"),
follow_id: "https://example.com/follow_id",
follow_actor_id: "https://example.com/follow_actor_id"
)
sleep 5.milliseconds
manager.@workers.size.should eq(1)
manager.running?.should be_true
# Send a bogus AcceptSent to crash the manager
manager.send SubMan::AcceptSent.new("non-existant.com")
sleep 5.milliseconds
manager.crashed?.should be_true
# Simulate the recycle -> restart cycle of a supervisor
manager.recycle
manager.starting?.should be_true
manager.spawn
end
end
end
......@@ -87,5 +87,7 @@ class PubRelay::SubscriptionManager::DeliverWorker
def terminate
@client.try(&.close)
ensure
@client = nil
end
end
......@@ -42,6 +42,10 @@ class PubRelay::SubscriptionManager
@workers = Set(DeliverWorker).new
@subscribed_workers = Set(DeliverWorker).new
load_subscriptions
end
private def load_subscriptions
@redis.keys(key_for("*")).each do |key|
key = key.as(String)
......@@ -215,7 +219,15 @@ class PubRelay::SubscriptionManager
end
def reset
@workers.each(&.recycle)
old_workers = @workers
@workers = Set(DeliverWorker).new
@subscribed_workers = Set(DeliverWorker).new
load_subscriptions
old_workers.each do |worker|
worker.stop if worker.running?
end
end
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment