Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Commit d5b395a

Browse files
authored
Merge pull request #133 from noname007/master
信息消费模式改变
2 parents 16b8a7b + 43cb74a commit d5b395a

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

src/Kafka/Consumer/Process.php

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
namespace Kafka\Consumer;
1616

17+
use Kafka\ConsumerConfig;
18+
1719
/**
1820
+------------------------------------------------------------------------------
1921
* Kafka protocol since Kafka v0.8
@@ -660,8 +662,31 @@ public function succFetch($result, $fd)
660662
// }}}
661663
// {{{ protected function commit()
662664

665+
protected function consume_msg()
666+
{
667+
foreach ($this->messages as $topic => $value) {
668+
foreach ($value as $part => $messages) {
669+
foreach ($messages as $message) {
670+
if ($this->consumer != null) {
671+
call_user_func($this->consumer, $topic, $part, $message);
672+
}
673+
}
674+
}
675+
}
676+
677+
$this->messages = array();
678+
}
679+
680+
663681
protected function commit()
664682
{
683+
$config= ConsumerConfig::getInstance();
684+
if($config->getConsumeMode() == ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET)
685+
{
686+
$this->consume_msg();
687+
}
688+
689+
665690
$broker = \Kafka\Broker::getInstance();
666691
$groupBrokerId = $broker->getGroupBrokerId();
667692
$connect = $broker->getMetaConnect($groupBrokerId);
@@ -704,6 +729,10 @@ protected function commit()
704729
// }}}
705730
// {{{ public function succCommit()
706731

732+
/**
733+
* @var State
734+
*/
735+
public $state;
707736
public function succCommit($result)
708737
{
709738
$this->debug('Commit success, result:' . json_encode($result));
@@ -716,17 +745,10 @@ public function succCommit($result)
716745
}
717746
}
718747
}
719-
720-
foreach ($this->messages as $topic => $value) {
721-
foreach ($value as $part => $messages) {
722-
foreach ($messages as $message) {
723-
if ($this->consumer != null) {
724-
call_user_func($this->consumer, $topic, $part, $message);
725-
}
726-
}
727-
}
748+
if(ConsumerConfig::getInstance()->getConsumeMode() == ConsumerConfig::CONSUME_AFTER_COMMIT_OFFSET)
749+
{
750+
$this->consume_msg();
728751
}
729-
$this->messages = array();
730752
}
731753

732754
// }}}

src/Kafka/ConsumerConfig.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,20 @@ public function setTopics($topics)
130130

131131
// }}}
132132
// }}}
133+
134+
protected $runtime_options = [
135+
'consume_mode' => self::CONSUME_AFTER_COMMIT_OFFSET
136+
];
137+
const CONSUME_AFTER_COMMIT_OFFSET = 1;
138+
const CONSUME_BEFORE_COMMIT_OFFSET = 2;
139+
140+
public function setConsumeMode($mode)
141+
{
142+
$this->runtime_options['consume_mode'] = $mode;
143+
}
144+
145+
public function getConsumeMode()
146+
{
147+
return $this->runtime_options['consume_mode'];
148+
}
133149
}

0 commit comments

Comments
 (0)