Rewrite part 1: remove sidekiq.cr and restructure around Earl

This code does not deliver messages, although it compiles and runs the web
server. Do not use!
parent 5a136090
version: 1.0
shards:
baked_file_system:
github: schovi/baked_file_system
commit: e1447549d5ac0560720fae62179b2f2c62c9bfd1
dotenv:
github: gdotdesign/cr-dotenv
version: 0.1.0
exception_page:
github: crystal-loot/exception_page
version: 0.1.1
kemal:
github: kemalcr/kemal
commit: 9448360eac4273919c2bb6d2517a7234844aec2f
kemal-csrf:
github: kemalcr/kemal-csrf
version: 0.5.0
kemal-session:
github: kemalcr/kemal-session
commit: bc720cca9c27e391b4b3fc32ae7cc23f4d3f3fdc
kilt:
github: jeromegn/kilt
version: 0.4.0
earl:
github: ysbaddaden/earl
commit: e809dff93f66793498c6140feef1b3e51b6f9ee4
openssl_ext:
github: randomstate/openssl_ext
......@@ -36,18 +16,10 @@ shards:
github: ysbaddaden/pool
version: 0.2.3
radix:
github: luislavena/radix
version: 0.3.8
redis:
github: stefanwille/crystal-redis
version: 2.0.0
sidekiq:
github: mperham/sidekiq.cr
commit: 002d984edc24d901771a60139e7da6dcfe109736
webmock:
github: manastech/webmock.cr
version: 0.10.0
......
......@@ -5,17 +5,18 @@ authors:
- RX14 <chris@rx14.co.uk>
dependencies:
earl:
github: ysbaddaden/earl
branch: master
openssl_ext:
github: randomstate/openssl_ext
version: '>= 0.1.1'
version: '~> 0.1.1'
redis:
github: stefanwille/crystal-redis
version: '>= 2.0.0'
sidekiq:
github: mperham/sidekiq.cr
branch: master
version: '~> 2.0.0'
dotenv:
github: gdotdesign/cr-dotenv
version: '~> 0.1.0'
development_dependencies:
webmock:
......@@ -23,10 +24,8 @@ development_dependencies:
version: '>= 0.10.0'
targets:
server:
main: src/server.cr
worker:
main: src/worker.cr
pub-relay:
main: src/entrypoint.cr
crystal: 0.26.0
......
......@@ -3,14 +3,9 @@ require "webmock"
require "../src/pub_relay"
Spec.before_each do
PubRelay.redis.flushall
end
PubRelay.private_key = OpenSSL::RSA.new(File.read(File.join(__DIR__, "test_actor.pem")))
PubRelay.host = "example.com"
PubRelay.logger.level = Logger::WARN
Earl.application.spawn
Earl::Logger.level = Earl::Logger::Severity::ERROR
Earl::Logger.level = Earl::Logger::Severity::DEBUG if ENV["RELAY_DEBUG"]?
def request(method, resource, headers = nil, body = nil)
request = HTTP::Request.new(method, resource, headers, body)
......@@ -21,7 +16,14 @@ def request(method, resource, headers = nil, body = nil)
context = HTTP::Server::Context.new(request, response)
PubRelay.new.call(context)
private_key = OpenSSL::RSA.new(File.read(File.join(__DIR__, "test_actor.pem")))
PubRelay::WebServer.new(
domain: "example.com",
private_key: private_key,
redis: Redis::PooledClient.new(url: ENV["REDIS_URL"]? || "redis://localhost"),
bindhost: "localhost",
port: 0
).call(context)
{response.status_code, response_body.to_s, response.headers}
end
require "./spec_helper"
require "../spec_helper"
private def post_inbox(headers, body = nil)
request("POST", "/inbox", headers, body)
......@@ -16,10 +16,12 @@ private def expect_signature_fails(signature_header, expected_body)
end
end
private alias HTTPSignature = PubRelay::WebServer::HTTPSignature
private def sir_boops_actor
InboxHandler::Actor.new(
HTTPSignature::Actor.new(
id: "https://mastodon.sergal.org/users/Sir_Boops",
public_key: InboxHandler::Key.new(
public_key: HTTPSignature::Key.new(
public_key_pem: <<-KEY,
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvwDujxmxoYHs64MyVB3L
......@@ -38,7 +40,7 @@ private def sir_boops_actor
)
end
describe InboxHandler do
describe PubRelay::WebServer::HTTPSignature do
it "fails unsigned requests" do
status_code, body = post_inbox(HTTP::Headers{"Signatur" => "typo"})
status_code.should eq(401)
......@@ -88,7 +90,7 @@ describe InboxHandler do
it "succeeds with empty endpoints object" do
File.open("spec/data/actor_empty_endpoints.json") do |file|
actor = Union(InboxHandler::Actor, InboxHandler::Key).from_json(file).as(InboxHandler::Actor)
actor = Union(HTTPSignature::Actor, HTTPSignature::Key).from_json(file).as(HTTPSignature::Actor)
actor.inbox_url.should eq("https://microblog.pub/inbox")
end
end
......
require "./spec_helper"
describe PubRelay do
describe PubRelay::WebServer do
describe "webfinger" do
it "works" do
status_code, body, headers = request("GET", "/.well-known/webfinger?resource=acct%3Arelay%40example.com")
......
require "./converters"
class Activity
class PubRelay::Activity
include JSON::Serializable
getter id : String?
getter object : String | Object
@[JSON::Field(key: "type", converter: FuzzyStringArrayConverter)]
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter types : Array(String)
@[JSON::Field(key: "signature", converter: PresenceConverter)]
@[JSON::Field(key: "signature", converter: PubRelay::Activity::PresenceConverter)]
getter? signature_present = false
@[JSON::Field(converter: FuzzyStringArrayConverter)]
@[JSON::Field(converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter to = [] of String
@[JSON::Field(converter: FuzzyStringArrayConverter)]
@[JSON::Field(converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter cc = [] of String
def follow?
......@@ -56,7 +54,36 @@ class Activity
getter id : String?
@[JSON::Field(key: "type", converter: FuzzyStringArrayConverter)]
@[JSON::Field(key: "type", converter: PubRelay::Activity::FuzzyStringArrayConverter)]
getter types : Array(String)
end
module PresenceConverter
def self.from_json(pull) : Bool
present = pull.kind != :null
pull.skip
present
end
end
module FuzzyStringArrayConverter
def self.from_json(pull) : Array(String)
strings = Array(String).new
case pull.kind
when :begin_array
pull.read_array do
if string = pull.read? String
strings << string
else
pull.skip
end
end
else
strings << pull.read_string
end
strings
end
end
end
module PresenceConverter
def self.from_json(pull) : Bool
present = pull.kind != :null
pull.skip
present
end
end
module FuzzyStringArrayConverter
def self.from_json(pull) : Array(String)
strings = Array(String).new
case pull.kind
when :begin_array
pull.read_array do
if string = pull.read? String
strings << string
else
pull.skip
end
end
else
strings << pull.read_string
end
strings
end
end
class DeliverWorker
class PubRelay::DeliverWorker
include Sidekiq::Worker
sidekiq_options do |job|
......
require "dotenv"
Dotenv.load
require "./pub_relay"
raise "REDIS_URL must start with redis:// or rediss://" unless ENV["REDIS_URL"].starts_with? %r(redis(s?)://)
domain = ENV["RELAY_DOMAIN"]
redis = Redis::PooledClient.new(url: ENV["REDIS_URL"])
bindhost = ENV["RELAY_HOST"]? || "localhost"
port = (ENV["RELAY_PORT"]? || 8085).to_i
private_key_path = ENV["RELAY_PKEY_PATH"]
private_key = OpenSSL::RSA.new(File.read(private_key_path))
Earl::Logger.level = Earl::Logger::Severity::DEBUG if ENV["RELAY_DEBUG"]?
relay = PubRelay.new(domain, private_key, redis, bindhost, port)
Earl.application.monitor(relay)
Earl.application.start
......@@ -2,98 +2,23 @@ require "http"
require "json"
require "openssl_ext"
require "redis"
require "sidekiq"
require "earl"
require "./inbox_handler"
class PubRelay
class PubRelay < Earl::Supervisor
VERSION = "0.1.0"
include HTTP::Handler
# Make sidekiq use REDIS_URL
ENV["REDIS_URL"] ||= "redis://localhost:6379"
ENV["REDIS_PROVIDER"] = "REDIS_URL"
Sidekiq::Client.default_context = Sidekiq::Client::Context.new
class_getter redis = Redis::PooledClient.new(url: ENV["REDIS_URL"])
class_property(private_key) do
private_key_path = ENV["RELAY_PKEY_PATH"]? || File.join(Dir.current, "actor.pem")
OpenSSL::RSA.new(File.read(private_key_path))
end
class_property(host) { ENV["RELAY_DOMAIN"] }
class_property logger = Logger.new(STDOUT)
def initialize(
@domain : String,
@private_key : OpenSSL::RSA,
@redis : Redis::PooledClient,
@bindhost : String,
@port : Int32
)
super()
def call(context : HTTP::Server::Context)
case {context.request.method, context.request.path}
when {"GET", "/.well-known/webfinger"}
serve_webfinger(context)
when {"GET", "/actor"}
serve_actor(context)
when {"POST", "/inbox"}
handle_inbox(context)
else
call_next(context)
end
end
private def serve_webfinger(ctx)
resource = ctx.request.query_params["resource"]?
return error(ctx, 400, "Resource query parameter not present") unless resource
return error(ctx, 404, "Resource not found") unless resource == account_uri
ctx.response.content_type = "application/json"
{
subject: account_uri,
links: {
{
rel: "self",
type: "application/activity+json",
href: route_url("/actor"),
},
},
}.to_json(ctx.response)
end
private def serve_actor(ctx)
ctx.response.content_type = "application/activity+json"
{
"@context": {"https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1"},
id: route_url("/actor"),
type: "Service",
preferredUsername: "relay",
inbox: route_url("/inbox"),
publicKey: {
id: route_url("/actor#main-key"),
owner: route_url("/actor"),
publicKeyPem: PubRelay.private_key.public_key.to_pem,
},
}.to_json(ctx.response)
end
private def handle_inbox(context)
InboxHandler.new(context).handle
end
def account_uri
"acct:relay@#{PubRelay.host}"
end
def self.route_url(path)
"https://#{host}#{path}"
end
def route_url(path)
PubRelay.route_url(path)
end
private def error(context, status_code, message)
context.response.status_code = status_code
context.response.puts message
web_server = PubRelay::WebServer.new(@domain, @private_key, @redis, @bindhost, @port)
monitor(web_server)
end
end
require "./web_server"
require "dotenv"
Dotenv.load
require "./pub_relay"
handlers = [] of HTTP::Handler
handlers << HTTP::LogHandler.new if ENV["RELAY_DEBUG"]?
handlers << PubRelay.new
server = HTTP::Server.new(handlers)
bind_ip = server.bind_tcp(
host: ENV["RELAY_HOST"]? || "localhost",
port: (ENV["RELAY_PORT"]? || 8085).to_i,
reuse_port: !!ENV["RELAY_REUSEPORT"]?
)
puts "Listening on #{bind_ip}"
server.listen
class PubRelay::WebServer
include HTTP::Handler
include Earl::Agent
include Earl::Logger
class ClientError < Exception
getter status_code : Int32
def initialize(@status_code, message)
super(message)
end
end
def initialize(
@domain : String,
@private_key : OpenSSL::RSA,
@redis : Redis::PooledClient,
@bindhost : String,
@port : Int32
)
end
@server : HTTP::Server?
def call
@server = server = HTTP::Server.new(self)
bind_ip = server.bind_tcp(@bindhost, @port)
log.info("Listening on #{bind_ip}")
server.listen
end
def terminate
@server.try(&.close)
end
def reset
@server = nil
end
def call(context : HTTP::Server::Context)
start_time = Time.monotonic
exception = nil
begin
case {context.request.method, context.request.path}
when {"GET", "/.well-known/webfinger"}
serve_webfinger(context)
when {"GET", "/actor"}
serve_actor(context)
when {"POST", "/inbox"}
handle_inbox(context)
else
call_next(context)
end
rescue exception : ClientError
begin
context.response.status_code = exception.status_code
context.response.print exception.message
rescue ignored
end
rescue exception
begin
context.response.status_code = 500
context.response.print "Internal Server Error!"
rescue ignored
end
end
time = Time.monotonic - start_time
log_message = "#{context.request.method} #{context.request.resource} - #{context.response.status_code} (#{time.total_milliseconds.round(2)}ms)"
case exception
when ClientError
log.warn "#{log_message} #{exception.message}"
when Exception
log.error log_message
log.error exception
else
log.debug log_message
end
end
private def serve_webfinger(ctx)
account_uri = "acct:relay@#{@domain}"
resource = ctx.request.query_params["resource"]?
error(400, "Resource query parameter not present") unless resource
error(404, "Resource not found") unless resource == account_uri
ctx.response.content_type = "application/json"
{
subject: account_uri,
links: {
{
rel: "self",
type: "application/activity+json",
href: route_url("/actor"),
},
},
}.to_json(ctx.response)
end
private def serve_actor(ctx)
ctx.response.content_type = "application/activity+json"
{
"@context": {"https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1"},
id: route_url("/actor"),
type: "Service",
preferredUsername: "relay",
inbox: route_url("/inbox"),
publicKey: {
id: route_url("/actor#main-key"),
owner: route_url("/actor"),
publicKeyPem: @private_key.public_key.to_pem,
},
}.to_json(ctx.response)
end
private def handle_inbox(context)
InboxHandler.new(context, @domain, @redis).handle
end
private def route_url(path)
"https://#{@domain}#{path}"
end
private def error(status_code, message)
raise WebServer::ClientError.new(status_code, message)
end
end
require "./web_server/inbox_handler"
require "./activity"
require "./deliver_worker"
class InboxHandler
class Error < Exception
end
struct PubRelay::WebServer::HTTPSignature
def initialize(@context : HTTP::Server::Context)
end
def handle
request_body, actor_from_signature = verify_signature
# TODO: handle blocks
begin
activity = Activity.from_json(request_body)
rescue ex : JSON::Error
error(400, "Invalid activity JSON\n#{ex.inspect_with_backtrace}")
end
case activity
when .follow?
handle_follow(actor_from_signature, activity)
when .unfollow?
handle_unfollow(actor_from_signature, activity)
when .valid_for_rebroadcast?
handle_forward(actor_from_signature, request_body)
end
response.status_code = 202
response.puts "OK"
rescue ignored : InboxHandler::Error
# error output was already set
end
def handle_follow(actor, activity)
unless activity.object_is_public_collection?
error(400, "Follow only allowed for #{Activity::PUBLIC_COLLECTION}")
end
accept_activity = {
"@context": {"https://www.w3.org/ns/activitystreams"},
id: PubRelay.route_url("/actor#accepts/follows/#{actor.domain}"),
type: "Accept",
actor: PubRelay.route_url("/actor"),
object: {
id: activity.id,
type: "Follow",
actor: actor.id,
object: PubRelay.route_url("/actor"),
},
}
PubRelay.redis.hset("subscription:#{actor.domain}", "inbox_url", actor.inbox_url)
DeliverWorker.async.perform(actor.domain, accept_activity.to_json)
end
def handle_unfollow(actor, activity)
PubRelay.redis.del("subscription:#{actor.domain}")
end
def handle_forward(actor, request_body)
# TODO: cache the subscriptions
bulk_args = PubRelay.redis.keys("subscription:*").compact_map do |key|
key = key.as(String)
domain = key.lchop("subscription:")
raise "redis bug" if domain == key
if domain == actor.domain
nil
else
{domain, request_body}
end
end
DeliverWorker.async.perform_bulk(bulk_args)
end
# Verify HTTP signatures according to https://tools.ietf.org/html/draft-cavage-http-signatures-06.
# In this specific implementation keyId is the URL to either an ActivityPub actor or
# a [Web Payments Key](https://web-payments.org/vocabs/security#Key).
private def verify_signature : {String, Actor}
signature_header = request.headers["Signature"]?
error(401, "Request not signed: no Signature header") unless signature_header
signature_params = parse_signature(signature_header)
def verify_signature : {String, Actor}
signature_params = parse_signature
key_id = signature_params["keyId"]?
error(400, "Invalid Signature: keyId not present") unless key_id
......@@ -101,8 +21,8 @@ class InboxHandler
error(400, "No request body") unless body = request.body
body = String.build do |io|
copy_size = IO.copy(body, io, 4_096_000)
error(400, "Request body too large") if copy_size == 4_096_000
copy_size = IO.copy(body, io, 256_000)
error(400, "Request body too large") if copy_size == 256_000
end
signed_string = build_signed_string(body, signature_params["headers"]?)
......@@ -122,10 +42,12 @@ class InboxHandler
end
end
private def parse_signature(signature) : Hash(String, String)
params = Hash(String, String).new
private def parse_signature : Hash(String, String)
signature_header = request.headers["Signature"]?
error(401, "Request not signed: no Signature header") unless signature_header
signature.split(',') do |param|
params = Hash(String, String).new
signature_header.split(',') do |param|
parts = param.split('=', 2)
unless parts.size == 2
error(400, "Invalid Signature: param #{param.strip.inspect} did not contain '='")
......@@ -152,17 +74,6 @@ class InboxHandler
params
end
private def cached_fetch_json(url, json_class : JsonType.class) : JsonType forall JsonType
# TODO: actually cache this
headers = HTTP::Headers{"Accept" => "application/activity+json, application/ld+json"}
# TODO use HTTP::Client.new and set read timeout
response = HTTP::Client.get(url, headers: headers)
unless response.status_code == 200
error(400, "Got non-200 response from fetching #{url.inspect}")
end
JsonType.from_json(response.body)
end
private def actor_from_key_id(key_id) : Actor
# Signature keyId is actually the URL
case key = cached_fetch_json(key_id, Actor | Key)
......@@ -176,7 +87,18 @@ class InboxHandler
raise "BUG: cached_fetch_json returned neither Actor nor Key"
end
rescue ex : JSON::Error
error(400, "Invalid JSON from fetching #{key_id.inspect}\n#{ex.inspect_with_backtrace}")
error(400, "Invalid JSON from fetching #{key_id.inspect}: #{ex.inspect_with_backtrace}")
end
private def cached_fetch_json(url, json_class : JsonType.class) : JsonType forall JsonType
# TODO: actually cache this
headers = HTTP::Headers{"Accept" => "application/activity+json, application/ld+json"}
# TODO use HTTP::Client.new and set read timeout
response = HTTP::Client.get(url, headers: headers)
unless response.status_code == 200
error(400, "Got non-200 response from fetching #{url.inspect}")
end
JsonType.from_json(response.body)
end
private def build_signed_string(body, signed_headers)
......@@ -200,6 +122,14 @@ class InboxHandler
end
end
private def request
@context.request
end
private def error(status_code, message)
raise WebServer::ClientError.new(status_code, message)
end
class Actor
include JSON::Serializable
......@@ -238,21 +168,4 @@ class InboxHandler
@[JSON::Field(key: "sharedInbox")]
getter shared_inbox : String?
end