Skip to content

Commit b170a37

Browse files
committed
Sqs & Redis queue driver completed and tested
1 parent 3269e27 commit b170a37

15 files changed

+282
-103
lines changed

.env.example

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,16 @@ JS_EXCLUDE_DEPS=
214214

215215
QUEUE_CONNECTION=database
216216

217-
AWS_KEY='your-aws-access-key'
218-
AWS_SECRET='your-aws-secret-key'
219-
AWS_REGION=us-east-1
220-
AWS_SQS_QUEUE_URL='https://sqs.us-east-1.amazonaws.com/123456789012/your-queue-name'
217+
AWS_ACCESS_KEY_ID='your-aws-access-key'
218+
AWS_SECRET_ACCESS_KEY='your-aws-secret-key'
219+
AWS_DEFAULT_REGION=us-east-1
220+
# AWS_SQS_QUEUE_URL='https://sqs.us-east-1.amazonaws.com/your-account-id/your-queue-name'
221221

222-
REDIS_HOST=localhost
222+
ENABLE_REDIS=true
223+
REDIS_SERVER=localhost
223224
REDIS_PORT=6379
224-
REDIS_USERNAME=null
225-
REDIS_PWD=null
225+
REDIS_INDEX=1
226+
REDIS_PASS=null
227+
REDIS_SOCKET=/var/run/redis/redis-server.sock
226228
REDIS_PREFIX=
229+

config/app.php

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -429,26 +429,6 @@
429429
| 'cache_class' => env('CACHE_CLASS')
430430
*/
431431

432-
433-
/*
434-
| -------------
435-
| Redis Support
436-
| -------------
437-
|
438-
| Configure Redis details below to use it for caching
439-
*/
440-
'enable_redis' => env('ENABLE_REDIS', true),
441-
442-
'redis_server' => env('REDIS_SERVER', '127.0.0.1'),
443-
444-
'redis_port' => env('REDIS_PORT', 6379),
445-
446-
'redis_index' => env('REDIS_INDEX', 1),
447-
448-
'redis_pass' => env('REDIS_PASS'),
449-
450-
'redis_socket' => env('REDIS_SOCKET', '/var/run/redis/redis-server.sock'),
451-
452432
/*
453433
| -----------------
454434
| Memcached Support

config/redis.php

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
11
<?php
22

33
return [
4-
'redis_host' => env('REDIS_HOST',"127.0.0.1"),
5-
'redis_port' => env('REDIS_PORT',"6379"),
6-
'redis_username' => env('REDIS_USERNAME', null),
7-
'redis_password' => env('REDIS_PWD', null),
8-
'redis_prefix' => env('REDIS_PREFIX', ''),
4+
/*
5+
| -------------
6+
| Redis Support
7+
| -------------
8+
|
9+
| Configure Redis details below to use it for caching
10+
*/
11+
'enable_redis' => env('ENABLE_REDIS', true),
12+
13+
'redis_server' => env('REDIS_SERVER', '127.0.0.1'),
14+
15+
'redis_port' => env('REDIS_PORT', 6379),
16+
17+
'redis_index' => env('REDIS_INDEX', 1),
18+
19+
'redis_pass' => env('REDIS_PASS'),
20+
21+
'redis_socket' => env('REDIS_SOCKET', ''),
922
];

config/sqs.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
<?php
22

33
return [
4-
'aws_key' => env('AWS_KEY',""),
5-
'aws_secret' => env('AWS_SECRET',""),
6-
'aws_region' => env('AWS_REGION', 'us-east-1'),
7-
'sqs_queue_url' => env('AWS_SQS_QUEUE_URL', null),
4+
'aws_key' => env('AWS_ACCESS_KEY_ID',""),
5+
'aws_secret' => env('AWS_SECRET_ACCESS_KEY',""),
6+
'aws_region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
7+
// 'sqs_queue_url' => env('AWS_SQS_QUEUE_URL', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
8+
'sqs_queue' => env('SQS_QUEUE', 'default'),
89
];

