Skip to content

Support compression #61

@thekid

Description

@thekid

See https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ and https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md

POC

Implements zlib compression (compressor ID 2):

<?php namespace com\mongodb\io;

class Compression {
  public $id;

  public function exclude($sections) {
    return (
      isset($sections['hello']) ||
      isset($sections['isMaster']) ||
      isset($sections['saslStart']) ||
      isset($sections['saslContinue']) ||
      isset($sections['getnonce']) ||
      isset($sections['authenticate']) ||
      isset($sections['createUser']) ||
      isset($sections['updateUser']) ||
      isset($sections['copydbSaslStart']) ||
      isset($sections['copydbgetnonce']) ||
      isset($sections['copydb'])
    );
  }

  public static function negotiate($server, $options= []) {
    if (in_array('zlib', $server)) {
      return new class(2, $options['zlibCompressionLevel'] ?? -1) extends Compression {

        public function __construct($id, $level) {
          $this->id= $id;
          $this->level= $level;
        }

        public function compress($data) {
          return gzcompress($data, $this->level);
        }
      };
    }

    return null;
  }
}
diff --git a/src/main/php/com/mongodb/io/Connection.class.php b/src/main/php/com/mongodb/io/Connection.class.php
index fa45abd..49dd693 100755
--- a/src/main/php/com/mongodb/io/Connection.class.php
+++ b/src/main/php/com/mongodb/io/Connection.class.php
@@ -30,7 +30,7 @@ class Connection {
   const CONNECT_TIMEOUT = 40000;
   const READ_TIMEOUT    = 60000;
 
-  private $socket, $bson;
+  private $socket, $bson, $compression;
   private $packet= 1;
   public $server= null;
   public $lastUsed= null;
@@ -100,6 +100,11 @@ class Connection {
       ],
     ];
 
+    // See https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md
+    if ($compressors= ($options['params']['compressors'] ?? null)) {
+      $params['compression']= explode(',', $compressors);
+    }
+
     // If the optional field saslSupportedMechs is specified, the command also returns
     // an array of SASL mechanisms used to create the specified user's credentials.
     if (isset($options['user'])) {
@@ -113,6 +118,7 @@ class Connection {
 
     try {
       $this->server= $this->hello($params);
+      $this->compression= Compression::negotiate($this->server['compression'] ?? [], $options['params']);
     } catch (ProtocolException $e) {
       throw new ConnectException('Server handshake failed @ '.$this->address(), $e);
     }
@@ -230,14 +236,29 @@ class Connection {
 
     $this->packet > 2147483647 ? $this->packet= 1 : $this->packet++;
     $body= $header.$this->bson->sections($sections);
-    $payload= pack('VVVV', strlen($body) + 16, $this->packet, 0, $operation).$body;
 
-    $this->socket->write($payload);
+    if (null === $this->compression || $this->compression->exclude($sections)) {
+      $this->socket->write(pack('VVVV', strlen($body) + 16, $this->packet, 0, $operation).$body);
+    } else {
+      $compressed= $this->compression->compress($body);
+      $this->socket->write(pack(
+        'VVVVVVCa*',
+        strlen($compressed) + 25,
+        $this->packet,
+        0,
+        self::OP_COMPRESSED,
+        $operation,
+        strlen($body),
+        $this->compression->id,
+        $compressed
+      ));
+    }
+
     $meta= unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $this->read0(16));
     $response= $this->read0($meta['messageLength'] - 16);
     $this->lastUsed= time();
 
-    switch ($meta['opCode']) {
+    opcode: switch ($meta['opCode']) {
       case self::OP_MSG:
         $flags= unpack('V', $response, 4)[1];
         if ("\x00" === $response[4]) {
@@ -258,6 +279,16 @@ class Connection {
 
         return $reply;
 
+      case self::OP_COMPRESSED:
+        $compressed= unpack('VoriginalOpcode/VuncompressedSize/CcompressorId', $response);
+        if (2 === $compressed['compressorId']) {
+          $response= gzuncompress(substr($response, 9));
+          $meta['opCode']= $compressed['originalOpcode'];
+          goto opcode;
+        }
+
+        throw new ProtocolException('Unsupported compressorId '.$compressed['compressorId']);
+
       default:
         return ['opCode' => $meta['opCode']];
     }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions