From 7f4213924f9fdb14bb36043d17615e838b85b733 Mon Sep 17 00:00:00 2001 From: Daniel Supernault Date: Tue, 13 Jul 2021 23:09:50 -0600 Subject: [PATCH] Update job queue, separate deletes into their own queue --- app/Http/Controllers/FederationController.php | 251 +++++++------- app/Jobs/InboxPipeline/DeleteWorker.php | 14 +- config/horizon.php | 327 +++++++++--------- 3 files changed, 309 insertions(+), 283 deletions(-) diff --git a/app/Http/Controllers/FederationController.php b/app/Http/Controllers/FederationController.php index c17019f1b..8c121a898 100644 --- a/app/Http/Controllers/FederationController.php +++ b/app/Http/Controllers/FederationController.php @@ -3,16 +3,17 @@ namespace App\Http\Controllers; use App\Jobs\InboxPipeline\{ - InboxWorker, - InboxValidator + DeleteWorker, + InboxWorker, + InboxValidator }; use App\Jobs\RemoteFollowPipeline\RemoteFollowPipeline; use App\{ - AccountLog, - Like, - Profile, - Status, - User + AccountLog, + Like, + Profile, + Status, + User }; use App\Util\Lexer\Nickname; use App\Util\Webfinger\Webfinger; @@ -23,146 +24,158 @@ use Illuminate\Http\Request; use League\Fractal; use App\Util\Site\Nodeinfo; use App\Util\ActivityPub\{ - Helpers, - HttpSignature, - Outbox + Helpers, + HttpSignature, + Outbox }; use Zttp\Zttp; class FederationController extends Controller { - public function nodeinfoWellKnown() - { - abort_if(!config('federation.nodeinfo.enabled'), 404); - return response()->json(Nodeinfo::wellKnown()) - ->header('Access-Control-Allow-Origin','*'); - } + public function nodeinfoWellKnown() + { + abort_if(!config('federation.nodeinfo.enabled'), 404); + return response()->json(Nodeinfo::wellKnown()) + ->header('Access-Control-Allow-Origin','*'); + } - public function nodeinfo() - { - abort_if(!config('federation.nodeinfo.enabled'), 404); - return response()->json(Nodeinfo::get()) - ->header('Access-Control-Allow-Origin','*'); - } + public function nodeinfo() + { + abort_if(!config('federation.nodeinfo.enabled'), 404); + return response()->json(Nodeinfo::get()) + ->header('Access-Control-Allow-Origin','*'); + } - public function webfinger(Request $request) - { - abort_if(!config('federation.webfinger.enabled'), 400); + public function webfinger(Request $request) + { + abort_if(!config('federation.webfinger.enabled'), 400); - abort_if(!$request->filled('resource'), 400); + abort_if(!$request->filled('resource'), 400); - $resource = $request->input('resource'); - $parsed = Nickname::normalizeProfileUrl($resource); - if(empty($parsed) || $parsed['domain'] !== config('pixelfed.domain.app')) { - abort(404); - } - $username = $parsed['username']; - $profile = Profile::whereNull('domain')->whereUsername($username)->firstOrFail(); - if($profile->status != null) { - return ProfileController::accountCheck($profile); - } - $webfinger = (new Webfinger($profile))->generate(); + $resource = $request->input('resource'); + $parsed = Nickname::normalizeProfileUrl($resource); + if(empty($parsed) || $parsed['domain'] !== config('pixelfed.domain.app')) { + abort(404); + } + $username = $parsed['username']; + $profile = Profile::whereNull('domain')->whereUsername($username)->firstOrFail(); + if($profile->status != null) { + return ProfileController::accountCheck($profile); + } + $webfinger = (new Webfinger($profile))->generate(); - return response()->json($webfinger, 200, [], JSON_PRETTY_PRINT|JSON_UNESCAPED_SLASHES) - ->header('Access-Control-Allow-Origin','*'); - } + return response()->json($webfinger, 200, [], JSON_PRETTY_PRINT|JSON_UNESCAPED_SLASHES) + ->header('Access-Control-Allow-Origin','*'); + } - public function hostMeta(Request $request) - { - abort_if(!config('federation.webfinger.enabled'), 404); + public function hostMeta(Request $request) + { + abort_if(!config('federation.webfinger.enabled'), 404); - $path = route('well-known.webfinger'); - $xml = ''; + $path = route('well-known.webfinger'); + $xml = ''; - return response($xml)->header('Content-Type', 'application/xrd+xml'); - } + return response($xml)->header('Content-Type', 'application/xrd+xml'); + } - public function userOutbox(Request $request, $username) - { - abort_if(!config_cache('federation.activitypub.enabled'), 404); - abort_if(!config('federation.activitypub.outbox'), 404); + public function userOutbox(Request $request, $username) + { + abort_if(!config_cache('federation.activitypub.enabled'), 404); + abort_if(!config('federation.activitypub.outbox'), 404); - $profile = Profile::whereNull('domain') - ->whereNull('status') - ->whereIsPrivate(false) - ->whereUsername($username) - ->firstOrFail(); + $profile = Profile::whereNull('domain') + ->whereNull('status') + ->whereIsPrivate(false) + ->whereUsername($username) + ->firstOrFail(); - $key = 'ap:outbox:latest_10:pid:' . $profile->id; - $ttl = now()->addMinutes(15); - $res = Cache::remember($key, $ttl, function() use($profile) { - return Outbox::get($profile); - }); + $key = 'ap:outbox:latest_10:pid:' . $profile->id; + $ttl = now()->addMinutes(15); + $res = Cache::remember($key, $ttl, function() use($profile) { + return Outbox::get($profile); + }); - return response(json_encode($res, JSON_UNESCAPED_SLASHES))->header('Content-Type', 'application/activity+json'); - } + return response(json_encode($res, JSON_UNESCAPED_SLASHES))->header('Content-Type', 'application/activity+json'); + } - public function userInbox(Request $request, $username) - { - abort_if(!config_cache('federation.activitypub.enabled'), 404); - abort_if(!config('federation.activitypub.inbox'), 404); + public function userInbox(Request $request, $username) + { + abort_if(!config_cache('federation.activitypub.enabled'), 404); + abort_if(!config('federation.activitypub.inbox'), 404); - $headers = $request->headers->all(); - $payload = $request->getContent(); - dispatch(new InboxValidator($username, $headers, $payload))->onQueue('high'); - return; - } + $headers = $request->headers->all(); + $payload = $request->getContent(); + $obj = json_decode($payload, true, 8); - public function sharedInbox(Request $request) - { - abort_if(!config_cache('federation.activitypub.enabled'), 404); - abort_if(!config('federation.activitypub.sharedInbox'), 404); + if(isset($obj['type']) && $obj['type'] === 'Delete') { + dispatch(new DeleteWorker($headers, $payload))->onQueue('delete'); + } else { + dispatch(new InboxValidator($username, $headers, $payload))->onQueue('high'); + } + return; + } - $headers = $request->headers->all(); - $payload = $request->getContent(); - dispatch(new InboxWorker($headers, $payload))->onQueue('high'); - return; - } + public function sharedInbox(Request $request) + { + abort_if(!config_cache('federation.activitypub.enabled'), 404); + abort_if(!config('federation.activitypub.sharedInbox'), 404); - public function userFollowing(Request $request, $username) - { - abort_if(!config_cache('federation.activitypub.enabled'), 404); + $headers = $request->headers->all(); + $payload = $request->getContent(); + $obj = json_decode($payload, true, 8); - $profile = Profile::whereNull('remote_url') - ->whereUsername($username) - ->whereIsPrivate(false) - ->firstOrFail(); + if(isset($obj['type']) && $obj['type'] === 'Delete') { + dispatch(new DeleteWorker($headers, $payload))->onQueue('delete'); + } else { + dispatch(new InboxWorker($headers, $payload))->onQueue('high'); + } + return; + } - if($profile->status != null) { - abort(404); - } + public function userFollowing(Request $request, $username) + { + abort_if(!config_cache('federation.activitypub.enabled'), 404); - $obj = [ - '@context' => 'https://www.w3.org/ns/activitystreams', - 'id' => $request->getUri(), - 'type' => 'OrderedCollectionPage', - 'totalItems' => 0, - 'orderedItems' => [] - ]; - return response()->json($obj); - } + $profile = Profile::whereNull('remote_url') + ->whereUsername($username) + ->whereIsPrivate(false) + ->firstOrFail(); - public function userFollowers(Request $request, $username) - { - abort_if(!config_cache('federation.activitypub.enabled'), 404); + if($profile->status != null) { + abort(404); + } - $profile = Profile::whereNull('remote_url') - ->whereUsername($username) - ->whereIsPrivate(false) - ->firstOrFail(); + $obj = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'id' => $request->getUri(), + 'type' => 'OrderedCollectionPage', + 'totalItems' => 0, + 'orderedItems' => [] + ]; + return response()->json($obj); + } - if($profile->status != null) { - abort(404); - } + public function userFollowers(Request $request, $username) + { + abort_if(!config_cache('federation.activitypub.enabled'), 404); - $obj = [ - '@context' => 'https://www.w3.org/ns/activitystreams', - 'id' => $request->getUri(), - 'type' => 'OrderedCollectionPage', - 'totalItems' => 0, - 'orderedItems' => [] - ]; + $profile = Profile::whereNull('remote_url') + ->whereUsername($username) + ->whereIsPrivate(false) + ->firstOrFail(); - return response()->json($obj); - } + if($profile->status != null) { + abort(404); + } + + $obj = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'id' => $request->getUri(), + 'type' => 'OrderedCollectionPage', + 'totalItems' => 0, + 'orderedItems' => [] + ]; + + return response()->json($obj); + } } diff --git a/app/Jobs/InboxPipeline/DeleteWorker.php b/app/Jobs/InboxPipeline/DeleteWorker.php index cab8df0c8..bed75f1df 100644 --- a/app/Jobs/InboxPipeline/DeleteWorker.php +++ b/app/Jobs/InboxPipeline/DeleteWorker.php @@ -50,7 +50,7 @@ class DeleteWorker implements ShouldQueue $payload = json_decode($this->payload, true, 8); if(isset($payload['id'])) { - $lockKey = hash('sha256', $payload['id']); + $lockKey = 'pf:ap:del-lock:' . hash('sha256', $payload['id']); if(Cache::get($lockKey) !== null) { // Job processed already return 1; @@ -116,6 +116,18 @@ class DeleteWorker implements ShouldQueue return; } + + $profile = null; + + if($this->verifySignature($headers, $payload) == true) { + (new Inbox($headers, $profile, $payload))->handle(); + return; + } else if($this->blindKeyRotation($headers, $payload) == true) { + (new Inbox($headers, $profile, $payload))->handle(); + return; + } else { + return; + } } protected function verifySignature($headers, $payload) diff --git a/config/horizon.php b/config/horizon.php index 62320ee8b..786eb6741 100644 --- a/config/horizon.php +++ b/config/horizon.php @@ -2,190 +2,191 @@ return [ - /* - |-------------------------------------------------------------------------- - | Horizon Domain - |-------------------------------------------------------------------------- - | - | This is the subdomain where Horizon will be accessible from. If this - | setting is null, Horizon will reside under the same domain as the - | application. Otherwise, this value will serve as the subdomain. - | - */ + /* + |-------------------------------------------------------------------------- + | Horizon Domain + |-------------------------------------------------------------------------- + | + | This is the subdomain where Horizon will be accessible from. If this + | setting is null, Horizon will reside under the same domain as the + | application. Otherwise, this value will serve as the subdomain. + | + */ - 'domain' => null, + 'domain' => null, - /* - |-------------------------------------------------------------------------- - | Horizon Path - |-------------------------------------------------------------------------- - | - | This is the URI path where Horizon will be accessible from. Feel free - | to change this path to anything you like. Note that the URI will not - | affect the paths of its internal API that aren't exposed to users. - | - */ + /* + |-------------------------------------------------------------------------- + | Horizon Path + |-------------------------------------------------------------------------- + | + | This is the URI path where Horizon will be accessible from. Feel free + | to change this path to anything you like. Note that the URI will not + | affect the paths of its internal API that aren't exposed to users. + | + */ - 'path' => 'horizon', + 'path' => 'horizon', - /* - |-------------------------------------------------------------------------- - | Horizon Redis Connection - |-------------------------------------------------------------------------- - | - | This is the name of the Redis connection where Horizon will store the - | meta information required for it to function. It includes the list - | of supervisors, failed jobs, job metrics, and other information. - | - */ + /* + |-------------------------------------------------------------------------- + | Horizon Redis Connection + |-------------------------------------------------------------------------- + | + | This is the name of the Redis connection where Horizon will store the + | meta information required for it to function. It includes the list + | of supervisors, failed jobs, job metrics, and other information. + | + */ - 'use' => 'default', + 'use' => 'default', - /* - |-------------------------------------------------------------------------- - | Horizon Redis Prefix - |-------------------------------------------------------------------------- - | - | This prefix will be used when storing all Horizon data in Redis. You - | may modify the prefix when you are running multiple installations - | of Horizon on the same server so that they don't have problems. - | - */ + /* + |-------------------------------------------------------------------------- + | Horizon Redis Prefix + |-------------------------------------------------------------------------- + | + | This prefix will be used when storing all Horizon data in Redis. You + | may modify the prefix when you are running multiple installations + | of Horizon on the same server so that they don't have problems. + | + */ - 'prefix' => env('HORIZON_PREFIX', 'horizon-'.str_random(8).':'), + 'prefix' => env('HORIZON_PREFIX', 'horizon-'.str_random(8).':'), - /* - |-------------------------------------------------------------------------- - | Horizon Route Middleware - |-------------------------------------------------------------------------- - | - | These middleware will get attached onto each Horizon route, giving you - | the chance to add your own middleware to this list or change any of - | the existing middleware. Or, you can simply stick with this list. - | - */ + /* + |-------------------------------------------------------------------------- + | Horizon Route Middleware + |-------------------------------------------------------------------------- + | + | These middleware will get attached onto each Horizon route, giving you + | the chance to add your own middleware to this list or change any of + | the existing middleware. Or, you can simply stick with this list. + | + */ - 'middleware' => ['web'], + 'middleware' => ['web'], - /* - |-------------------------------------------------------------------------- - | Queue Wait Time Thresholds - |-------------------------------------------------------------------------- - | - | This option allows you to configure when the LongWaitDetected event - | will be fired. Every connection / queue combination may have its - | own, unique threshold (in seconds) before this event is fired. - | - */ + /* + |-------------------------------------------------------------------------- + | Queue Wait Time Thresholds + |-------------------------------------------------------------------------- + | + | This option allows you to configure when the LongWaitDetected event + | will be fired. Every connection / queue combination may have its + | own, unique threshold (in seconds) before this event is fired. + | + */ - 'waits' => [ - 'redis:feed' => 30, - 'redis:default' => 30, - 'redis:high' => 30, - ], + 'waits' => [ + 'redis:feed' => 30, + 'redis:default' => 30, + 'redis:high' => 30, + 'redis:delete' => 30 + ], - /* - |-------------------------------------------------------------------------- - | Job Trimming Times - |-------------------------------------------------------------------------- - | - | Here you can configure for how long (in minutes) you desire Horizon to - | persist the recent and failed jobs. Typically, recent jobs are kept - | for one hour while all failed jobs are stored for an entire week. - | - */ + /* + |-------------------------------------------------------------------------- + | Job Trimming Times + |-------------------------------------------------------------------------- + | + | Here you can configure for how long (in minutes) you desire Horizon to + | persist the recent and failed jobs. Typically, recent jobs are kept + | for one hour while all failed jobs are stored for an entire week. + | + */ - 'trim' => [ - 'recent' => 60, - 'pending' => 60, - 'completed' => 60, - 'recent_failed' => 10080, - 'failed' => 10080, - 'monitored' => 10080, - ], + 'trim' => [ + 'recent' => 60, + 'pending' => 60, + 'completed' => 60, + 'recent_failed' => 10080, + 'failed' => 10080, + 'monitored' => 10080, + ], - /* - |-------------------------------------------------------------------------- - | Metrics - |-------------------------------------------------------------------------- - | - | Here you can configure how many snapshots should be kept to display in - | the metrics graph. This will get used in combination with Horizon's - | `horizon:snapshot` schedule to define how long to retain metrics. - | - */ + /* + |-------------------------------------------------------------------------- + | Metrics + |-------------------------------------------------------------------------- + | + | Here you can configure how many snapshots should be kept to display in + | the metrics graph. This will get used in combination with Horizon's + | `horizon:snapshot` schedule to define how long to retain metrics. + | + */ - 'metrics' => [ - 'trim_snapshots' => [ - 'job' => 24, - 'queue' => 24, - ], - ], + 'metrics' => [ + 'trim_snapshots' => [ + 'job' => 24, + 'queue' => 24, + ], + ], - /* - |-------------------------------------------------------------------------- - | Fast Termination - |-------------------------------------------------------------------------- - | - | When this option is enabled, Horizon's "terminate" command will not - | wait on all of the workers to terminate unless the --wait option - | is provided. Fast termination can shorten deployment delay by - | allowing a new instance of Horizon to start while the last - | instance will continue to terminate each of its workers. - | - */ + /* + |-------------------------------------------------------------------------- + | Fast Termination + |-------------------------------------------------------------------------- + | + | When this option is enabled, Horizon's "terminate" command will not + | wait on all of the workers to terminate unless the --wait option + | is provided. Fast termination can shorten deployment delay by + | allowing a new instance of Horizon to start while the last + | instance will continue to terminate each of its workers. + | + */ - 'fast_termination' => false, + 'fast_termination' => false, - /* - |-------------------------------------------------------------------------- - | Memory Limit (MB) - |-------------------------------------------------------------------------- - | - | This value describes the maximum amount of memory the Horizon worker - | may consume before it is terminated and restarted. You should set - | this value according to the resources available to your server. - | - */ + /* + |-------------------------------------------------------------------------- + | Memory Limit (MB) + |-------------------------------------------------------------------------- + | + | This value describes the maximum amount of memory the Horizon worker + | may consume before it is terminated and restarted. You should set + | this value according to the resources available to your server. + | + */ - 'memory_limit' => 64, + 'memory_limit' => 64, - /* - |-------------------------------------------------------------------------- - | Queue Worker Configuration - |-------------------------------------------------------------------------- - | - | Here you may define the queue worker settings used by your application - | in all environments. These supervisors and settings handle all your - | queued jobs and will be provisioned by Horizon during deployment. - | - */ + /* + |-------------------------------------------------------------------------- + | Queue Worker Configuration + |-------------------------------------------------------------------------- + | + | Here you may define the queue worker settings used by your application + | in all environments. These supervisors and settings handle all your + | queued jobs and will be provisioned by Horizon during deployment. + | + */ - 'environments' => [ - 'production' => [ - 'supervisor-1' => [ - 'connection' => 'redis', - 'queue' => ['high', 'default', 'feed'], - 'balance' => 'auto', - 'maxProcesses' => 20, - 'memory' => 128, - 'tries' => 3, - 'nice' => 0, - ], - ], + 'environments' => [ + 'production' => [ + 'supervisor-1' => [ + 'connection' => 'redis', + 'queue' => ['high', 'default', 'feed', 'delete'], + 'balance' => 'auto', + 'maxProcesses' => 20, + 'memory' => 128, + 'tries' => 3, + 'nice' => 0, + ], + ], - 'local' => [ - 'supervisor-1' => [ - 'connection' => 'redis', - 'queue' => ['high', 'default', 'feed'], - 'balance' => 'auto', - 'maxProcesses' => 20, - 'memory' => 128, - 'tries' => 3, - 'nice' => 0, - ], - ], - ], + 'local' => [ + 'supervisor-1' => [ + 'connection' => 'redis', + 'queue' => ['high', 'default', 'feed', 'delete'], + 'balance' => 'auto', + 'maxProcesses' => 20, + 'memory' => 128, + 'tries' => 3, + 'nice' => 0, + ], + ], + ], - 'darkmode' => env('HORIZON_DARKMODE', false), + 'darkmode' => env('HORIZON_DARKMODE', false), ];