Skip to content
This repository was archived by the owner on Dec 27, 2023. It is now read-only.

Commit bd5095b

Browse files
authored
Merge pull request #16 from bstoney/master
Merge "Fixed zmq reply messaging", from @bstoney
2 parents c229d47 + 38163c6 commit bd5095b

File tree

9 files changed

+128
-67
lines changed

9 files changed

+128
-67
lines changed

src/Actions/Action.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414

1515
interface Action
1616
{
17-
public function call(array $header, array $content);
17+
public function call(array $header, array $content, $zmqId = null);
1818
}

src/Actions/ExecuteAction.php

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111

1212
namespace Litipk\JupyterPHP\Actions;
1313

14-
1514
use Litipk\JupyterPHP\JupyterBroker;
1615
use Psy\Exception\BreakException;
1716
use Psy\Exception\ThrowUpException;
1817
use Psy\ExecutionLoop\Loop;
1918
use Psy\Shell;
2019
use React\ZMQ\SocketWrapper;
2120

22-
2321
final class ExecuteAction implements Action
2422
{
2523
/** @var JupyterBroker */
@@ -39,45 +37,68 @@ final class ExecuteAction implements Action
3937

4038
/** @var string */
4139
private $code;
42-
40+
41+
/** @var bool */
42+
private $silent;
43+
4344
/** @var int */
44-
private $execCount;
45+
private $execCount = 0;
4546

4647

4748
public function __construct(
48-
JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Shell $shellSoul
49-
)
50-
{
49+
JupyterBroker $broker,
50+
SocketWrapper $iopubSocket,
51+
SocketWrapper $shellSocket,
52+
Shell $shellSoul
53+
) {
5154
$this->broker = $broker;
5255
$this->iopubSocket = $iopubSocket;
5356
$this->shellSocket = $shellSocket;
5457
$this->shellSoul = $shellSoul;
5558
}
5659

57-
public function call(array $header, array $content)
60+
public function call(array $header, array $content, $zmqId = null)
5861
{
59-
$this->broker->send(
60-
$this->iopubSocket, 'status', ['execution_state' => 'busy'], $header
61-
);
62+
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);
6263

6364
$this->header = $header;
64-
$this->execCount = isset($content->execution_count) ? $content->execution_count : 0;
6565
$this->code = $content['code'];
66+
$this->silent = $content['silent'];
67+
68+
if (!$this->silent) {
69+
$this->execCount = $this->execCount + 1;
70+
71+
$this->broker->send(
72+
$this->iopubSocket,
73+
'execute_input',
74+
['code' => $this->code, 'execution_count' => $this->execCount],
75+
$this->header
76+
);
77+
}
6678

6779
($this->getClosure())();
80+
81+
$replyContent = [
82+
'status' => 'ok',
83+
'execution_count' => $this->execCount,
84+
'payload' => [],
85+
'user_expressions' => new \stdClass
86+
];
87+
88+
$this->broker->send($this->shellSocket, 'execute_reply', $replyContent, $this->header, [], $zmqId);
89+
90+
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header);
6891
}
6992

7093
public function notifyMessage(string $message)
7194
{
72-
$this->broker->send($this->shellSocket, 'execute_reply', ['status' => 'ok'], $this->header);
73-
$this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'data' => $message], $this->header);
95+
$this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'text' => $message], $this->header);
7496
$this->broker->send(
7597
$this->iopubSocket,
7698
'execute_result',
77-
['execution_count' => $this->execCount + 1, 'data' => $message, 'metadata' => new \stdClass],
99+
['execution_count' => $this->execCount, 'data' => ['text/plain' => $message], 'metadata' => new \stdClass],
78100
$this->header
79101
);
80-
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header);
81102
}
82103

83104
private function getClosure(): callable

src/Actions/HistoryAction.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket)
3131
$this->shellSocket = $shellSocket;
3232
}
3333

34-
public function call(array $header, array $content)
34+
public function call(array $header, array $content, $zmqId = null)
3535
{
36-
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header);
36+
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header, [], $zmqId);
3737
}
3838
}

src/Actions/KernelInfoAction.php

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@
1111

1212
namespace Litipk\JupyterPHP\Actions;
1313

14-
1514
use Litipk\JupyterPHP\JupyterBroker;
1615
use React\ZMQ\SocketWrapper;
1716

