-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
See the MongoDB wire protocol reference (https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/) and the OP_COMPRESSED specification (https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md).
POC
Implements zlib compression (compressor ID 2):
<?php namespace com\mongodb\io;
/**
 * Wire protocol compression as negotiated during the server handshake.
 *
 * Instances are created via `negotiate()`; the returned object carries the
 * compressor ID sent inside OP_COMPRESSED messages in its `id` member and
 * offers a `compress()` method for the message body.
 *
 * @see https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md
 */
class Compression {

  /** Names of commands which are always sent uncompressed */
  const EXCLUDED= [
    'hello',
    'isMaster',
    'saslStart',
    'saslContinue',
    'getnonce',
    'authenticate',
    'createUser',
    'updateUser',
    'copydbSaslStart',
    'copydbgetnonce',
    'copydb',
  ];

  /** @var int Compressor ID */
  public $id;

  /**
   * Returns whether the given sections must be excluded from compression,
   * which is the case if any of the command names in `EXCLUDED` is present
   * as a key with a non-null value.
   *
   * @param  [:var] $sections
   * @return bool
   */
  public function exclude($sections) {
    foreach (self::EXCLUDED as $command) {
      if (isset($sections[$command])) return true;
    }
    return false;
  }

  /**
   * Selects a compressor from the list of compressor names given in
   * `$server`. Only "zlib" (compressor ID 2) is supported; its level is
   * taken from the `zlibCompressionLevel` option and defaults to -1.
   *
   * @param  string[] $server Compressor names
   * @param  [:var] $options
   * @return ?self NULL if none of the given compressors is supported
   */
  public static function negotiate($server, $options= []) {

    // Strict comparison: A loose check would let non-string list members
    // such as `true` match the name "zlib".
    if (in_array('zlib', $server, true)) {
      return new class(2, $options['zlibCompressionLevel'] ?? -1) extends Compression {

        /** @var int|string Level passed to gzcompress() */
        public $level;

        /**
         * Creates a new zlib compressor
         *
         * @param int $id
         * @param int|string $level
         */
        public function __construct($id, $level) {
          $this->id= $id;
          $this->level= $level;
        }

        /**
         * Compresses data using gzcompress() and the configured level
         *
         * @param  string $data
         * @return string
         */
        public function compress($data) {
          return gzcompress($data, $this->level);
        }
      };
    }

    return null;
  }
}
diff --git a/src/main/php/com/mongodb/io/Connection.class.php b/src/main/php/com/mongodb/io/Connection.class.php
index fa45abd..49dd693 100755
--- a/src/main/php/com/mongodb/io/Connection.class.php
+++ b/src/main/php/com/mongodb/io/Connection.class.php
@@ -30,7 +30,7 @@ class Connection {
const CONNECT_TIMEOUT = 40000;
const READ_TIMEOUT = 60000;
- private $socket, $bson;
+ private $socket, $bson, $compression;
private $packet= 1;
public $server= null;
public $lastUsed= null;
@@ -100,6 +100,11 @@ class Connection {
],
];
+ // See https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md
+ if ($compressors= ($options['params']['compressors'] ?? null)) {
+ $params['compression']= explode(',', $compressors);
+ }
+
// If the optional field saslSupportedMechs is specified, the command also returns
// an array of SASL mechanisms used to create the specified user's credentials.
if (isset($options['user'])) {
@@ -113,6 +118,7 @@ class Connection {
try {
$this->server= $this->hello($params);
+ $this->compression= Compression::negotiate($this->server['compression'] ?? [], $options['params']);
} catch (ProtocolException $e) {
throw new ConnectException('Server handshake failed @ '.$this->address(), $e);
}
@@ -230,14 +236,29 @@ class Connection {
$this->packet > 2147483647 ? $this->packet= 1 : $this->packet++;
$body= $header.$this->bson->sections($sections);
- $payload= pack('VVVV', strlen($body) + 16, $this->packet, 0, $operation).$body;
- $this->socket->write($payload);
+ if (null === $this->compression || $this->compression->exclude($sections)) {
+ $this->socket->write(pack('VVVV', strlen($body) + 16, $this->packet, 0, $operation).$body);
+ } else {
+ $compressed= $this->compression->compress($body);
+ $this->socket->write(pack(
+ 'VVVVVVCa*',
+ strlen($compressed) + 25,
+ $this->packet,
+ 0,
+ self::OP_COMPRESSED,
+ $operation,
+ strlen($body),
+ $this->compression->id,
+ $compressed
+ ));
+ }
+
$meta= unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $this->read0(16));
$response= $this->read0($meta['messageLength'] - 16);
$this->lastUsed= time();
- switch ($meta['opCode']) {
+ opcode: switch ($meta['opCode']) {
case self::OP_MSG:
$flags= unpack('V', $response, 4)[1];
if ("\x00" === $response[4]) {
@@ -258,6 +279,16 @@ class Connection {
return $reply;
+ case self::OP_COMPRESSED:
+ $compressed= unpack('VoriginalOpcode/VuncompressedSize/CcompressorId', $response);
+ if (2 === $compressed['compressorId']) {
+ $response= gzuncompress(substr($response, 9));
+ $meta['opCode']= $compressed['originalOpcode'];
+ goto opcode;
+ }
+
+ throw new ProtocolException('Unsupported compressorId '.$compressed['compressorId']);
+
default:
return ['opCode' => $meta['opCode']];
}
Metadata
Metadata
Assignees
Labels
No labels