Skip to content

Commit 46fae2f

Browse files
committed
Complete Database queue driver and tested in real scenario
1 parent 8dd16b5 commit 46fae2f

19 files changed

+346
-74
lines changed

lib/db.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ static public function build_dsn() {
9292
* @return boolean|integer|array
9393
*/
9494
static public function execute($dbh, $sql, $args, $type = false, $all = false) {
95+
9596
if (!$dbh) {
9697
return false;
9798
}

services/Commands/Hm_CheckMailCommand.php

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,7 @@ protected function configure()
3232
protected function execute(InputInterface $input, OutputInterface $output): int
3333
{
3434
$this->info("Checking for new mail...");
35-
36-
// Example: Call the mail checking service from the container
37-
// $imap = $this->getService('Hm_Imap');
38-
// $newMessages = $imap->search('UNSEEN');
3935
Hm_ProcessNewEmail::dispatch(email: 'muhngesteven@gmail.com');
40-
41-
if (!empty($newMessages)) {
42-
$this->success('You have new messages!');
43-
// dispatch event
44-
} else {
45-
$this->info('No new messages.');
46-
}
47-
4836
return Command::SUCCESS;
4937
}
5038
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
namespace Services\Contracts\Queue;
4+
5+
use Services\Core\Jobs\Hm_BaseJob;
6+
7+
interface Hm_Queueable
8+
{
9+
public function process(Hm_BaseJob $job): void;
10+
}

services/Core/Events/Hm_BaseEvent.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
namespace Services\Core\Events;
44

5+
use Services\Traits\Hm_Serializes;
6+
57
abstract class Hm_BaseEvent
68
{
7-
9+
use Hm_Serializes;
10+
811
protected array $params;
912

1013
public function __construct(...$params)

services/Core/Events/Hm_EventDispatcher.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public static function listen(string $eventClass, string $listenerClass): void
1313

1414
public static function dispatch($event): void
1515
{
16-
$eventClass = get_class($event);
16+
$eventClass = get_class($event);
1717
// Check if there are listeners for this event
1818
if (isset(self::$listeners[$eventClass])) {
1919
foreach (self::$listeners[$eventClass] as $listenerClass) {

services/Core/Jobs/Hm_BaseJob.php

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66

77
abstract class Hm_BaseJob implements Hm_Job
88
{
9-
protected string $driver = 'database';
10-
public function __construct(protected array $data = []) {}
9+
public string $driver = 'database';
10+
public int $tries = 3;
11+
protected int $attempts = 0;
12+
13+
public function __construct(protected array $data = []) {
14+
$this->data = $data;
15+
}
1116

1217
public function handle(): void {}
1318
public function failed(): void {}
@@ -17,4 +22,21 @@ public function getDriver(): string
1722
return $this->driver;
1823
}
1924

25+
public function getAttempts(): int
26+
{
27+
return $this->attempts;
28+
}
29+
30+
// Method to increment the attempt count
31+
public function incrementAttempts(): void
32+
{
33+
$this->attempts++;
34+
}
35+
36+
// Check if the job has exceeded the max attempts
37+
public function hasExceededMaxAttempts(): bool
38+
{
39+
return $this->attempts >= $this->tries;
40+
}
41+
2042
}

services/Core/Queue/Drivers/Hm_DatabaseQueue.php

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,23 @@
55
use PDO;
66
use Hm_DB;
77
use Services\Core\Jobs\Hm_BaseJob;
8+
use Services\Contracts\Queue\Hm_Queueable;
89
use Services\Contracts\Queue\Hm_ShouldQueue;
910

1011
/**
1112
* Class Hm_DatabaseQueue
12-
* @package App\Queue\Drivers
13+
* @package Services\Core\Queue\Drivers
1314
*/
14-
class Hm_DatabaseQueue implements Hm_ShouldQueue
15+
class Hm_DatabaseQueue implements Hm_ShouldQueue, Hm_Queueable
1516
{
17+
protected const FAILED_JOBS_TABLE = 'hm_failed_jobs';
1618

1719
/**
1820
* Hm_DatabaseQueue constructor.
1921
* @param Hm_DB $db
2022
* @param PDO $dbConnection
2123
*/
22-
public function __construct(private Hm_DB $db, protected PDO $dbConnection) {
23-
}
24+
public function __construct(private Hm_DB $db, protected PDO $dbConnection) {}
2425

2526
/**
2627
* Push the job to the queue
@@ -30,7 +31,12 @@ public function __construct(private Hm_DB $db, protected PDO $dbConnection) {
3031
*/
3132
public function push(Hm_BaseJob $job): void {
3233
$sql = "INSERT INTO hm_jobs (payload) VALUES (:payload)";
33-
$this->db->execute($this->dbConnection, $sql, ['payload' => serialize($job)], 'insert');
34+
try {
35+
// Use the __serialize method from the Serializer trait
36+
$this->db->execute($this->dbConnection, $sql, ['payload' => serialize($job)], 'insert');
37+
} catch (\Throwable $th) {
38+
throw new \Exception("Failed to push job to the queue: " . $th->getMessage());
39+
}
3440
}
3541

3642
/**
@@ -45,7 +51,11 @@ public function pop(): ?Hm_BaseJob {
4551
if ($jobRecord) {
4652
$deleteSql = "DELETE FROM hm_jobs WHERE id = :id";
4753
$this->db->execute($this->dbConnection, $deleteSql, ['id' => $jobRecord['id']], 'modify');
48-
return unserialize($jobRecord['payload']);
54+
55+
// Use the __unserialize method from the Serializer trait
56+
$job = unserialize($jobRecord['payload']);
57+
$job->incrementAttempts();
58+
return $job;
4959
}
5060

5161
return null;
@@ -64,4 +74,84 @@ public function release(Hm_BaseJob $job, int $delay = 0): void {
6474
}
6575
$this->push($job);
6676
}
77+
78+
/**
79+
* Process the job and handle failures.
80+
*
81+
* @param Hm_BaseJob $job
82+
* @param int $maxAttempts
83+
* @return void
84+
*/
85+
public function process(Hm_BaseJob $job): void
86+
{
87+
try {
88+
$job->handle();
89+
} catch (\Exception $e) {
90+
$job->incrementAttempts();
91+
if ($job->getAttempts() >= $job->tries) {
92+
$this->fail($job, $e);
93+
} else {
94+
$this->release($job, 5);
95+
}
96+
}
97+
}
98+
99+
/**
100+
* Move job to failed jobs table after max attempts.
101+
*
102+
* @param Hm_BaseJob $job
103+
* @param Exception $exception
104+
* @return void
105+
*/
106+
public function fail(Hm_BaseJob $job, \Exception $exception): void
107+
{
108+
$sql = "INSERT INTO " . self::FAILED_JOBS_TABLE . " (payload, failed_at, exception) VALUES (:payload, :failed_at, :exception)";
109+
$this->db->execute(
110+
$this->dbConnection,
111+
$sql,
112+
[
113+
'payload' => serialize($job), // This still requires serialization, keep in mind
114+
'failed_at' => (new \DateTime())->format('Y-m-d H:i:s'),
115+
'exception' => $exception->getMessage()
116+
],
117+
'insert'
118+
);
119+
}
120+
121+
/**
122+
* Retry a failed job by moving it back to the main queue.
123+
*
124+
* @param int $failedJobId
125+
* @return void
126+
*/
127+
public function retry(int $failedJobId): void
128+
{
129+
$sql = "SELECT * FROM " . self::FAILED_JOBS_TABLE . " WHERE id = :id";
130+
$failedJobRecord = $this->db->execute($this->dbConnection, $sql, ['id' => $failedJobId], 'select');
131+
132+
if ($failedJobRecord) {
133+
$job = unserialize($failedJobRecord['payload']);
134+
135+
// Remove from failed jobs table
136+
$deleteSql = "DELETE FROM " . self::FAILED_JOBS_TABLE . " WHERE id = :id";
137+
$this->db->execute($this->dbConnection, $deleteSql, ['id' => $failedJobId], 'modify');
138+
139+
// Push back to the main queue
140+
$this->push($job);
141+
}
142+
}
143+
144+
/**
145+
* Log the failed job.
146+
*
147+
* @param Hm_BaseJob $job
148+
* @return void
149+
*/
150+
protected function logFailedJob(Hm_BaseJob $job): void {
151+
$sql = "INSERT INTO failed_jobs (payload, attempts) VALUES (:payload, :attempts)";
152+
$this->db->execute($this->dbConnection, $sql, [
153+
'payload' => serialize($job), // This still requires serialization
154+
'attempts' => $job->getAttempts()
155+
], 'insert');
156+
}
67157
}

services/Core/Queue/Drivers/Hm_RedisQueue.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Services\Core\Queue\Drivers;
44

55
use Hm_Redis;
6-
use Services\Jobs\Hm_BaseJob;
6+
use Services\Core\Jobs\Hm_BaseJob;
77
use Services\Contracts\Queue\Hm_ShouldQueue;
88

99
/**
@@ -66,4 +66,8 @@ public function release(Hm_BaseJob $job, int $delay = 0): void {
6666
}
6767
$this->push($job);
6868
}
69+
public function process(Hm_BaseJob $job): void
70+
{
71+
//TO DO: Implement process() method
72+
}
6973
}

services/Core/Queue/Hm_JobDispatcher.php

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Services\Core\Jobs\Hm_BaseJob;
66
use Services\Contracts\Queue\Hm_ShouldQueue;
7+
use Services\Core\Hm_Container;
78
use Symfony\Component\DependencyInjection\ContainerInterface;
89

910
/**
@@ -12,32 +13,17 @@
1213
*/
1314
class Hm_JobDispatcher
1415
{
15-
protected Hm_QueueManager $queueManager;
16-
protected string $defaultDriver;
17-
18-
/**
19-
* Hm_JobDispatcher constructor.
20-
* @param Hm_QueueManager $queueManager
21-
* @param string $defaultDriver
22-
*/
23-
public function __construct(ContainerInterface $container, string $defaultDriver = 'redis')
24-
{
25-
$this->queueManager = $container->get('Hm_QueueManager');//$this->queueManager = $queueManager;
26-
$this->defaultDriver = $defaultDriver;
27-
}
28-
2916
/**
3017
* Dispatch the job to the queue
3118
*
3219
* @param Hm_BaseJob $job
3320
* @param string|null $queue
3421
* @return void
3522
*/
36-
public function dispatch(Hm_BaseJob $job, string $queue = null): void {
37-
if ($job instanceof Hm_ShouldQueue) {
38-
$driver = $job->driver ?? $this->defaultDriver;
39-
$queueDriver = $this->queueManager->getDriver($driver);
40-
23+
static public function dispatch(Hm_BaseJob $job): void {
24+
if (is_subclass_of($job, Hm_ShouldQueue::class)) {
25+
$driver = $job->driver;
26+
$queueDriver = Hm_Container::getContainer()->get('queue.manager')->getDriver($driver);
4127
if ($queueDriver) {
4228
$queueDriver->push($job);
4329
} else {
@@ -46,7 +32,5 @@ public function dispatch(Hm_BaseJob $job, string $queue = null): void {
4632
}else {
4733
$job->handle();
4834
}
49-
// $driver = $this->queueManager->getDriver($queue ?? $this->defaultDriver);
50-
// $driver->push($job);
5135
}
5236
}

services/Core/Queue/Hm_QueueWorker.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ public function __construct(Hm_ShouldQueue $queue)
2727
* @return void
2828
*/
2929
public function work(): void {
30-
dd($this->queue);
3130
while ($job = $this->queue->pop())
3231
{
3332
try {
34-
$job->handle();
33+
// dd($job);
34+
$this->queue->process($job);
3535
} catch (\Exception $e) {
36-
$job->failed();
37-
// Optionally release the job back to the queue with a delay
38-
$this->queue->release($job, 30);
36+
// $job->failed();
37+
// // Optionally release the job back to the queue with a delay
38+
// $this->queue->release($job, 30);
3939
}
4040
}
4141
}

services/Events/Hm_NewEmailProcessedEvent.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,23 @@
22

33
namespace Services\Events;
44

5-
use Services\Core\Events\Hm_BaseEvent;
65
use Services\Traits\Hm_Dispatchable;
6+
use Services\Core\Events\Hm_BaseEvent;
7+
use Services\Traits\Hm_InteractsWithQueue;
8+
use Services\Contracts\Queue\Hm_ShouldQueue;
79

8-
class Hm_NewEmailProcessedEvent extends Hm_BaseEvent
10+
class Hm_NewEmailProcessedEvent extends Hm_BaseEvent implements Hm_ShouldQueue
911
{
10-
use Hm_Dispatchable;
12+
use Hm_Dispatchable, Hm_InteractsWithQueue;
13+
14+
/**
15+
* Create a new event instance.
16+
* @param $email
17+
*
18+
* @return void
19+
*/
20+
public function __construct(public string $email)
21+
{
22+
$this->email = $email;
23+
}
1124
}

0 commit comments

Comments
 (0)