Rewrite part 2: add subscription manager to replace sidekiq

parent f3bd3aee
class PubRelay::Activity
include JSON::Serializable
getter id : String?
getter id : String
getter object : String | Object
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
......@@ -52,7 +52,7 @@ class PubRelay::Activity
class Object
include JSON::Serializable
getter id : String?
getter id : String
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter types : Array(String)
......
class PubRelay::DeliverWorker
include Sidekiq::Worker
sidekiq_options do |job|
job.retry = false
end
def perform(domain : String, request_body : String)
inbox_url = PubRelay.redis.hget("subscription:#{domain}", "inbox_url")
return unless inbox_url
inbox_url = URI.parse(inbox_url)
body_hash = OpenSSL::Digest.new("sha256")
body_hash.update(request_body)
body_hash = Base64.strict_encode(body_hash.digest)
headers = HTTP::Headers{
"Host" => inbox_url.host.not_nil!,
"Date" => HTTP.format_time(Time.utc_now),
"Digest" => "SHA-256=#{body_hash}",
}
signed_headers = "(request-target) host date digest"
signed_string = <<-END
(request-target): post #{inbox_url.path}
host: #{headers["Host"]}
date: #{headers["Date"]}
digest: #{headers["Digest"]}
END
signature = PubRelay.private_key.sign(OpenSSL::Digest.new("sha256"), signed_string)
headers["Signature"] = %(keyId="#{PubRelay.route_url("/actor")}",headers="#{signed_headers}",signature="#{Base64.strict_encode(signature)}")
client = HTTP::Client.new(inbox_url)
client.dns_timeout = 10.seconds
client.connect_timeout = 10.seconds
client.read_timeout = 10.seconds
response = client.post(inbox_url.full_path, headers: headers, body: request_body)
puts "POST #{inbox_url} #{response.status_code}"
end
end
......@@ -19,10 +19,14 @@ class PubRelay < Earl::Supervisor
stats = Stats.new
monitor(stats)
web_server = WebServer.new(domain, private_key, redis, bindhost, port, stats)
subscription_manager = SubscriptionManager.new(domain, private_key, redis, stats)
monitor(subscription_manager)
web_server = WebServer.new(domain, private_key, subscription_manager, bindhost, port, stats)
monitor(web_server)
end
end
require "./stats"
require "./subscription_manager"
require "./web_server"
......@@ -3,7 +3,15 @@ class PubRelay::Stats
response_code : String,
domain : String?
include Earl::Artist(HTTPResponsePayload)
record DeliveryPayload,
domain : String,
status_code : String,
counter : Int32
record DeliveryCounterPayload,
counter : Int32
include Earl::Artist(HTTPResponsePayload | DeliveryPayload | DeliveryCounterPayload)
@response_codes = Hash(String, Int32).new(default_value: 0)
@response_codes_per_domain = Hash(String, Hash(String, Int32)).new do |hash, key|
......@@ -15,10 +23,41 @@ class PubRelay::Stats
@response_codes_per_domain[response.domain || "NO DOMAIN"][response.response_code] += 1
end
@delivery_codes = Hash(String, Int32).new(default_value: 0)
@delivery_codes_per_domain = Hash(String, Hash(String, Int32)).new do |hash, key|
hash[key] = Hash(String, Int32).new(default_value: 0)
end
@latest_delivery = -1
@latest_delivery_per_domain = Hash(String, Int32).new(-1)
def call(delivery : DeliveryPayload)
@delivery_codes[delivery.status_code] += 1
@delivery_codes_per_domain[delivery.domain][delivery.status_code] += 1
prev_counter = @latest_delivery_per_domain[delivery.domain]
if delivery.counter > prev_counter
@latest_delivery_per_domain[delivery.domain] = delivery.counter
else
log.info "Message was delivered out of order for #{delivery.domain}"
end
end
def call(payload : DeliveryCounterPayload)
log.warn "Delivery counter went backwards!" unless payload.counter > @latest_delivery
@latest_delivery = payload.counter
end
def to_json(io)
{
response_codes: @response_codes,
response_codes_per_domain: @response_codes_per_domain,
delivery_codes: @delivery_codes,
delivery_codes_per_domain: @delivery_codes_per_domain,
deliver_count: @latest_delivery,
lag_per_domain: @latest_delivery_per_domain.transform_values { |count| @latest_delivery - count },
}.to_json(io)
end
end
class PubRelay::SubscriptionManager::DeliverWorker
record Delivery,
message : String,
counter : Int32,
accept : Bool
include Earl::Artist(Delivery)
getter domain : String
def initialize(
@domain : String,
@inbox_url : URI,
@relay_domain : String,
@private_key : OpenSSL::RSA,
@stats : Stats,
@subscription_manager : SubscriptionManager
)
end
getter(client : HTTP::Client) do
HTTP::Client.new(@inbox_url).tap do |client|
client.dns_timeout = 5.seconds
client.connect_timeout = 5.seconds
client.read_timeout = 10.seconds
end
end
def call(delivery : Delivery)
body_hash = OpenSSL::Digest.new("sha256")
body_hash.update(delivery.message)
body_hash = Base64.strict_encode(body_hash.digest)
headers = HTTP::Headers{
"Host" => @inbox_url.host.not_nil!,
"Date" => HTTP.format_time(Time.utc_now),
"Digest" => "SHA-256=#{body_hash}",
}
signed_headers = "(request-target) host date digest"
signed_string = <<-END
(request-target): post #{@inbox_url.path}
host: #{headers["Host"]}
date: #{headers["Date"]}
digest: #{headers["Digest"]}
END
signature = @private_key.sign(OpenSSL::Digest.new("sha256"), signed_string)
headers["Signature"] = %(keyId="https://#{@relay_domain}/actor",headers="#{signed_headers}",signature="#{Base64.strict_encode(signature)}")
start_time = Time.monotonic
begin
response = client.post(@inbox_url.full_path, headers: headers, body: delivery.message)
rescue
@client = nil
response = client.post(@inbox_url.full_path, headers: headers, body: delivery.message)
end
time = Time.monotonic - start_time
if response.success?
log.debug "POST #{@inbox_url} - #{response.status_code} (#{time.total_milliseconds}ms)"
else
log.info "POST #{@inbox_url} - #{response.status_code} (#{time.total_milliseconds}ms)"
end
@stats.send Stats::DeliveryPayload.new(@domain, response.status_code.to_s, delivery.counter)
if delivery.accept
@subscription_manager.send SubscriptionManager::AcceptSent.new(@domain)
end
rescue exception
exception_code = exception.try(&.inspect) || "Exited"
@stats.send Stats::DeliveryPayload.new(@domain, exception_code, delivery.counter)
raise exception
end
def reset
@client = nil
end
def terminate
@client.try(&.close)
end
end
class PubRelay::SubscriptionManager
record Subscription,
domain : String,
inbox_url : URI,
follow_id : String,
follow_actor_id : String
record AcceptSent,
domain : String
record Unsubscription,
domain : String
include Earl::Artist(Subscription | AcceptSent | Unsubscription)
enum State
Pending
Subscribed
Failed
Unsubscribed
def transition?(new_state)
case self
when Pending
new_state.subscribed?
when Subscribed
new_state.unsubscribed? || new_state.failed?
when Failed
new_state.subscribed? || new_state.unsubscribed?
when Unsubscribed
false
end
end
end
def initialize(
@relay_domain : String,
@private_key : OpenSSL::RSA,
@redis : Redis::PooledClient,
@stats : Stats
)
@workers = Set(DeliverWorker).new
@subscribed_workers = Set(DeliverWorker).new
@redis.keys(key_for("*")).each do |key|
key = key.as(String)
domain = key.lchop(key_for(""))
raise "BUG" if domain == key
inbox_url = @redis.hget(key, "inbox_url").not_nil!
inbox_url = URI.parse inbox_url
state = get_state(domain)
deliver_worker = DeliverWorker.new(
domain, inbox_url, @relay_domain, @private_key, @stats, self
)
@workers << deliver_worker
@subscribed_workers << deliver_worker if state.subscribed?
end
log.info "Found #{@workers.size} subscriptions, #{@subscribed_workers.size} of which are subscribed"
end
def call
@workers.each do |worker|
supervise worker
end
while message = receive?
call(message)
end
end
def call(subscription : Subscription)
deliver_worker = DeliverWorker.new(
subscription.domain, subscription.inbox_url, @relay_domain, @private_key, @stats, self
)
@redis.hmset(key_for(subscription.domain), {
inbox_url: subscription.inbox_url,
follow_id: subscription.follow_id,
follow_actor_id: subscription.follow_actor_id,
state: State::Pending.to_s,
})
supervise deliver_worker
accept_activity = {
"@context": "https://www.w3.org/ns/activitystreams",
id: route_url("/actor#accepts/follows/#{subscription.domain}"),
type: "Accept",
actor: route_url("/actor"),
object: {
id: subscription.follow_id,
type: "Follow",
actor: subscription.follow_actor_id,
object: route_url("/actor"),
},
}
counter = new_counter
delivery = DeliverWorker::Delivery.new(accept_activity.to_json, counter, accept: true)
deliver_worker.send delivery
end
def call(accept : AcceptSent)
state = get_state(accept.domain)
raise "#{accept.domain}'s state as #{state}, not pending" unless state.pending?
transition_state(accept.domain, :subscribed)
worker = @workers.find { |worker| worker.domain == accept.domain }
raise "Worker not found" unless worker
@subscribed_workers << worker
end
def call(unsubscribe : Unsubscription)
deliver_worker = @workers.find { |worker| worker.domain == unsubscribe.domain }
raise "Worker not found for unsubscribe" unless deliver_worker
@subscribed_workers.delete(deliver_worker)
transition_state(unsubscribe.domain, :unsubscribed)
deliver_worker.stop
@workers.delete(deliver_worker)
@redis.del(key_for(unsubscribe.domain))
end
def deliver(message : String, source_domain : String)
counter = new_counter
delivery = DeliverWorker::Delivery.new(message, counter, accept: false)
select_actions = @subscribed_workers.compact_map do |worker|
if worker.domain == source_domain
nil
else
worker.mailbox.send_select_action(delivery)
end
end
spawn do
until select_actions.empty?
index, _ = Channel.select(select_actions)
select_actions.delete_at(index)
end
end
end
@deliver_counter = 0
private def new_counter : Int32
counter = @deliver_counter += 1
@stats.send Stats::DeliveryCounterPayload.new(counter)
counter
end
private def transition_state(domain, new_state : State)
state = get_state(domain)
raise "Invalid transition for #{domain} (#{state} -> #{new_state})" unless state.transition? new_state
@redis.hset(key_for(domain), "state", new_state.to_s)
end
private def get_state(domain) : State
state = @redis.hget(key_for(domain), "state")
raise "BUG: subscription doesn't exist" unless state
State.parse(state)
end
private def key_for(domain)
"relay:subscription:#{domain}"
end
private def supervise(deliver_worker)
@workers << deliver_worker
spawn do
log.error "Supervision started when subscription manager not running!" unless self.running?
while self.running? && deliver_worker.starting?
deliver_worker.start(link: self)
end
end
end
@failure_counts = Hash(String, Int32).new(0)
def trap(agent, exception)
return log.error("Trapped agent was not a DeliverWorker!") unless agent.is_a? DeliverWorker
if exception
Earl::Logger.error(agent, exception) if exception
log.error { "#{agent.class.name} crashed (#{exception.class.name})" }
else
log.error { "#{agent.class.name} exited early" } if self.running?
end
@failure_counts[agent.domain] += 1
agent.recycle if self.running?
end
private def route_url(path)
"https://#{@relay_domain}#{path}"
end
def terminate
@workers.each do |agent|
agent.stop if agent.running?
end
end
def reset
@workers.each(&.recycle)
end
end
require "./deliver_worker"
......@@ -19,7 +19,7 @@ class PubRelay::WebServer
def initialize(
@domain : String,
@private_key : OpenSSL::RSA,
@redis : Redis::PooledClient,
@subscription_manager : SubscriptionManager,
@bindhost : String,
@port : Int32,
@stats : Stats
......@@ -136,7 +136,7 @@ class PubRelay::WebServer
end
private def handle_inbox(context)
InboxHandler.new(context, @domain, @redis).handle
InboxHandler.new(context, @domain, @subscription_manager).handle
end
private def route_url(path)
......
......@@ -5,7 +5,7 @@ class PubRelay::WebServer::InboxHandler
def initialize(
@context : HTTP::Server::Context,
@domain : String,
@redis : Redis::PooledClient
@subscription_manager : SubscriptionManager
)
end
......@@ -39,44 +39,27 @@ class PubRelay::WebServer::InboxHandler
error(400, "Follow only allowed for #{Activity::PUBLIC_COLLECTION}")
end
accept_activity = {
"@context": {"https://www.w3.org/ns/activitystreams"},
id: route_url("/actor#accepts/follows/#{actor.domain}"),
type: "Accept",
actor: route_url("/actor"),
object: {
id: activity.id,
type: "Follow",
actor: actor.id,
object: route_url("/actor"),
},
}
@redis.hset("subscription:#{actor.domain}", "inbox_url", actor.inbox_url)
# DeliverWorker.async.perform(actor.domain, accept_activity.to_json)
inbox_url = URI.parse(actor.inbox_url) rescue nil
error(400, "Inbox URL was not a valid URL") unless inbox_url
@subscription_manager.send(
SubscriptionManager::Subscription.new(
domain: actor.domain,
inbox_url: inbox_url,
follow_id: activity.id,
follow_actor_id: actor.id
)
)
end
def handle_unfollow(actor, activity)
@redis.del("subscription:#{actor.domain}")
@subscription_manager.send(
SubscriptionManager::Unsubscription.new(actor.domain)
)
end
def handle_forward(actor, request_body)
# TODO: cache the subscriptions
bulk_args = @redis.keys("subscription:*").compact_map do |key|
key = key.as(String)
domain = key.lchop("subscription:")
raise "redis bug" if domain == key
if domain == actor.domain
nil
else
{domain, request_body}
end
end
# DeliverWorker.async.perform_bulk(bulk_args)
@subscription_manager.deliver(request_body, source_domain: actor.domain)
end
private def error(status_code, error_code, user_message = "")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment