Я нашел способ получить желаемое поведение, но я не уверен, что это лучший способ, поэтому я публикую его здесь и, возможно, могу дать мне обратную связь.
Когда мы говорим об очередях Laravel,большая часть настроек берется из app.php
, в частности из раздела Provider
.Мне удалось добавить поведение, которое мне нужно, чтобы переопределить класс Original QueueServiceProvider
и заменить его:
// Here is the original Provider Class
//Illuminate\Queue\QueueServiceProvider::class,
// Here is the overridden Provider
\App\Providers\QueueServiceProvider::class,
Новый класс QueueServiceProvider
выглядит следующим образом:
<?php
namespace App\Providers;
use App\Jobs\SqsNotifications\SqsConnector;
class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
{
/**
* Register the Amazon SQS queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerSqsNotifConnector($manager)
{
$manager->addConnector('sqsNotif', function () {
return new SqsConnector();
});
}
public function registerConnectors($manager){
parent::registerConnectors($manager);
// Add the custom SQS notification connector
$this->registerSqsNotifConnector($manager);
}
}
Обратите внимание на новыйразъем sqsNotif
, который необходимо добавить к queue.php
'sqsNotif' => [
'driver' => 'sqsNotif',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
'queue' => env('SQS_QUEUE', 'your-queue-name'),
'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
],
. В новом QueueServiceProvider
мы просто регистрируем дополнительный разъем, код которого:
<?php
namespace App\Jobs\SqsNotifications;
use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;
class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
);
}
}
SqsQueue также переопределяется следующим образом:
<?php
namespace App\Jobs\SqsNotifications;
class SqsQueue extends \Illuminate\Queue\SqsQueue
{
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'AttributeNames' => ['ApproximateReceiveCount'],
]);
if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
);
}
}
}
И последний отсутствующий фрагмент - SqsJob, определяемый так:
<?php
namespace App\Jobs\SqsNotifications;
use Illuminate\Queue\Jobs\JobName;
/**
* Class SqsJob
* @package App\Jobs\SqsNotifications
*
* Alternate SQS job that is used in case of S3 notifications
*/
class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
{
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
$bucketName = '';
// Define the name of the Process based on the bucket name
switch($this->payload()['Records'][0]['s3']['bucket']['name']){
case 'mybucket':
$bucketName = 'NewMyBucketFileJob';
break;
}
return $bucketName;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
// Mimic the original behavior with a different payload
$payload = $this->payload();
[$class, $method] = JobName::parse('\App\Jobs\\' . $this->getName() . '@handle');
($this->instance = $this->resolve($class))->{$method}($payload);
// The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
$this->delete();
}
}
На данный момент мне просто нужно определитьЗадание обработки, например, как показано ниже, в NewMyBucketFileJob
:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessDataGateNewFile implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
}
/**
* Execute the job.
*
* @return void
*/
public function handle($data)
{
// Print the whole data structure
print_r($data);
// Or just the name of the uploaded file
print_r($data['Records'][0]['s3']['object']['key']);
}
}
Этот процесс работает, так что это решение, но включает в себя множество расширений классов, и оно довольно хрупкое в случаереализация внутренней очереди будет изменена в будущих выпусках.Мне искренне интересно, есть ли что-то более легкое или более надежное