diff --git a/src/CommonSocket.php b/src/CommonSocket.php index 503add43..0772466f 100644 --- a/src/CommonSocket.php +++ b/src/CommonSocket.php @@ -29,34 +29,6 @@ abstract class CommonSocket */ public const MAX_WRITE_BUFFER = 2048; - /** - * Send timeout in seconds. - * - * @var int - */ - protected $sendTimeoutSec = 0; - - /** - * Send timeout in microseconds. - * - * @var int - */ - protected $sendTimeoutUsec = 100000; - - /** - * Recv timeout in seconds - * - * @var int - */ - protected $recvTimeoutSec = 0; - - /** - * Recv timeout in microseconds - * - * @var int - */ - protected $recvTimeoutUsec = 750000; - /** * @var resource */ @@ -95,26 +67,6 @@ public function __construct(string $host, int $port, ?Config $config = null, ?Sa $this->saslProvider = $saslProvider; } - public function setSendTimeoutSec(int $sendTimeoutSec): void - { - $this->sendTimeoutSec = $sendTimeoutSec; - } - - public function setSendTimeoutUsec(int $sendTimeoutUsec): void - { - $this->sendTimeoutUsec = $sendTimeoutUsec; - } - - public function setRecvTimeoutSec(int $recvTimeoutSec): void - { - $this->recvTimeoutSec = $recvTimeoutSec; - } - - public function setRecvTimeoutUsec(int $recvTimeoutUsec): void - { - $this->recvTimeoutUsec = $recvTimeoutUsec; - } - public function setMaxWriteAttempts(int $number): void { $this->maxWriteAttempts = $number; @@ -139,16 +91,20 @@ protected function createStream(): void if ($this->config !== null && $this->config->getSslEnable()) { // ssl connection $remoteSocket = sprintf('ssl://%s:%s', $this->host, $this->port); + $filterFunction = function ($elem) { + return $elem !== ''; + }; $context = stream_context_create( [ - 'ssl' => [ - 'local_cert' => $this->config->getSslLocalCert(), - 'local_pk' => $this->config->getSslLocalPk(), - 'verify_peer' => $this->config->getSslVerifyPeer(), - 'passphrase' => $this->config->getSslPassphrase(), - 'cafile' => $this->config->getSslCafile(), - 'peer_name' => $this->config->getSslPeerName(), - ], + 'ssl' => array_filter([ + 'local_cert' => $this->config->getSslLocalCert(), + 'local_pk' => $this->config->getSslLocalPk(), + 'verify_peer' => true, + 'verify_peer_name' => false, + 'passphrase' => $this->config->getSslPassphrase(), + 'cafile' => $this->config->getSslCafile(), + 'peer_name' => $this->config->getSslPeerName(), + ], $filterFunction), ] ); } @@ -184,7 +140,7 @@ protected function createSocket(string $remoteSocket, $context, ?int &$errno, ?s $remoteSocket, $errno, $errstr, - $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000), + $this->config->getSendTimeoutSec() + ($this->config->getSendTimeoutUsec() / 1000000), STREAM_CLIENT_CONNECT, $context ); @@ -248,7 +204,7 @@ public function readBlocking(int $length): string throw Exception\Socket::invalidLength($length, self::READ_MAX_LENGTH); } - $readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec); + $readable = $this->select([$this->stream], $this->config->getRecvTimeoutSec(), $this->config->getRecvTimeoutUsec()); if ($readable === false) { $this->close(); @@ -279,7 +235,7 @@ public function readBlocking(int $length): string throw Exception\Socket::unexpectedEOF($length); } // Otherwise wait for bytes - $readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec); + $readable = $this->select([$this->stream], $this->config->getRecvTimeoutSec(), $this->config->getRecvTimeoutUsec()); if ($readable !== 1) { throw Exception\Socket::timedOutWithRemainingBytes($length, $remainingBytes); } @@ -310,7 +266,7 @@ public function writeBlocking(string $buffer): int while ($bytesWritten < $bytesToWrite) { // wait for stream to become available for writing - $writable = $this->select([$this->stream], $this->sendTimeoutSec, $this->sendTimeoutUsec, false); + $writable = $this->select([$this->stream], $this->config->getSendTimeoutSec(), $this->config->getSendTimeoutUsec(), false); if ($writable === false) { throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream'); diff --git a/src/Config.php b/src/Config.php index bb3827e2..4c38bfc5 100644 --- a/src/Config.php +++ b/src/Config.php @@ -84,11 +84,13 @@ abstract class Config 'metadataRequestTimeoutMs' => 60000, 'metadataRefreshIntervalMs' => 300000, 'metadataMaxAgeMs' => -1, + 'autoCreateTopicsEnable' => true, 'securityProtocol' => self::SECURITY_PROTOCOL_PLAINTEXT, 'sslEnable' => false, // this config item will override, don't config it. 'sslLocalCert' => '', 'sslLocalPk' => '', 'sslVerifyPeer' => false, + 'sslVerifyPeerName' => true, 'sslPassphrase' => '', 'sslCafile' => '', 'sslPeerName' => '', @@ -97,6 +99,10 @@ abstract class Config 'saslPassword' => '', 'saslKeytab' => '', 'saslPrincipal' => '', + 'sendTimeoutSec' => 0, + 'sendTimeoutUsec' => 100000, + 'recvTimeoutSec' => 0, + 'recvTimeoutUsec' => 750000, ]; /** diff --git a/src/Producer/RecordValidator.php b/src/Producer/RecordValidator.php index 22e07cee..f2063843 100644 --- a/src/Producer/RecordValidator.php +++ b/src/Producer/RecordValidator.php @@ -4,6 +4,7 @@ namespace Kafka\Producer; use Kafka\Exception; +use Kafka\ProducerConfig; use function is_string; use function trim; @@ -30,7 +31,9 @@ public function validate(array $record, array $topicList): void throw Exception\InvalidRecordInSet::missingTopic(); } - if (! isset($topicList[$record['topic']])) { + /** @var ProducerConfig $config */ + $config = ProducerConfig::getInstance(); + if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable()) { throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']); } diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..9a007328 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -28,6 +28,10 @@ class SyncProcess public function __construct(?RecordValidator $recordValidator = null) { $this->recordValidator = $recordValidator ?? new RecordValidator(); + } + + public function init(): void + { $config = $this->getConfig(); \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); @@ -47,6 +51,7 @@ public function __construct(?RecordValidator $recordValidator = null) */ public function send(array $recordSet): array { + $this->init(); $broker = $this->getBroker(); $config = $this->getConfig(); @@ -115,7 +120,7 @@ public function syncMeta(): void } shuffle($brokerHost); - $broker = $this->getBroker(); + $broker = $this->getBroker(); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host, true); @@ -160,7 +165,7 @@ protected function convertRecordSet(array $recordSet): array foreach ($recordSet as $record) { $this->recordValidator->validate($record, $topics); - $topicMeta = $topics[$record['topic']]; + $topicMeta = $this->getTopicMeta($record['topic']); $partNums = array_keys($topicMeta); shuffle($partNums); @@ -193,6 +198,30 @@ protected function convertRecordSet(array $recordSet): array return $sendData; } + /** + * Get the topic meta. If auto create is on, get a random broker instead of + * a random broker that the topic is on. + * + * @param string $topic + * + * @return array + */ + protected function getTopicMeta($topic) + { + $topics = $this->getBroker()->getTopics(); + + if (isset($topics[$topic])) { + return $topics[$topic]; + } + + // Here we can safely assume that auto create topics are set to true. If + // not, and the topic does not exist, the validate of the record would + // have failed. + + // Default for auto creation. + return [0 => 0]; + } + private function getBroker(): Broker { return Broker::getInstance(); diff --git a/tests/Base/Producer/RecordValidatorTest.php b/tests/Base/Producer/RecordValidatorTest.php index 7572213a..0e5e9be1 100644 --- a/tests/Base/Producer/RecordValidatorTest.php +++ b/tests/Base/Producer/RecordValidatorTest.php @@ -6,6 +6,7 @@ use Kafka\Exception\InvalidRecordInSet; use Kafka\Producer\RecordValidator; use PHPUnit\Framework\TestCase; +use Kafka\ProducerConfig; final class RecordValidatorTest extends TestCase { @@ -34,6 +35,9 @@ public function testValidRecordDoesNotThrowException(): void */ public function testInvalidRecordThrowsException(string $expectedExceptionMessage, array $record): void { + $config = ProducerConfig::getInstance(); + $config->setAutoCreateTopicsEnable(false); + $this->expectException(InvalidRecordInSet::class); $this->expectExceptionMessage($expectedExceptionMessage); diff --git a/tests/Base/SocketTest.php b/tests/Base/SocketTest.php index 831df3b4..2f22ef2b 100644 --- a/tests/Base/SocketTest.php +++ b/tests/Base/SocketTest.php @@ -94,24 +94,26 @@ public function testCreateStreamFailure(): void public function testCreateStreamSsl(): void { - $host = '127.0.0.1'; - $port = 9192; - $localCert = $this->root->url() . '/localCert'; - $localKey = $this->root->url() . '/localKey'; - $verifyPeer = false; - $passphrase = '123456'; - $cafile = $this->root->url() . '/cafile'; - $peerName = 'kafka'; + $host = '127.0.0.1'; + $port = 9192; + $localCert = $this->root->url() . '/localCert'; + $localKey = $this->root->url() . '/localKey'; + $verifyPeer = false; + $passphrase = '123456'; + $cafile = $this->root->url() . '/cafile'; + $peerName = 'kafka'; + $verifyPeerName = true; $context = stream_context_create( [ 'ssl' => [ - 'local_cert' => $localCert, - 'local_pk' => $localKey, - 'verify_peer' => $verifyPeer, - 'passphrase' => $passphrase, - 'cafile' => $cafile, - 'peer_name' => $peerName, + 'local_cert' => $localCert, + 'local_pk' => $localKey, + 'verify_peer' => $verifyPeer, + 'passphrase' => $passphrase, + 'cafile' => $cafile, + 'peer_name' => $peerName, + 'verify_peer_name' => $verifyPeerName, ], ] ); @@ -130,6 +132,7 @@ public function testCreateStreamSsl(): void $config->setSslPassphrase($passphrase); $config->setSslVerifyPeer($verifyPeer); $config->setSslPeerName($peerName); + $config->setSslVerifyPeerName($verifyPeerName); $sasl = $this->createMock(SaslMechanism::class); $sasl->expects($this->once()) @@ -260,9 +263,11 @@ public function testRecvTimeout(): void $streamMock->method('eof')->willReturn(false); $streamMock->method('read')->willReturn('xxxx'); - $socket = $this->mockStreamSocketClient($host, $port, null, null, ['select']); - $socket->setRecvTimeoutSec(3000); - $socket->setRecvTimeoutUsec(30001); + $config = $this->getMockForAbstractClass(Config::class); + $config->setRecvTimeoutSec(30000); + $config->setRecvTimeoutUsec(30001); + + $socket = $this->mockStreamSocketClient($host, $port, $config, null, ['select']); $socket->method('select') ->with($this->isType('array'), 3000, 30001, true) @@ -367,9 +372,11 @@ public function testSendTimeout(): void $streamMock->method('eof')->willReturn(false); $streamMock->method('write')->willReturn(4); - $socket = $this->mockStreamSocketClient($host, $port, null, null, ['select']); - $socket->setSendTimeoutSec(3000); - $socket->setSendTimeoutUsec(30001); + $config = $this->getMockForAbstractClass(Config::class); + $config->setSendTimeoutSec(30000); + $config->setSendTimeoutUsec(30001); + + $socket = $this->mockStreamSocketClient($host, $port, $config, null, ['select']); $socket->method('select') ->with($this->isType('array'), 3000, 30001, false) @@ -392,6 +399,10 @@ private function mockStreamSocketClient( ?SaslMechanism $sasl = null, array $mockMethod = [] ): Socket { + if ($config === null) { + $config = $this->getMockForAbstractClass(Config::class); + } + $socket = $this->getMockBuilder(Socket::class) ->setMethods(array_merge(['createSocket'], $mockMethod)) ->setConstructorArgs([$host, $port, $config, $sasl])