diff --git a/app/Jobs/SharePipeline/SharePipeline.php b/app/Jobs/SharePipeline/SharePipeline.php index 186188ea..b62c5126 100644 --- a/app/Jobs/SharePipeline/SharePipeline.php +++ b/app/Jobs/SharePipeline/SharePipeline.php @@ -9,6 +9,11 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use League\Fractal; +use League\Fractal\Serializer\ArraySerializer; +use App\Transformer\ActivityPub\Verb\Announce; +use GuzzleHttp\{Pool, Client, Promise}; +use App\Util\ActivityPub\HttpSignature; class SharePipeline implements ShouldQueue { @@ -60,6 +65,8 @@ class SharePipeline implements ShouldQueue return true; } + $this->remoteAnnounceDeliver(); + try { $notification = new Notification; $notification->profile_id = $target->id; @@ -78,4 +85,56 @@ class SharePipeline implements ShouldQueue Log::error($e); } } + + public function remoteAnnounceDeliver() + { + $status = $this->status; + $profile = $status->profile; + + $fractal = new Fractal\Manager(); + $fractal->setSerializer(new ArraySerializer()); + $resource = new Fractal\Resource\Item($status, new Announce()); + $activity = $fractal->createData($resource)->toArray(); + + $audience = $status->profile->getAudienceInbox(); + + if(empty($audience) || $status->scope != 'public') { + // Return on profiles with no remote followers + return; + } + + $payload = json_encode($activity); + + $client = new Client([ + 'timeout' => config('federation.activitypub.delivery.timeout') + ]); + + $requests = function($audience) use ($client, $activity, $profile, $payload) { + foreach($audience as $url) { + $headers = HttpSignature::sign($profile, $url, $activity); + yield function() use ($client, $url, $headers, $payload) { + return $client->postAsync($url, [ + 'curl' => [ + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POSTFIELDS => $payload, + CURLOPT_HEADER => true + ] + ]); + }; + } + }; + + $pool = new Pool($client, $requests($audience), [ + 'concurrency' => config('federation.activitypub.delivery.concurrency'), + 'fulfilled' => function ($response, $index) { + }, + 'rejected' => function ($reason, $index) { + } + ]); + + $promise = $pool->promise(); + + $promise->wait(); + + } }