18-
1917
final class KernelInfoAction implements Action
2018
{
2119
/** @var JupyterBroker */
@@ -35,32 +33,30 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket, S
3533
$this->iopubSocket = $iopubSocket;
3634
}
3735

38-
public function call(array $header, array $content)
36+
public function call(array $header, array $content, $zmqId = null)
3937
{
40-
// TODO: Implement call() method.
38+
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);
4139

42-
$this->broker->send(
43-
$this->iopubSocket, 'status', ['execution_state' => 'busy'], $header
44-
);
45-
4640
$this->broker->send(
4741
$this->shellSocket,
4842
'kernel_info_reply',
4943
[
50-
'protocol_version' => '5.0.0',
44+
'protocol_version' => '5.0',
5145
'implementation' => 'jupyter-php',
5246
'implementation_version' => '0.1.0',
5347
'banner' => 'Jupyter-PHP Kernel',
54-
'language' => 'PHP',
55-
'language_version' => phpversion(),
5648
'language_info' => [
5749
'name' => 'PHP',
5850
'version' => phpversion(),
5951
'mimetype' => 'text/x-php',
6052
'file_extension' => '.php',
61-
'pygments_lexer' => 'PHP'
62-
]
63-
]
53+
'pygments_lexer' => 'PHP',
54+
],
55+
'status' => 'ok',
56+
],
57+
$header,
58+
[],
59+
$zmqId
6460
);
6561

6662
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);

src/Actions/ShutdownAction.php

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,34 @@
1111

1212
namespace Litipk\JupyterPHP\Actions;
1313

14+
use Litipk\JupyterPHP\JupyterBroker;
15+
use React\ZMQ\SocketWrapper;
1416

1517
final class ShutdownAction implements Action
1618
{
17-
public function call(array $header, array $content)
19+
/** @var JupyterBroker */
20+
private $broker;
21+
22+
/** @var SocketWrapper */
23+
private $shellSocket;
24+
25+
/** @var SocketWrapper */
26+
private $iopubSocket;
27+
28+
public function __construct(JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket)
29+
{
30+
$this->broker = $broker;
31+
$this->iopubSocket = $iopubSocket;
32+
$this->shellSocket = $shellSocket;
33+
}
34+
35+
public function call(array $header, array $content, $zmqId = null)
1836
{
19-
// TODO: Implement call() method.
37+
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);
38+
39+
$replyContent = ['restart' => $content['restart']];
40+
$this->broker->send($this->shellSocket, 'shutdown_reply', $replyContent, $header, [], $zmqId);
41+
42+
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);
2043
}
2144
}

src/Handlers/HbMessagesHandler.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,32 @@
1111

1212
namespace Litipk\JupyterPHP\Handlers;
1313

14-
14+
use Litipk\JupyterPHP\JupyterBroker;
1515
use Monolog\Logger;
16-
16+
use React\ZMQ\SocketWrapper;
1717

1818
final class HbMessagesHandler
1919
{
2020
/** @var Logger */
2121
private $logger;
2222

23-
public function __construct(Logger $logger)
23+
/** @var SocketWrapper */
24+
private $hbSocket;
25+
26+
public function __construct(SocketWrapper $hbSocket, Logger $logger)
2427
{
2528
$this->logger = $logger;
29+
$this->hbSocket = $hbSocket;
2630
}
2731

2832
public function __invoke($msg)
2933
{
3034
$this->logger->debug('Received message', ['processId' => getmypid(), 'msg' => $msg]);
35+
36+
if (['ping'] == $msg) {
37+
$this->hbSocket->send($msg);
38+
} else {
39+
$this->logger->error('Unknown message', ['processId' => getmypid(), 'msg' => $msg]);
40+
}
3141
}
32-
}
42+
}

src/Handlers/ShellMessagesHandler.php

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
namespace Litipk\JupyterPHP\Handlers;
1313

14-
1514
use Litipk\JupyterPHP\Actions\ExecuteAction;
1615
use Litipk\JupyterPHP\Actions\HistoryAction;
1716
use Litipk\JupyterPHP\Actions\KernelInfoAction;
@@ -23,7 +22,6 @@
2322
use Psy\Shell;
2423
use React\ZMQ\SocketWrapper;
2524

