Skip to content

Commit 4d4b935

Browse files
committed
fix publish job
1 parent edc2492 commit 4d4b935

File tree

2 files changed

+6
-13
lines changed

2 files changed

+6
-13
lines changed

src/Consumer.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,17 @@ public function setPrefetchCount(int $value): void
6363
*
6464
* @throws Throwable
6565
*/
66-
public function daemon($connectionName, $queue, WorkerOptions $options): int
66+
public function daemon($connectionName, $queue, WorkerOptions $options)
6767
{
6868
if ($this->supportsAsyncSignals()) {
6969
$this->listenForSignals();
7070
}
7171

72-
$config = (array) $options;
73-
7472
$lastRestart = $this->getTimestampOfLastQueueRestart();
7573

7674
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
7775

7876
$connection = $this->manager->connection($connectionName);
79-
$connection->declareQueue($queue);
80-
$connection->setOptions($config);
8177

8278
$this->channel = $connection->getChannel();
8379

@@ -144,7 +140,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
144140
$this->exceptions->report($exception);
145141

146142
$this->kill(1);
147-
} catch (Exception | Throwable $exception) {
143+
} catch (Exception|Throwable $exception) {
148144
$this->exceptions->report($exception);
149145

150146
$this->stopWorkerIfLostConnection($exception);

src/RabbitQueue.php

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public function push($job, $data = '', $queue = null)
6363
$this->createPayload($job, $this->getQueue($queue), $data),
6464
$queue,
6565
null,
66-
function ($payload, $queue) use ($data) {
67-
return $this->pushRaw(payload: $payload, queue: $queue);
66+
function ($payload, $queue) {
67+
return $this->pushRaw($payload, $queue);
6868
}
6969
);
7070
}
@@ -382,7 +382,7 @@ protected function createMessage($payload, int $attempts = 2): array
382382
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
383383
];
384384

385-
$currentPayload = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
385+
$currentPayload = json_decode($payload, true, 512);
386386
if ($correlationId = $currentPayload['id'] ?? null) {
387387
$properties['correlation_id'] = $correlationId;
388388
}
@@ -415,13 +415,10 @@ protected function createMessage($payload, int $attempts = 2): array
415415
protected function publishProperties($queue, array $options = []): array
416416
{
417417
$queue = $this->getQueue($queue);
418-
// @todo move to config
419-
$attempts = 3;
418+
$attempts = Arr::get($options, 'attempts') ?: 0;
420419

421420
$destination = $queue;
422421
$exchange = $queue;
423-
424-
// @todo move to config
425422
$exchangeType = AMQPExchangeType::TOPIC;
426423

427424
return [$destination, $exchange, $exchangeType, $attempts];

0 commit comments

Comments
 (0)