Commit a7c154a5 authored by Chris Hobbs's avatar Chris Hobbs

Merge branch 'rewrite-1' into 'master'

Rewrite

See merge request !4
parents 5a136090 a550e7e8
Pipeline #332 failed with stage
in 24 seconds
require "redis"
old_redis = Redis.new(url: ENV["OLD_REDIS_URL"])
new_redis = Redis.new(url: ENV["NEW_REDIS_URL"])
old_redis.keys("subscription:*").each do |old_key|
inbox_url = old_redis.hget(old_key, "inbox_url").not_nil!
new_key = "relay:#{old_key}"
new_state = {
inbox_url: inbox_url,
follow_id: "",
follow_actor_id: "",
state: "Subscribed",
}
puts "HMSET #{new_key} #{new_state}"
new_redis.hmset(new_key, new_state)
end
version: 1.0
shards:
baked_file_system:
github: schovi/baked_file_system
commit: e1447549d5ac0560720fae62179b2f2c62c9bfd1
dotenv:
github: gdotdesign/cr-dotenv
version: 0.1.0
exception_page:
github: crystal-loot/exception_page
version: 0.1.1
kemal:
github: kemalcr/kemal
commit: 9448360eac4273919c2bb6d2517a7234844aec2f
kemal-csrf:
github: kemalcr/kemal-csrf
version: 0.5.0
kemal-session:
github: kemalcr/kemal-session
commit: bc720cca9c27e391b4b3fc32ae7cc23f4d3f3fdc
kilt:
github: jeromegn/kilt
version: 0.4.0
earl:
github: ysbaddaden/earl
commit: e809dff93f66793498c6140feef1b3e51b6f9ee4
openssl_ext:
github: randomstate/openssl_ext
......@@ -36,18 +16,10 @@ shards:
github: ysbaddaden/pool
version: 0.2.3
radix:
github: luislavena/radix
version: 0.3.8
redis:
github: stefanwille/crystal-redis
version: 2.0.0
sidekiq:
github: mperham/sidekiq.cr
commit: 002d984edc24d901771a60139e7da6dcfe109736
webmock:
github: manastech/webmock.cr
version: 0.10.0
......
......@@ -5,17 +5,18 @@ authors:
- RX14 <chris@rx14.co.uk>
dependencies:
earl:
github: ysbaddaden/earl
branch: master
openssl_ext:
github: randomstate/openssl_ext
version: '>= 0.1.1'
version: '~> 0.1.1'
redis:
github: stefanwille/crystal-redis
version: '>= 2.0.0'
sidekiq:
github: mperham/sidekiq.cr
branch: master
version: '~> 2.0.0'
dotenv:
github: gdotdesign/cr-dotenv
version: '~> 0.1.0'
development_dependencies:
webmock:
......@@ -23,10 +24,8 @@ development_dependencies:
version: '>= 0.10.0'
targets:
server:
main: src/server.cr
worker:
main: src/worker.cr
pub-relay:
main: src/entrypoint.cr
crystal: 0.26.0
......
......@@ -3,14 +3,29 @@ require "webmock"
require "../src/pub_relay"
Spec.before_each do
PubRelay.redis.flushall
end
Earl.application.spawn
Earl::Logger.level = Earl::Logger::Severity::ERROR
Earl::Logger.level = Earl::Logger::Severity::DEBUG if ENV["RELAY_DEBUG"]?
SPEC_REDIS = Redis::PooledClient.new(url: ENV["REDIS_URL"]? || "redis://localhost")
SPEC_PKEY = OpenSSL::RSA.new(File.read(File.join(__DIR__, "test_actor.pem")))
Spec.before_each { SPEC_REDIS.flushdb }
PubRelay.private_key = OpenSSL::RSA.new(File.read(File.join(__DIR__, "test_actor.pem")))
PubRelay.host = "example.com"
class ErrorAgent
include Earl::Agent
PubRelay.logger.level = Logger::WARN
getter exception : Exception?
def call
raise "Cannot start ErrorAgent"
end
def trap(agent, exception)
raise "Two exceptions logged!" unless @exception.nil?
@exception = exception
end
end
def request(method, resource, headers = nil, body = nil)
request = HTTP::Request.new(method, resource, headers, body)
......@@ -21,7 +36,22 @@ def request(method, resource, headers = nil, body = nil)
context = HTTP::Server::Context.new(request, response)
PubRelay.new.call(context)
stats = PubRelay::Stats.new
subscription_manager = PubRelay::SubscriptionManager.new(
relay_domain: "example.com",
private_key: SPEC_PKEY,
redis: SPEC_REDIS,
stats: stats
)
PubRelay::WebServer.new(
domain: "example.com",
private_key: SPEC_PKEY,
subscription_manager: subscription_manager,
bindhost: "localhost",
port: 0,
stats: stats
).call(context)
{response.status_code, response_body.to_s, response.headers}
end
require "../spec_helper"
def with_subscription_manager
subscription_manager = PubRelay::SubscriptionManager.new(
relay_domain: "example.com",
private_key: SPEC_PKEY,
redis: SPEC_REDIS,
stats: PubRelay::Stats.new
)
error_agent = ErrorAgent.new
subscription_manager.spawn(link: error_agent)
yield subscription_manager
sleep 10.milliseconds
raise error_agent.exception.not_nil! if subscription_manager.crashed?
subscription_manager.running?.should be_true
subscription_manager.stop
end
private alias SubMan = PubRelay::SubscriptionManager
describe PubRelay::SubscriptionManager do
it "handles unsubscribing non-existant subscriptions" do
with_subscription_manager do |manager|
manager.send SubMan::Unsubscription.new("non-existant.com")
end
end
it "recycles gracefully" do
with_subscription_manager do |manager|
# Send a subscription to spawn a worker
manager.send SubMan::Subscription.new(
domain: "example.com",
inbox_url: URI.parse("https://example.com/inbox"),
follow_id: "https://example.com/follow_id",
follow_actor_id: "https://example.com/follow_actor_id"
)
sleep 5.milliseconds
manager.@workers.size.should eq(1)
manager.running?.should be_true
# Send a bogus AcceptSent to crash the manager
manager.send SubMan::AcceptSent.new("non-existant.com")
sleep 5.milliseconds
manager.crashed?.should be_true
# Simulate the recycle -> restart cycle of a supervisor
manager.recycle
manager.starting?.should be_true
manager.spawn
end
end
end
require "./spec_helper"
require "../spec_helper"
private def post_inbox(headers, body = nil)
request("POST", "/inbox", headers, body)
......@@ -16,10 +16,12 @@ private def expect_signature_fails(signature_header, expected_body)
end
end
private alias HTTPSignature = PubRelay::WebServer::HTTPSignature
private def sir_boops_actor
InboxHandler::Actor.new(
HTTPSignature::Actor.new(
id: "https://mastodon.sergal.org/users/Sir_Boops",
public_key: InboxHandler::Key.new(
public_key: HTTPSignature::Key.new(
public_key_pem: <<-KEY,
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvwDujxmxoYHs64MyVB3L
......@@ -38,7 +40,7 @@ private def sir_boops_actor
)
end
describe InboxHandler do
describe PubRelay::WebServer::HTTPSignature do
it "fails unsigned requests" do
status_code, body = post_inbox(HTTP::Headers{"Signatur" => "typo"})
status_code.should eq(401)
......@@ -46,7 +48,7 @@ describe InboxHandler do
end
expect_signature_fails("", "did not contain '='")
expect_signature_fails("foo=bar, foo2", %q(param "foo2" did not contain '='))
expect_signature_fails("foo=bar, foo2", %q(param did not contain '=': "foo2"))
expect_signature_fails(%q(foo="), %q(malformed quoted-string))
expect_signature_fails(%q(foo="bar), %q(malformed quoted-string))
expect_signature_fails(%q(foo="bar\"), %q(malformed quoted-string))
......@@ -88,7 +90,7 @@ describe InboxHandler do
it "succeeds with empty endpoints object" do
File.open("spec/data/actor_empty_endpoints.json") do |file|
actor = Union(InboxHandler::Actor, InboxHandler::Key).from_json(file).as(InboxHandler::Actor)
actor = Union(HTTPSignature::Actor, HTTPSignature::Key).from_json(file).as(HTTPSignature::Actor)
actor.inbox_url.should eq("https://microblog.pub/inbox")
end
end
......@@ -205,7 +207,7 @@ describe InboxHandler do
status_code, body = post_inbox(signed_request.headers, signed_request.body)
status_code.should eq(400)
body.should contain(%q(Header "user-agent" was supposed to be signed but was missing from the request))
body.should contain(%q(Header was supposed to be signed but was missing from the request: "user-agent"))
end
end
end
......
require "./spec_helper"
describe PubRelay do
describe PubRelay::WebServer do
describe "webfinger" do
it "works" do
status_code, body, headers = request("GET", "/.well-known/webfinger?resource=acct%3Arelay%40example.com")
......
require "./converters"
class Activity
class PubRelay::Activity
include JSON::Serializable
getter id : String?
getter id : String
getter object : String | Object
@[JSON::Field(key: "type", converter: FuzzyStringArrayConverter)]
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter types : Array(String)
@[JSON::Field(key: "signature", converter: PresenceConverter)]
@[JSON::Field(key: "signature", converter: PubRelay::Activity::PresenceConverter)]
getter? signature_present = false
@[JSON::Field(converter: FuzzyStringArrayConverter)]
@[JSON::Field(converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter to = [] of String
@[JSON::Field(converter: FuzzyStringArrayConverter)]
@[JSON::Field(converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter cc = [] of String
def follow?
......@@ -54,9 +52,38 @@ class Activity
class Object
include JSON::Serializable
getter id : String?
getter id : String
@[JSON::Field(key: "type", converter: FuzzyStringArrayConverter)]
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter types : Array(String)
end
module PresenceConverter
def self.from_json(pull) : Bool
present = pull.kind != :null
pull.skip
present
end
end
module FuzzyStringArrayConverter
def self.from_json(pull) : Array(String)
strings = Array(String).new
case pull.kind
when :begin_array
pull.read_array do
if string = pull.read? String
strings << string
else
pull.skip
end
end
else
strings << pull.read_string
end
strings
end
end
end
module PresenceConverter
def self.from_json(pull) : Bool
present = pull.kind != :null
pull.skip
present
end
end
module FuzzyStringArrayConverter
def self.from_json(pull) : Array(String)
strings = Array(String).new
case pull.kind
when :begin_array
pull.read_array do
if string = pull.read? String
strings << string
else
pull.skip
end
end
else
strings << pull.read_string
end
strings
end
end
class DeliverWorker
include Sidekiq::Worker
sidekiq_options do |job|
job.retry = false
end
def perform(domain : String, request_body : String)
inbox_url = PubRelay.redis.hget("subscription:#{domain}", "inbox_url")
return unless inbox_url
inbox_url = URI.parse(inbox_url)
body_hash = OpenSSL::Digest.new("sha256")
body_hash.update(request_body)
body_hash = Base64.strict_encode(body_hash.digest)
headers = HTTP::Headers{
"Host" => inbox_url.host.not_nil!,
"Date" => HTTP.format_time(Time.utc_now),
"Digest" => "SHA-256=#{body_hash}",
}
signed_headers = "(request-target) host date digest"
signed_string = <<-END
(request-target): post #{inbox_url.path}
host: #{headers["Host"]}
date: #{headers["Date"]}
digest: #{headers["Digest"]}
END
signature = PubRelay.private_key.sign(OpenSSL::Digest.new("sha256"), signed_string)
headers["Signature"] = %(keyId="#{PubRelay.route_url("/actor")}",headers="#{signed_headers}",signature="#{Base64.strict_encode(signature)}")
client = HTTP::Client.new(inbox_url)
client.dns_timeout = 10.seconds
client.connect_timeout = 10.seconds
client.read_timeout = 10.seconds
response = client.post(inbox_url.full_path, headers: headers, body: request_body)
puts "POST #{inbox_url} #{response.status_code}"
end
end
require "dotenv"
Dotenv.load
require "./pub_relay"
raise "REDIS_URL must start with redis:// or rediss://" unless ENV["REDIS_URL"].starts_with? %r(redis(s?)://)
domain = ENV["RELAY_DOMAIN"]
redis = Redis::PooledClient.new(url: ENV["REDIS_URL"])
bindhost = ENV["RELAY_HOST"]? || "localhost"
port = (ENV["RELAY_PORT"]? || 8085).to_i
private_key_path = ENV["RELAY_PKEY_PATH"]
private_key = OpenSSL::RSA.new(File.read(private_key_path))
Earl::Logger.level = Earl::Logger::Severity::DEBUG if ENV["RELAY_DEBUG"]?
relay = PubRelay.new(domain, private_key, redis, bindhost, port)
Earl.application.monitor(relay)
Earl.application.start
......@@ -2,98 +2,31 @@ require "http"
require "json"
require "openssl_ext"
require "redis"
require "sidekiq"
require "earl"
require "./inbox_handler"
class PubRelay
class PubRelay < Earl::Supervisor
VERSION = "0.1.0"
include HTTP::Handler
# Make sidekiq use REDIS_URL
ENV["REDIS_URL"] ||= "redis://localhost:6379"
ENV["REDIS_PROVIDER"] = "REDIS_URL"
Sidekiq::Client.default_context = Sidekiq::Client::Context.new
class_getter redis = Redis::PooledClient.new(url: ENV["REDIS_URL"])
class_property(private_key) do
private_key_path = ENV["RELAY_PKEY_PATH"]? || File.join(Dir.current, "actor.pem")
OpenSSL::RSA.new(File.read(private_key_path))
end
class_property(host) { ENV["RELAY_DOMAIN"] }
class_property logger = Logger.new(STDOUT)
def call(context : HTTP::Server::Context)
case {context.request.method, context.request.path}
when {"GET", "/.well-known/webfinger"}
serve_webfinger(context)
when {"GET", "/actor"}
serve_actor(context)
when {"POST", "/inbox"}
handle_inbox(context)
else
call_next(context)
end
end
private def serve_webfinger(ctx)
resource = ctx.request.query_params["resource"]?
return error(ctx, 400, "Resource query parameter not present") unless resource
return error(ctx, 404, "Resource not found") unless resource == account_uri
ctx.response.content_type = "application/json"
{
subject: account_uri,
links: {
{
rel: "self",
type: "application/activity+json",
href: route_url("/actor"),
},
},
}.to_json(ctx.response)
end
def initialize(
domain : String,
private_key : OpenSSL::RSA,
redis : Redis::PooledClient,
bindhost : String,
port : Int32
)
super()
private def serve_actor(ctx)
ctx.response.content_type = "application/activity+json"
{
"@context": {"https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1"},
stats = Stats.new
monitor(stats)
id: route_url("/actor"),
type: "Service",
preferredUsername: "relay",
inbox: route_url("/inbox"),
subscription_manager = SubscriptionManager.new(domain, private_key, redis, stats)
monitor(subscription_manager)
publicKey: {
id: route_url("/actor#main-key"),
owner: route_url("/actor"),
publicKeyPem: PubRelay.private_key.public_key.to_pem,
},
}.to_json(ctx.response)
end
private def handle_inbox(context)
InboxHandler.new(context).handle
end
def account_uri
"acct:relay@#{PubRelay.host}"
end
def self.route_url(path)
"https://#{host}#{path}"
end
def route_url(path)
PubRelay.route_url(path)
end
private def error(context, status_code, message)
context.response.status_code = status_code
context.response.puts message
web_server = WebServer.new(domain, private_key, subscription_manager, bindhost, port, stats)
monitor(web_server)
end
end
require "./stats"
require "./subscription_manager"
require "./web_server"
require "dotenv"
Dotenv.load
require "./pub_relay"
handlers = [] of HTTP::Handler
handlers << HTTP::LogHandler.new if ENV["RELAY_DEBUG"]?
handlers << PubRelay.new
server = HTTP::Server.new(handlers)
bind_ip = server.bind_tcp(
host: ENV["RELAY_HOST"]? || "localhost",
port: (ENV["RELAY_PORT"]? || 8085).to_i,
reuse_port: !!ENV["RELAY_REUSEPORT"]?
)
puts "Listening on #{bind_ip}"
server.listen
class PubRelay::Stats
record HTTPResponsePayload,
response_code : String,
domain : String?
record DeliveryPayload,
domain : String,
status_code : String,
counter : Int32
record DeliveryCounterPayload,
counter : Int32
include Earl::Artist(HTTPResponsePayload | DeliveryPayload | DeliveryCounterPayload)
@response_codes = Hash(String, Int32).new(default_value: 0)
@response_codes_per_domain = Hash(String, Hash(String, Int32)).new do |hash, key|
hash[key] = Hash(String, Int32).new(default_value: 0)
end
def call(response : HTTPResponsePayload)
@response_codes[response.response_code] += 1
@response_codes_per_domain[response.domain || "NO DOMAIN"][response.response_code] += 1
end
@delivery_codes = Hash(String, Int32).new(default_value: 0)
@delivery_codes_per_domain = Hash(String, Hash(String, Int32)).new do |hash, key|
hash[key] = Hash(String, Int32).new(default_value: 0)
end
@latest_delivery = -1
@latest_delivery_per_domain = Hash(String, Int32).new(-1)
def call(delivery : DeliveryPayload)
@delivery_codes[delivery.status_code] += 1
@delivery_codes_per_domain[delivery.domain][delivery.status_code] += 1
prev_counter = @latest_delivery_per_domain[delivery.domain]
if delivery.counter > prev_counter
@latest_delivery_per_domain[delivery.domain] = delivery.counter
else
log.info "Message was delivered out of order for #{delivery.domain}"
end
end
def call(payload : DeliveryCounterPayload)
log.warn "Delivery counter went backwards!" unless payload.counter > @latest_delivery
@latest_delivery = payload.counter
end
def to_json(io)
{
response_codes: @response_codes,
response_codes_per_domain: @response_codes_per_domain,
delivery_codes: @delivery_codes,
delivery_codes_per_domain: @delivery_codes_per_domain,
deliver_count: @latest_delivery,
lag_per_domain: @latest_delivery_per_domain.transform_values { |count| @latest_delivery - count },
}.to_json(io)
end
end
class PubRelay::SubscriptionManager::DeliverWorker
record Delivery,
message : String,
domain : String,
counter : Int32,
accept : Bool
include Earl::Artist(Delivery)
getter domain : String
def initialize(
@domain : String,
@inbox_url : URI,
@relay_domain : String,
@private_key : OpenSSL::RSA,
@stats : Stats,
@subscription_manager : SubscriptionManager
)
end
getter(client : HTTP::Client) do
HTTP::Client.new(@inbox_url).tap do |client|
client.dns_timeout = 5.seconds
client.connect_timeout = 5.seconds
client.read_timeout = 10.seconds
end
end
def call(delivery : Delivery)
if delivery.domain == @domain
@stats.send Stats::DeliveryPayload.new(@domain, "SELF DOMAIN", delivery.counter)
return
end
body_hash = OpenSSL::Digest.new("sha256")
body_hash.update(delivery.message)
body_hash = Base64.strict_encode(body_hash.digest)
headers = HTTP::Headers{
"Host" => @inbox_url.host.not_nil!,
"Date" => HTTP.format_time(Time.utc_now),
"Digest" => "SHA-256=#{body_hash}",
}
signed_headers = "(request-target) host date digest"
signed_string = <<-END
(request-target): post #{@inbox_url.path}
host: #{headers["Host"]}
date: #{headers["Date"]}
digest: #{headers["Digest"]}
END
signature = @private_key.sign(OpenSSL::Digest.new("sha256"), signed_string)
headers["Signature"] = %(keyId="https://#{@relay_domain}/actor",headers="#{signed_headers}",signature="#{Base64.strict_encode(signature)}")
start_time = Time.monotonic
begin
response = client.post(@inbox_url.full_path, headers: headers, body: delivery.message)
rescue
@client = nil
response = client.post(@inbox_url.full_path, headers: headers, body: delivery.message)
end
time = Time.monotonic - start_time
if response.success?
log.debug "POST #{@inbox_url} - #{response.status_code} (#{time.total_milliseconds}ms)"
else
log.info "POST #{@inbox_url} - #{response.status_code} (#{time.total_milliseconds}ms)"