10
10
*/
11
11
class ProcessManager
12
12
{
13
+ /**
14
+ * Threads count environment variable name
15
+ */
16
+ const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT ' ;
17
+
13
18
/** @var bool */
14
19
private $ failInChildProcess = false ;
15
20
16
21
/** @var \Magento\Framework\App\ResourceConnection */
17
22
private $ resource ;
18
23
24
+ /** @var int|null */
25
+ private $ threadsCount ;
26
+
19
27
/**
20
28
* @param \Magento\Framework\App\ResourceConnection $resource
29
+ * @param int|null $threadsCount
21
30
*/
22
31
public function __construct (
23
- \Magento \Framework \App \ResourceConnection $ resource = null
32
+ \Magento \Framework \App \ResourceConnection $ resource = null ,
33
+ int $ threadsCount = null
24
34
) {
25
35
if (null === $ resource ) {
26
36
$ resource = \Magento \Framework \App \ObjectManager::getInstance ()->get (
27
37
\Magento \Framework \App \ResourceConnection::class
28
38
);
29
39
}
30
40
$ this ->resource = $ resource ;
41
+ $ this ->threadsCount = (int )$ threadsCount ;
31
42
}
32
43
33
44
/**
34
45
* Execute user functions
35
46
*
36
47
* @param \Traversable $userFunctions
37
- * @param int $threadsCount
38
48
*/
39
- public function execute ($ userFunctions, $ threadsCount )
49
+ public function execute ($ userFunctions )
40
50
{
41
- if ($ threadsCount > 1 && $ this ->isCanBeParalleled ()) {
42
- $ this ->multiThreadsExecute ($ userFunctions, $ threadsCount );
51
+ if ($ this -> threadsCount > 1 && $ this ->isCanBeParalleled ()) {
52
+ $ this ->multiThreadsExecute ($ userFunctions );
43
53
} else {
44
54
$ this ->simpleThreadExecute ($ userFunctions );
45
55
}
@@ -61,9 +71,8 @@ private function simpleThreadExecute($userFunctions)
61
71
* Execute user functions in in multiThreads mode
62
72
*
63
73
* @param \Traversable $userFunctions
64
- * @param int $threadsCount
65
74
*/
66
- private function multiThreadsExecute ($ userFunctions, $ threadsCount )
75
+ private function multiThreadsExecute ($ userFunctions )
67
76
{
68
77
$ this ->resource ->closeConnection (null );
69
78
$ threadNumber = 0 ;
@@ -72,7 +81,7 @@ private function multiThreadsExecute($userFunctions, $threadsCount)
72
81
if ($ pid == -1 ) {
73
82
throw new \RuntimeException ('Unable to fork a new process ' );
74
83
} elseif ($ pid ) {
75
- $ this ->executeParentProcess ($ threadNumber, $ threadsCount );
84
+ $ this ->executeParentProcess ($ threadNumber );
76
85
} else {
77
86
$ this ->startChildProcess ($ userFunction );
78
87
}
@@ -111,12 +120,11 @@ private function startChildProcess($userFunction)
111
120
* Execute parent process
112
121
*
113
122
* @param int $threadNumber
114
- * @param int $threadsCount
115
123
*/
116
- private function executeParentProcess (&$ threadNumber, $ threadsCount )
124
+ private function executeParentProcess (&$ threadNumber )
117
125
{
118
126
$ threadNumber ++;
119
- if ($ threadNumber >= $ threadsCount ) {
127
+ if ($ threadNumber >= $ this -> threadsCount ) {
120
128
pcntl_wait ($ status );
121
129
if (pcntl_wexitstatus ($ status ) !== 0 ) {
122
130
$ this ->failInChildProcess = true ;
0 commit comments