lib/cache.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ public function __construct($config) {
242242
*/
243243
public function connect() {
244244
$this->cache_con = Hm_Functions::redis();
245+
245246
try {
246247
if ($this->socket) {
247248
$con = $this->cache_con->connect($this->socket);
@@ -295,6 +296,13 @@ public function close() {
295296
return $this->cache_con->close();
296297
}
297298

299+
/**
300+
* Get the Redis connection
301+
* @return Redis|null
302+
*/
303+
public function getInstance() {
304+
return $this->cache_con;
305+
}
298306
/**
299307
* Get the Redis connection
300308
* @return Redis|null

lib/framework.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
require APP_PATH.'lib/api.php';
3737
require APP_PATH.'lib/webdav_formats.php';
3838
require APP_PATH.'lib/js_libs.php';
39+
require APP_PATH.'lib/sqs.php';
3940

4041
require_once APP_PATH.'modules/core/functions.php';
4142

lib/sqs.php

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
/**
77
* Amazon SQS wrapper
88
* @package framework
9-
* @subpackage queue
9+
* @subpackage sqs
1010
*/
1111
class Hm_AmazonSQS {
1212

1313
/** SQS Client */
1414
private static $sqsClient;
1515

1616
/** Required configuration parameters */
17-
static private $required_config = ['aws_key', 'aws_secret', 'aws_region', 'sqs_queue_url'];
17+
static private $required_config = ['aws_key', 'aws_secret', 'aws_region'];
1818

1919
/** SQS config */
2020
static private $config;
@@ -29,7 +29,7 @@ static private function parse_config($site_config) {
2929
'aws_key' => $site_config->get('aws_key', false),
3030
'aws_secret' => $site_config->get('aws_secret', false),
3131
'aws_region' => $site_config->get('aws_region', false),
32-
'sqs_queue_url' => $site_config->get('sqs_queue_url', false),
32+
'sqs_queue' => $site_config->get('sqs_queue', 'default'),
3333
];
3434

3535
foreach (self::$required_config as $v) {
@@ -69,22 +69,31 @@ static public function connect($site_config) {
6969
}
7070
}
7171

72+
static private function getQueueUrl(SqsClient $client, string $queueName) {
73+
try {
74+
$result = $client->getQueueUrl([
75+
'QueueName' => $queueName,
76+
]);
77+
return $result['QueueUrl'];
78+
} catch (AwsException $e) {
79+
Hm_Debug::add($e->getMessage());
80+
return false;
81+
}
82+
}
83+
7284
/**
7385
* Send a message to the SQS queue
7486
* @param string $message
7587
* @return string|false Message ID or false on failure
7688
*/
77-
static public function sendMessage($message, int $delay = 0) {
78-
if (!self::$sqsClient) {
79-
return false;
80-
}
81-
89+
static public function sendMessage(SqsClient $client, $message, int $delay = 0, string $queueName = null) {
8290
try {
83-
$result = self::$sqsClient->sendMessage([
84-
'QueueUrl' => self::$config['sqs_queue_url'],
91+
$result = $client->sendMessage([
92+
'QueueUrl' => self::getQueueUrl($client, !is_null($queueName) ? $queueName : self::$config['sqs_queue']),
8593
'MessageBody' => $message,
8694
'DelaySeconds' => $delay,
8795
]);
96+
8897
return $result['MessageId'];
8998
} catch (AwsException $e) {
9099
Hm_Debug::add($e->getMessage());
@@ -97,14 +106,10 @@ static public function sendMessage($message, int $delay = 0) {
97106
* @param int $maxMessages
98107
* @return array
99108
*/
100-
static public function receiveMessages($maxMessages = 1) {
101-
if (!self::$sqsClient) {
102-
return [];
103-
}
104-
109+
static public function receiveMessages(SqsClient $client, $maxMessages = 1) {
105110
try {
106-
$result = self::$sqsClient->receiveMessage([
107-
'QueueUrl' => self::$config['sqs_queue_url'],
111+
$result = $client->receiveMessage([
112+
'QueueUrl' => self::getQueueUrl($client, self::$config['sqs_queue']),
108113
'MaxNumberOfMessages' => $maxMessages,
109114
'WaitTimeSeconds' => 10,
110115
]);
@@ -120,14 +125,10 @@ static public function receiveMessages($maxMessages = 1) {
120125
* @param string $receiptHandle
121126
* @return bool
122127
*/
123-
static public function deleteMessage($receiptHandle) {
124-
if (!self::$sqsClient) {
125-
return false;
126-
}
127-
128+
static public function deleteMessage(SqsClient $client, $receiptHandle) {
128129
try {
129-
self::$sqsClient->deleteMessage([
130-
'QueueUrl' => self::$config['sqs_queue_url'],
130+
$client->deleteMessage([
131+
'QueueUrl' => self::getQueueUrl($client, self::$config['sqs_queue']),
131132
'ReceiptHandle' => $receiptHandle,
132133
]);
133134
return true;

services/Core/Commands/Hm_QueueWorkCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ protected function configure()
2929

3030
protected function execute(InputInterface $input, OutputInterface $output): int
3131
{
32-
$connection = $input->getArgument('connection') ?: 'default';
32+
$connection = $input->getArgument('connection') ?: env('QUEUE_CONNECTION', 'database');
3333
$queue = $input->getOption('queue') ?: 'default';
3434

3535
$output->writeln("Processing jobs from the [$queue] on connection [$connection]...");

services/Core/Hm_Container.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use Hm_DB;
66
use Hm_Redis;
77
use Hm_AmazonSQS;
8-
use Services\Core\Queue\Hm_QueueManager;
98
use Symfony\Component\DependencyInjection\ContainerBuilder;
109
use Services\Providers\{ Hm_CommandServiceProvider, Hm_EventServiceProvider, Hm_SchedulerServiceProvider, Hm_QueueServiceProvider };
1110

@@ -35,11 +34,16 @@ public static function bind(): ContainerBuilder
3534
->setShared(true);
3635

3736
// Register Hm_Redis
38-
self::$container->register('redis', Hm_Redis::class)
37+
$redis = new Hm_Redis(self::$container->get('config'));
38+
$redis->connect();
39+
self::$container->set('redis.connection', $redis->getInstance());
40+
self::$container->register('redis', Hm_Redis::class)->setArgument(0, self::$container->get('config'))
41+
3942
->setShared(true);
4043

4144
// Register Hm_AmazonSQS
42-
self::$container->register('amazon.sqs', Hm_AmazonSQS::class)
45+
self::$container->set('amazon.sqs.connection', Hm_AmazonSQS::connect(self::$container->get('config')));
46+
self::$container->register('amazon.sqs',Hm_AmazonSQS::class)
4347
->setShared(true);
4448

4549
// Register Hm_CommandServiceProvider
@@ -48,8 +52,6 @@ public static function bind(): ContainerBuilder
4852

4953
// Register Hm_QueueServiceProvider
5054
self::$container->register('queue.ServiceProvider',Hm_QueueServiceProvider::class)
51-
// ->addArgument(new \Symfony\Component\DependencyInjection\Reference(Hm_Site_Config_File::class))
52-
// ->addArgument(null)
5355
->setShared(true);
5456

5557
self::$container->register('scheduler.ServiceProvider', Hm_SchedulerServiceProvider::class)

0 commit comments

Comments
 (0)