Skip to content

Commit a074471

Browse files
committed
initial
1 parent 13d10fd commit a074471

File tree

9 files changed

+1010
-38
lines changed

9 files changed

+1010
-38
lines changed

Config/RabbitMQConnectionConfig.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
return [
4+
'driver' => 'rabbitmq',
5+
'queue' => env('RABBITMQ_QUEUE', 'default'),
6+
7+
'hosts' => [
8+
[
9+
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
10+
'port' => env('RABBITMQ_PORT', 5672),
11+
'user' => env('RABBITMQ_USER', 'guest'),
12+
'password' => env('RABBITMQ_PASSWORD', 'guest'),
13+
'vhost' => env('RABBITMQ_VHOST', '/'),
14+
],
15+
],
16+
17+
'options' => [
18+
'ssl_options' => [
19+
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
20+
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
21+
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
22+
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
23+
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
24+
],
25+
'queue' => [
26+
'job' => \iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob::class,
27+
],
28+
],
29+
];

composer.json

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@
1515
},
1616
"autoload": {
1717
"psr-4": {
18-
"Iamfarhad\\LaravelRabbitMQ\\": "src/"
18+
"iamfarhad\\LaravelRabbitMQ\\": "src/"
1919
}
2020
},
2121
"autoload-dev": {
2222
"psr-4": {
23-
"Iamfarhad\\LaravelRabbitMQ\\Tests\\Unit\\": "tests/Unit"
23+
"iamfarhad\\LaravelRabbitMQ\\Tests\\Unit\\": "tests/Unit"
24+
}
25+
},
26+
"extra": {
27+
"laravel": {
28+
"providers": [
29+
"iamfarhad\\LaravelRabbitMQ\\LaravelRabbitQueueServiceProvider"
30+
]
2431
}
2532
},
2633
"license": "MIT",
@@ -30,5 +37,8 @@
3037
"email": "farhad.pd@gmail.com"
3138
}
3239
],
33-
"minimum-stability": "dev"
40+
"minimum-stability": "dev",
41+
"suggest": {
42+
"ext-pcntl": "Required to use all features of the queue consumer."
43+
}
3444
}

composer.lock

Lines changed: 35 additions & 35 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Connectors/RabbitMQConnector.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace iamfarhad\LaravelRabbitMQ\Connectors;
4+
5+
use iamfarhad\LaravelRabbitMQ\RabbitQueue;
6+
use Illuminate\Contracts\Events\Dispatcher;
7+
use Illuminate\Contracts\Queue\Queue;
8+
use Illuminate\Queue\Connectors\ConnectorInterface;
9+
use Illuminate\Queue\Events\WorkerStopping;
10+
use PhpAmqpLib\Connection\AMQPConnectionConfig;
11+
use PhpAmqpLib\Connection\AMQPConnectionFactory;
12+
13+
class RabbitMQConnector implements ConnectorInterface
14+
{
15+
private readonly Dispatcher $dispatcher;
16+
17+
public function __construct(Dispatcher $dispatcher)
18+
{
19+
$this->dispatcher = $dispatcher;
20+
}
21+
22+
public function connect(array $config = []): Queue
23+
{
24+
$configs = new AMQPConnectionConfig();
25+
// @todo move to config
26+
$configs->setHost('eagle-rmq');
27+
28+
$connection = AMQPConnectionFactory::create($configs);
29+
30+
$queueConnection = new RabbitQueue($connection, 'default');
31+
32+
$this->dispatcher->listen(WorkerStopping::class, static function () use ($queueConnection): void {
33+
$queueConnection->close();
34+
});
35+
36+
return $queueConnection;
37+
}
38+
}

src/Console/ConsumeCommand.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace iamfarhad\LaravelRabbitMQ\Console;
4+
5+
use Illuminate\Queue\Console\WorkCommand;
6+
use Illuminate\Support\Str;
7+
use iamfarhad\LaravelRabbitMQ\Consumer;
8+
use Symfony\Component\Console\Attribute\AsCommand;
9+
10+
#[AsCommand(name: 'rabbitmq:consume')]
11+
class ConsumeCommand extends WorkCommand
12+
{
13+
protected $signature = 'rabbitmq:consume
14+
{connection? : The name of the queue connection to work}
15+
{--name=default : The name of the consumer}
16+
{--queue= : The names of the queues to work}
17+
{--once : Only process the next job on the queue}
18+
{--stop-when-empty : Stop when the queue is empty}
19+
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
20+
{--backoff=0 : The number of seconds to wait before retrying a job that encountered an uncaught exception}
21+
{--max-jobs=0 : The number of jobs to process before stopping}
22+
{--max-time=0 : The maximum number of seconds the worker should run}
23+
{--force : Force the worker to run even in maintenance mode}
24+
{--memory=128 : The memory limit in megabytes}
25+
{--sleep=3 : Number of seconds to sleep when no job is available}
26+
{--timeout=60 : The number of seconds a child process can run}
27+
{--tries=1 : Number of times to attempt a job before logging it failed}
28+
{--rest=0 : Number of seconds to rest between jobs}
29+
{--max-priority=}
30+
{--consumer-tag}
31+
{--prefetch-size=0}
32+
{--prefetch-count=1000}
33+
';
34+
35+
protected $description = 'Consume messages';
36+
37+
public function handle(): void
38+
{
39+
/** @var Consumer $consumer */
40+
$consumer = $this->worker;
41+
42+
$consumer->setContainer($this->laravel);
43+
$consumer->setName($this->option('name'));
44+
$consumer->setConsumerTag($this->consumerTag());
45+
$consumer->setMaxPriority((int) $this->option('max-priority'));
46+
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
47+
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
48+
49+
parent::handle();
50+
}
51+
52+
protected function consumerTag(): string
53+
{
54+
if ($consumerTag = $this->option('consumer-tag')) {
55+
return $consumerTag;
56+
}
57+
58+
$consumerTag = implode('_', [
59+
Str::slug(config('app.name', 'laravel')),
60+
Str::slug($this->option('name')),
61+
md5(serialize($this->options()) . Str::random(16) . getmypid()),
62+
]);
63+
64+
return Str::substr($consumerTag, 0, 255);
65+
}
66+
}

0 commit comments

Comments
 (0)