26-
2725
final class ShellMessagesHandler
2826
{
2927
/** @var ExecuteAction */
@@ -46,23 +44,25 @@ final class ShellMessagesHandler
4644

4745

4846
public function __construct(
49-
JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Logger $logger
50-
)
51-
{
47+
JupyterBroker $broker,
48+
SocketWrapper $iopubSocket,
49+
SocketWrapper $shellSocket,
50+
Logger $logger
51+
) {
5252
$this->shellSoul = new Shell();
53-
53+
5454
$this->executeAction = new ExecuteAction($broker, $iopubSocket, $shellSocket, $this->shellSoul);
5555
$this->historyAction = new HistoryAction($broker, $shellSocket);
5656
$this->kernelInfoAction = new KernelInfoAction($broker, $shellSocket, $iopubSocket);
57-
$this->shutdownAction = new ShutdownAction($broker, $shellSocket);
58-
57+
$this->shutdownAction = new ShutdownAction($broker, $iopubSocket, $shellSocket);
58+
5959
$this->logger = $logger;
6060

6161
$broker->send(
6262
$iopubSocket, 'status', ['execution_state' => 'starting'], []
6363
);
6464

65-
$this->shellSoul->setOutput( new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput')));
65+
$this->shellSoul->setOutput(new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput')));
6666
}
6767

6868
public function __invoke(array $msg)
@@ -73,24 +73,24 @@ public function __invoke(array $msg)
7373
$content = json_decode($content, true);
7474

7575
$this->logger->debug('Received message', [
76-
'processId' => getmypid(),
77-
'zmqId' => $zmqId,
78-
'delim' => $delim,
79-
'hmac' => $hmac,
80-
'header' => $header,
76+
'processId' => getmypid(),
77+
'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"),
78+
'delim' => $delim,
79+
'hmac' => $hmac,
80+
'header' => $header,
8181
'parentHeader' => $parentHeader,
82-
'metadata' => $metadata,
83-
'content' => $content
82+
'metadata' => $metadata,
83+
'content' => $content
8484
]);
8585

8686
if ('kernel_info_request' === $header['msg_type']) {
87-
$this->kernelInfoAction->call($header, $content);
87+
$this->kernelInfoAction->call($header, $content, $zmqId);
8888
} elseif ('execute_request' === $header['msg_type']) {
89-
$this->executeAction->call($header, $content);
89+
$this->executeAction->call($header, $content, $zmqId);
9090
} elseif ('history_request' === $header['msg_type']) {
91-
$this->historyAction->call($header, $content);
91+
$this->historyAction->call($header, $content, $zmqId);
9292
} elseif ('shutdown_request' === $header['msg_type']) {
93-
$this->shutdownAction->call($header, $content);
93+
$this->shutdownAction->call($header, $content, $zmqId);
9494
} elseif ('comm_open' === $header['msg_type']) {
9595
// TODO: Research about what should be done.
9696
} else {

src/JupyterBroker.php

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ public function __construct($key, $signatureScheme, UuidInterface $sessionId, Lo
4646
}
4747

4848
public function send(
49-
SocketWrapper $stream, $msgType, array $content = [], array $parentHeader = [], array $metadata = []
50-
)
51-
{
49+
SocketWrapper $stream,
50+
$msgType,
51+
array $content = [],
52+
array $parentHeader = [],
53+
array $metadata = [],
54+
$zmqId = null
55+
) {
5256
$header = $this->createHeader($msgType);
5357

5458
$msgDef = [
@@ -58,10 +62,16 @@ public function send(
5862
json_encode(empty($content) ? new \stdClass : $content),
5963
];
6064

65+
if (null !== $zmqId) {
66+
$finalMsg = [$zmqId];
67+
} else {
68+
$finalMsg = [];
69+
}
70+
6171
$finalMsg = array_merge(
72+
$finalMsg,
6273
['<IDS|MSG>', $this->sign($msgDef)],
63-
$msgDef
64-
);
74+
$msgDef);
6575

6676
if (null !== $this->logger) {
6777
$this->logger->debug('Sent message', ['processId' => getmypid(), 'message' => $finalMsg]);
@@ -78,6 +88,7 @@ private function createHeader(string $msgType): array
7888
'username' => "kernel",
7989
'session' => $this->sessionId->toString(),
8090
'msg_type' => $msgType,
91+
'version' => '5.0',
8192
];
8293
}
8394

0 commit comments

Comments
 (0)