diff --git a/app/Jobs/StatusPipeline/StatusActivityPubDeliver.php b/app/Jobs/StatusPipeline/StatusActivityPubDeliver.php index b8cce57b8..61a7cc83c 100644 --- a/app/Jobs/StatusPipeline/StatusActivityPubDeliver.php +++ b/app/Jobs/StatusPipeline/StatusActivityPubDeliver.php @@ -13,6 +13,10 @@ use League\Fractal; use League\Fractal\Serializer\ArraySerializer; use App\Transformer\ActivityPub\Verb\CreateNote; use App\Util\ActivityPub\Helpers; +use GuzzleHttp\Pool; +use GuzzleHttp\Client; +use GuzzleHttp\Promise; +use App\Util\ActivityPub\HttpSignature; class StatusActivityPubDeliver implements ShouldQueue { @@ -51,16 +55,45 @@ class StatusActivityPubDeliver implements ShouldQueue } $audience = $status->profile->getAudienceInbox(); + + if(empty($audience)) { + // Return on profiles with no remote followers + return; + } + $profile = $status->profile; Cache::forget('status:transformer:media:attachments:'.$status->id); + $fractal = new Fractal\Manager(); $fractal->setSerializer(new ArraySerializer()); $resource = new Fractal\Resource\Item($status, new CreateNote()); $activity = $fractal->createData($resource)->toArray(); - foreach($audience as $url) { - Helpers::sendSignedObject($profile, $url, $activity); - } + $payload = json_encode($activity); + + $client = new Client([ + 'timeout' => config('pixelfed.ap_delivery_timeout') + ]); + + $requests = function() use ($client, $activity, $profile) { + foreach($audience as $url) { + $headers = HttpSignature::sign($profile, $url, $activity); + yield function() use ($client, $url, $activity) { + return $client->requestAsync('POST', $url, $headers, $activity); + }; + } + }; + + $pool = new Pool($client, $requests, [ + 'concurrency' => config('pixelfed.ap_delivery_concurrency'), + 'fulfilled' => function ($response, $index) { + }, + 'rejected' => function ($reason, $index) { + } + ]); + $promise = $pool->promise(); + + $promise->wait(); } } diff --git a/config/pixelfed.php b/config/pixelfed.php index 618e2cb4a..312b1d3d1 100644 --- a/config/pixelfed.php +++ b/config/pixelfed.php @@ -262,4 +262,6 @@ return [ 'enforce_account_limit' => env('LIMIT_ACCOUNT_SIZE', true), 'ap_inbox' => env('ACTIVITYPUB_INBOX', false), 'ap_shared' => env('ACTIVITYPUB_SHAREDINBOX', false), + 'ap_delivery_timeout' => env('ACTIVITYPUB_DELIVERY_TIMEOUT', 2.0), + 'ap_delivery_concurrency' => env('ACTIVITYPUB_DELIVERY_CONCURRENCY', 10) ];