Skip to content

Commit 1b03512

Browse files
committed
Scheduling Comlete, next step Notification with channels: telegram,slack,twilio,nexmo,broadcast
1 parent dac1a73 commit 1b03512

22 files changed

+441
-133
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ WIN_CACERT_DIR=
212212

213213
JS_EXCLUDE_DEPS=
214214

215+
QUEUE_ENABLED=false
215216
QUEUE_DRIVER=database
216217

217218
AWS_ACCESS_KEY_ID='your-aws-access-key'

lib/cache.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,6 @@ protected function noop_get($key, $default) {
603603
* @return string
604604
*/
605605
protected function key_hash($key) {
606-
dd($this->session->get('fingerprint'));
607606
return sprintf('hm_cache_%s', hash('sha256', (sprintf('%s%s%s%s', $key, SITE_ID,
608607
$this->session->get('fingerprint'), $this->session->get('username')))));
609608
}

services/Core/Commands/Hm_QueueWorkCommand.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@
1111
use Symfony\Component\Console\Output\OutputInterface;
1212
use Symfony\Component\DependencyInjection\ContainerInterface;
1313

14+
/**
15+
* Class Hm_QueueWorkCommand
16+
* @package Services\Core\Commands
17+
*/
1418
class Hm_QueueWorkCommand extends Hm_BaseCommand
1519
{
1620
protected static $defaultName = 'queue:work';
1721

22+
/**
23+
* Configure the command.
24+
*/
1825
protected function configure()
1926
{
2027
$this
@@ -27,6 +34,13 @@ protected function configure()
2734
->addOption('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 1);
2835
}
2936

37+
/**
38+
* Execute the console command.
39+
*
40+
* @param InputInterface $input
41+
* @param OutputInterface $output
42+
* @return int Command exit code.
43+
*/
3044
protected function execute(InputInterface $input, OutputInterface $output): int
3145
{
3246
$connection = $input->getArgument('connection') ?: env('QUEUE_DRIVER', 'database');

services/Core/Commands/Hm_ScheduleRunCommand.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,22 @@
88
use Symfony\Component\Console\Input\InputInterface;
99
use Symfony\Component\Console\Output\OutputInterface;
1010

11+
/**
12+
* Class Hm_ScheduleRunCommand
13+
* @package Services\Core\Commands
14+
*/
1115
class Hm_ScheduleRunCommand extends Hm_BaseCommand
1216
{
13-
// Default name for the command
17+
/**
18+
* The name of the command.
19+
*
20+
* @var string
21+
*/
1422
protected static $defaultName = 'schedule:run';
1523

24+
/**
25+
* Configure the command.
26+
*/
1627
protected function configure()
1728
{
1829
$this
@@ -21,11 +32,16 @@ protected function configure()
2132
;
2233
}
2334

35+
/**
36+
* Execute the console command.
37+
*
38+
* @param InputInterface $input
39+
* @param OutputInterface $output
40+
* @return int Command exit code.
41+
*/
2442
protected function execute(InputInterface $input, OutputInterface $output): int
2543
{
26-
// Get the scheduler instance from the container
2744
$scheduler = Hm_Container::getContainer()->get('scheduler');
28-
// Run the tasks that are due
2945
$scheduler->run();
3046

3147
$output->writeln("All due scheduled tasks have been executed.");
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
namespace Services\Core\Commands;
4+
5+
use Services\Core\Hm_Container;
6+
use Symfony\Component\Console\Command\Command;
7+
use Symfony\Component\Console\Input\InputInterface;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
10+
/**
11+
* Class Hm_SchedulerWorkCommand
12+
* @package Services\Core\Commands
13+
*/
14+
class Hm_SchedulerWorkCommand extends Hm_BaseCommand
15+
{
16+
/**
17+
* The name of the command.
18+
*
19+
* @var string
20+
*/
21+
protected static $defaultName = 'schedule:work';
22+
23+
/**
24+
* Flag to indicate if the scheduler should stop running.
25+
* @var bool
26+
*/
27+
private $shouldStop = false;
28+
29+
/**
30+
* Store the last run time for each task to prevent overlapping runs.
31+
* @var array
32+
*/
33+
private $lastRunTimes = [];
34+
35+
/**
36+
* Configure the command.
37+
*/
38+
protected function configure()
39+
{
40+
$this
41+
->setDescription('Continuously run the scheduler to execute due tasks')
42+
->setHelp('This command runs the scheduler in a loop to continuously check and execute scheduled tasks.');
43+
}
44+
45+
/**
46+
* Execute the console command.
47+
*
48+
* @param InputInterface $input
49+
* @param OutputInterface $output
50+
* @return int Command exit code.
51+
*/
52+
protected function execute(InputInterface $input, OutputInterface $output): int
53+
{
54+
$scheduler = Hm_Container::getContainer()->get('scheduler');
55+
$output->writeln("Scheduler started. Press Ctrl+C to stop.");
56+
57+
if (function_exists('pcntl_signal')) {
58+
pcntl_signal(SIGINT, function () {
59+
$this->shouldStop = true;
60+
});
61+
}
62+
63+
while (!$this->shouldStop) {
64+
foreach ($scheduler->getTasks() as $task) {
65+
$taskId = spl_object_hash($task);
66+
$currentTime = new \DateTime('now', new \DateTimeZone($task->getTimezone()));
67+
68+
$lastRunTime = isset($this->lastRunTimes[$taskId]) ? $this->lastRunTimes[$taskId] : $currentTime;
69+
70+
$this->lastRunTimes[$taskId] = $currentTime;
71+
72+
if ($task->isDue() && $currentTime > $lastRunTime) {
73+
$output->writeln("Running task: {$task->getName()} at " . $currentTime->format('Y-m-d H:i:s'));
74+
$task->run();
75+
$output->writeln("Task: {$task->getName()} added to queue");
76+
}
77+
}
78+
79+
// Wait one minute before the next loop iteration
80+
sleep(60);
81+
82+
// Dispatch any pending signals
83+
if (function_exists('pcntl_signal_dispatch')) {
84+
pcntl_signal_dispatch();
85+
}
86+
}
87+
88+
$output->writeln("Scheduler stopped gracefully.");
89+
90+
return Command::SUCCESS;
91+
}
92+
93+
/**
94+
* Stops the scheduler loop gracefully.
95+
*/
96+
public function stop()
97+
{
98+
$this->shouldStop = true;
99+
}
100+
}

services/Core/Hm_Container.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
use Symfony\Component\DependencyInjection\ContainerBuilder;
99
use Services\Providers\{Hm_CommandServiceProvider, Hm_EventServiceProvider, Hm_SchedulerServiceProvider, Hm_QueueServiceProvider};
1010

11+
/**
12+
* Class Hm_Container
13+
* @package Services\Core
14+
*/
1115
class Hm_Container
1216
{
1317
private static $container = null;
@@ -16,6 +20,12 @@ class Hm_Container
1620
private function __construct() {}
1721
private function __clone() {}
1822

23+
/**
24+
* Set the container
25+
*
26+
* @param ContainerBuilder $containerBuilder
27+
* @return ContainerBuilder
28+
*/
1929
public static function setContainer(ContainerBuilder $containerBuilder): ContainerBuilder
2030
{
2131
if (self::$container === null) {
@@ -25,12 +35,17 @@ public static function setContainer(ContainerBuilder $containerBuilder): Contain
2535
return self::$container;
2636
}
2737

38+
/**
39+
* Bind the container
40+
*
41+
* @return ContainerBuilder
42+
*/
2843
public static function bind(): ContainerBuilder
2944
{
3045
$config = self::$container->get('config');
3146

3247
if ($config->get('queue_enabled')) {
33-
48+
3449
if ($config->get('queue_driver') === 'database') {
3550
// Register Hm_DB
3651
self::$container->set('db.connection', Hm_DB::connect(self::$container->get('config')));
@@ -67,6 +82,11 @@ public static function bind(): ContainerBuilder
6782
return self::$container;
6883
}
6984

85+
/**
86+
* Get the container
87+
*
88+
* @return ContainerBuilder
89+
*/
7090
public static function getContainer(): ContainerBuilder
7191
{
7292
return self::$container;

services/Core/Jobs/Hm_BaseJob.php

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,61 @@
66

77
abstract class Hm_BaseJob implements Hm_Job
88
{
9-
public string $driver = 'database';
9+
public string $driver = '';
1010
public int $tries = 3;
1111
protected int $attempts = 0;
1212

1313
public function __construct(protected array $data = []) {
1414
$this->data = $data;
1515
}
1616

17+
/**
18+
* Execute the job.
19+
*
20+
* @return void
21+
*/
1722
public function handle(): void {}
23+
/**
24+
* Handle a job failure.
25+
*
26+
* @return void
27+
*/
1828
public function failed(): void {}
1929

30+
/**
31+
* Get the driver name for the job.
32+
*
33+
* @return int
34+
*/
2035
public function getDriver(): string
2136
{
2237
return $this->driver;
2338
}
2439

40+
/**
41+
* Get the number of times the job has been attempted.
42+
*
43+
* @return int
44+
*/
2545
public function getAttempts(): int
2646
{
2747
return $this->attempts;
2848
}
2949

30-
// Method to increment the attempt count
50+
/**
51+
* Method to increment the attempt count
52+
*
53+
*/
3154
public function incrementAttempts(): void
3255
{
3356
$this->attempts++;
3457
}
3558

36-
// Check if the job has exceeded the max attempts
59+
/**
60+
* Determine if the job has exceeded the maximum number of attempts.
61+
*
62+
* @return bool
63+
*/
3764
public function hasExceededMaxAttempts(): bool
3865
{
3966
return $this->attempts >= $this->tries;

services/Core/Queue/Hm_JobDispatcher.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Hm_JobDispatcher
2323
static public function dispatch(Hm_BaseJob $job): void {
2424
if (is_subclass_of($job, Hm_ShouldQueue::class)) {
2525
$driver = $job->driver;
26+
dd($driver);
2627
$queueDriver = Hm_Container::getContainer()->get('queue.manager')->getDriver($driver);
2728
if ($queueDriver) {
2829
$queueDriver->push($job);

services/Core/Queue/Hm_QueueManager.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public function addDriver(string $name, Hm_ShouldQueue $driver): void
2323

2424
public function getDriver(string $name): Hm_ShouldQueue
2525
{
26+
dump("Getting driver $name");
2627
return $this->drivers[$name];
2728
}
2829
}

services/Core/Queue/Hm_QueueWorker.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ public function work(): void {
3030
while ($job = $this->queue->pop())
3131
{
3232
try {
33-
// dd($job);
3433
$this->queue->process($job);
3534
} catch (\Exception $e) {
3635
// $job->failed();

services/Core/Scheduling/Hm_CacheMutex.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public function release($task)
6767
*/
6868
private function getMutexKey($task)
6969
{
70-
return 'mutex_' . hash('sha256', get_class($task) . $task->name . json_encode($task->command));
71-
// return 'mutex_' . hash('sha256', $task->name);
70+
// return 'mutex_' . hash('sha256', get_class($task) . $task->name . json_encode($task->command));
71+
return 'mutex_' . hash('sha256', $task->name);
7272
}
7373

7474
/**

0 commit comments

Comments
 (0)