@@ -110,11 +110,11 @@ public function execute(string $endpoint, string $path, array $options)
110
110
$ readTimeout = (int )($ options ["-read-timeout " ] ?? Defaults::READ_TIMEOUT );
111
111
$ writeForks = ((int )($ options ["-write-rps " ] ?? Defaults::WRITE_RPS )) / Defaults::RPS_PER_WRITE_FORK ;
112
112
$ writeTimeout = (int )($ options ["-write-timeout " ] ?? Defaults::WRITE_TIMEOUT );
113
- $ time = (int )($ options ["-time " ] ?? Defaults::READ_TIME ) - 5 ;
113
+ $ time = (int )($ options ["-time " ] ?? Defaults::READ_TIME );
114
114
$ shutdownTime = (int )($ options ["-shutdown-time " ] ?? Defaults::SHUTDOWN_TIME );
115
115
116
116
$ this ->queueId = ftok (__FILE__ , 'm ' );
117
- $ msgQueue = msg_get_queue ($ this ->queueId );
117
+ msg_remove_queue ( msg_get_queue ($ this ->queueId ) );
118
118
119
119
$ pIds = [];
120
120
@@ -128,8 +128,8 @@ public function execute(string $endpoint, string $path, array $options)
128
128
}, $ readForks );
129
129
$ pIds = array_merge ($ pIds , $ readPIds );
130
130
131
- $ writePIds = $ this ->forkJob (function (int $ i ) use ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ readTimeout , $ shutdownTime , $ startTime ) {
132
- $ this ->writeJob ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ readTimeout , $ i , $ shutdownTime , $ startTime );
131
+ $ writePIds = $ this ->forkJob (function (int $ i ) use ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ writeTimeout , $ shutdownTime , $ startTime ) {
132
+ $ this ->writeJob ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ writeTimeout , $ i , $ shutdownTime , $ startTime );
133
133
}, $ writeForks );
134
134
$ pIds = array_merge ($ pIds , $ writePIds );
135
135
@@ -209,13 +209,13 @@ protected function readJob(string $endpoint, string $path, $tableName, int $init
209
209
Utils::retriedError ($ this ->queueId , 'write ' , get_class ($ e ));
210
210
}
211
211
]);
212
- Utils::metricDone ("read " , $ this ->queueId , $ attemps , $ this ->getLatency ($ begin ));
212
+ Utils::metricDone ("read " , $ this ->queueId , $ attemps , $ this ->getLatencyMilliseconds ($ begin ));
213
213
} catch (\Exception $ e ) {
214
214
$ table ->getLogger ()->error ($ e ->getMessage ());
215
- Utils::metricFail ("read " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatency ($ begin ));
215
+ Utils::metricFail ("read " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatencyMilliseconds ($ begin ));
216
216
} finally {
217
217
$ i ++;
218
- $ delay = $ this ->getDelay ($ startTime , Defaults::RPS_PER_READ_FORK , $ i );
218
+ $ delay = $ this ->getDelayMicroseconds ($ startTime , Defaults::RPS_PER_READ_FORK , $ i );
219
219
usleep ($ delay > 0 ? $ delay : 1 );
220
220
}
221
221
}
@@ -244,13 +244,13 @@ protected function writeJob(string $endpoint, string $path, $tableName, int $ini
244
244
Utils::retriedError ($ this ->queueId , 'write ' , get_class ($ e ));
245
245
}
246
246
]);
247
- Utils::metricDone ("write " , $ this ->queueId , $ attemps , $ this ->getLatency ($ begin ));
247
+ Utils::metricDone ("write " , $ this ->queueId , $ attemps , $ this ->getLatencyMilliseconds ($ begin ));
248
248
} catch (\Exception $ e ) {
249
249
$ table ->getLogger ()->error ($ e ->getMessage ());
250
- Utils::metricFail ("write " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatency ($ begin ));
250
+ Utils::metricFail ("write " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatencyMilliseconds ($ begin ));
251
251
} finally {
252
252
$ i ++;
253
- $ delay = $ this ->getDelay ($ startTime , Defaults::RPS_PER_WRITE_FORK , $ i );
253
+ $ delay = $ this ->getDelayMicroseconds ($ startTime , Defaults::RPS_PER_WRITE_FORK , $ i );
254
254
usleep ($ delay > 0 ? $ delay : 1 );
255
255
}
256
256
}
@@ -288,7 +288,7 @@ protected function metricsJob(int $reportPeriod, float $startTime, int $time, st
288
288
]);
289
289
290
290
while (microtime (true ) <= $ startTime + $ time ) {
291
- msg_receive ($ msgQueue , Utils::MSG_TYPE , $ msgType , 1024 , $ message );
291
+ msg_receive ($ msgQueue , Utils::MSG_TYPE , $ msgType , Utils:: MESSAGE_SIZE_LIMIT_BYTES , $ message );
292
292
$ queryLatencies ->observe ($ this ->getLatency ($ message ["sent " ]));
293
293
switch ($ message ['type ' ]) {
294
294
case 'reset ' :
@@ -325,14 +325,15 @@ protected function metricsJob(int $reportPeriod, float $startTime, int $time, st
325
325
$ lastPushTime = microtime (true );
326
326
}
327
327
}
328
+ msg_remove_queue ($ msgQueue );
328
329
}
329
330
330
- protected function getLatency ( $ begin )
331
+ protected function getLatencyMilliseconds ( float $ begin ): float
331
332
{
332
333
return (microtime (true ) - $ begin ) * 1000 ;
333
334
}
334
335
335
- protected function getDelay (float $ startTime , int $ rps , int $ i )
336
+ protected function getDelayMicroseconds (float $ startTime , int $ rps , int $ i ): float
336
337
{
337
338
return $ startTime * 1000000 + $ i * 1000000 / $ rps - microtime (true ) * 1000000 ;
338
339
}
0 commit comments