Skip to content

Commit 124d8fe

Browse files
authored
Merge pull request #2229 from magento-performance/MAGETWO-80789
Story: - MAGETWO-80789 [Indexer] Search Indexer is scoped & multi-threaded
2 parents ac1e74c + c87d366 commit 124d8fe

File tree

12 files changed

+231
-75
lines changed

12 files changed

+231
-75
lines changed

app/code/Magento/CatalogSearch/Model/Indexer/Fulltext.php

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Magento\Framework\Search\Request\Config as SearchRequestConfig;
1313
use Magento\Framework\Search\Request\DimensionFactory;
1414
use Magento\Store\Model\StoreManagerInterface;
15+
use Magento\Indexer\Model\ProcessManager;
1516

1617
/**
1718
* Provide functionality for Fulltext Search indexing.
@@ -71,6 +72,11 @@ class Fulltext implements \Magento\Framework\Indexer\ActionInterface, \Magento\F
7172
*/
7273
private $indexScopeState;
7374

75+
/**
76+
* @var ProcessManager
77+
*/
78+
private $processManager;
79+
7480
/**
7581
* @param FullFactory $fullActionFactory
7682
* @param IndexerHandlerFactory $indexerHandlerFactory
@@ -81,6 +87,8 @@ class Fulltext implements \Magento\Framework\Indexer\ActionInterface, \Magento\F
8187
* @param array $data
8288
* @param IndexSwitcherInterface $indexSwitcher
8389
* @param Scope\State $indexScopeState
90+
* @param ProcessManager $processManager
91+
* @SuppressWarnings(PHPMD.ExcessiveParameterList)
8492
*/
8593
public function __construct(
8694
FullFactory $fullActionFactory,
@@ -91,7 +99,8 @@ public function __construct(
9199
SearchRequestConfig $searchRequestConfig,
92100
array $data,
93101
IndexSwitcherInterface $indexSwitcher = null,
94-
State $indexScopeState = null
102+
State $indexScopeState = null,
103+
ProcessManager $processManager = null
95104
) {
96105
$this->fullAction = $fullActionFactory->create(['data' => $data]);
97106
$this->indexerHandlerFactory = $indexerHandlerFactory;
@@ -106,8 +115,12 @@ public function __construct(
106115
if (null === $indexScopeState) {
107116
$indexScopeState = ObjectManager::getInstance()->get(State::class);
108117
}
118+
if (null === $processManager) {
119+
$processManager = ObjectManager::getInstance()->get(ProcessManager::class);
120+
}
109121
$this->indexSwitcher = $indexSwitcher;
110122
$this->indexScopeState = $indexScopeState;
123+
$this->processManager = $processManager;
111124
}
112125

113126
/**
@@ -140,20 +153,16 @@ public function execute($ids)
140153
public function executeFull()
141154
{
142155
$storeIds = array_keys($this->storeManager->getStores());
143-
/** @var IndexerHandler $saveHandler */
144-
$saveHandler = $this->indexerHandlerFactory->create([
145-
'data' => $this->data
146-
]);
156+
157+
$userFunctions = [];
147158
foreach ($storeIds as $storeId) {
148-
$dimensions = [$this->dimensionFactory->create(['name' => 'scope', 'value' => $storeId])];
149-
$this->indexScopeState->useTemporaryIndex();
159+
$userFunctions[$storeId] = function () use ($storeId) {
160+
return $this->executeFullByStore($storeId);
161+
};
162+
}
150163

151-
$saveHandler->cleanIndex($dimensions);
152-
$saveHandler->saveIndex($dimensions, $this->fullAction->rebuildStoreIndex($storeId));
164+
$this->processManager->execute($userFunctions);
153165

154-
$this->indexSwitcher->switchIndex($dimensions);
155-
$this->indexScopeState->useRegularIndex();
156-
}
157166
$this->fulltextResource->resetSearchResults();
158167
$this->searchRequestConfig->reset();
159168
}
@@ -179,4 +188,26 @@ public function executeRow($id)
179188
{
180189
$this->execute([$id]);
181190
}
191+
192+
/**
193+
* Execute full indexation by storeID
194+
*
195+
* @param int $storeId
196+
*/
197+
private function executeFullByStore($storeId)
198+
{
199+
/** @var IndexerHandler $saveHandler */
200+
$saveHandler = $this->indexerHandlerFactory->create([
201+
'data' => $this->data
202+
]);
203+
204+
$dimensions = [$this->dimensionFactory->create(['name' => 'scope', 'value' => $storeId])];
205+
$this->indexScopeState->useTemporaryIndex();
206+
207+
$saveHandler->cleanIndex($dimensions);
208+
$saveHandler->saveIndex($dimensions, $this->fullAction->rebuildStoreIndex($storeId));
209+
210+
$this->indexSwitcher->switchIndex($dimensions);
211+
$this->indexScopeState->useRegularIndex();
212+
}
182213
}

app/code/Magento/CatalogSearch/Test/Unit/Model/Indexer/FulltextTest.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ class FulltextTest extends \PHPUnit\Framework\TestCase
5454
*/
5555
private $indexSwitcher;
5656

57+
/**
58+
* @var \Magento\Indexer\Model\ProcessManager
59+
*/
60+
private $processManager;
61+
5762
protected function setUp()
5863
{
5964
$this->fullAction = $this->getClassMock(\Magento\CatalogSearch\Model\Indexer\Fulltext\Action\Full::class);
@@ -89,6 +94,8 @@ protected function setUp()
8994
->setMethods(['switchIndex'])
9095
->getMock();
9196

97+
$this->processManager = new \Magento\Indexer\Model\ProcessManager();
98+
9299
$objectManagerHelper = new ObjectManagerHelper($this);
93100
$this->model = $objectManagerHelper->getObject(
94101
\Magento\CatalogSearch\Model\Indexer\Fulltext::class,
@@ -101,6 +108,7 @@ protected function setUp()
101108
'searchRequestConfig' => $this->searchRequestConfig,
102109
'data' => [],
103110
'indexSwitcher' => $this->indexSwitcher,
111+
'processManager' => $this->processManager,
104112
]
105113
);
106114
}

app/code/Magento/CatalogSearch/composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"php": "7.0.2|7.0.4|~7.0.6|~7.1.0",
66
"magento/module-store": "100.2.*",
77
"magento/module-catalog": "102.0.*",
8+
"magento/module-indexer": "100.2.*",
89
"magento/module-search": "100.2.*",
910
"magento/module-customer": "101.0.*",
1011
"magento/module-directory": "100.2.*",

app/code/Magento/CatalogSearch/etc/module.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<sequence>
1111
<module name="Magento_Search"/>
1212
<module name="Magento_Catalog"/>
13+
<module name="Magento_Indexer"/>
1314
</sequence>
1415
</module>
1516
</config>
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
namespace Magento\Indexer\Model;
7+
8+
/**
9+
* Provide functionality for executing user functions in multi-thread mode.
10+
*/
11+
class ProcessManager
12+
{
13+
/**
14+
* Threads count environment variable name
15+
*/
16+
const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
17+
18+
/** @var bool */
19+
private $failInChildProcess = false;
20+
21+
/** @var \Magento\Framework\App\ResourceConnection */
22+
private $resource;
23+
24+
/** @var int|null */
25+
private $threadsCount;
26+
27+
/**
28+
* @param \Magento\Framework\App\ResourceConnection $resource
29+
* @param int|null $threadsCount
30+
*/
31+
public function __construct(
32+
\Magento\Framework\App\ResourceConnection $resource = null,
33+
int $threadsCount = null
34+
) {
35+
if (null === $resource) {
36+
$resource = \Magento\Framework\App\ObjectManager::getInstance()->get(
37+
\Magento\Framework\App\ResourceConnection::class
38+
);
39+
}
40+
$this->resource = $resource;
41+
$this->threadsCount = (int)$threadsCount;
42+
}
43+
44+
/**
45+
* Execute user functions
46+
*
47+
* @param \Traversable $userFunctions
48+
*/
49+
public function execute($userFunctions)
50+
{
51+
if ($this->threadsCount > 1 && $this->isCanBeParalleled()) {
52+
$this->multiThreadsExecute($userFunctions);
53+
} else {
54+
$this->simpleThreadExecute($userFunctions);
55+
}
56+
}
57+
58+
/**
59+
* Execute user functions in in singleThreads mode
60+
*
61+
* @param \Traversable $userFunctions
62+
*/
63+
private function simpleThreadExecute($userFunctions)
64+
{
65+
foreach ($userFunctions as $userFunction) {
66+
call_user_func($userFunction);
67+
}
68+
}
69+
70+
/**
71+
* Execute user functions in in multiThreads mode
72+
*
73+
* @param \Traversable $userFunctions
74+
* @SuppressWarnings(PHPMD.UnusedLocalVariable)
75+
*/
76+
private function multiThreadsExecute($userFunctions)
77+
{
78+
$this->resource->closeConnection(null);
79+
$threadNumber = 0;
80+
foreach ($userFunctions as $userFunction) {
81+
$pid = pcntl_fork();
82+
if ($pid == -1) {
83+
throw new \RuntimeException('Unable to fork a new process');
84+
} elseif ($pid) {
85+
$this->executeParentProcess($threadNumber);
86+
} else {
87+
$this->startChildProcess($userFunction);
88+
}
89+
}
90+
while (pcntl_waitpid(0, $status) != -1) {
91+
//Waiting for the completion of child processes
92+
}
93+
94+
if ($this->failInChildProcess) {
95+
throw new \RuntimeException('Fail in child process');
96+
}
97+
}
98+
99+
/**
100+
* Is process can be paralleled
101+
*
102+
* @return bool
103+
*/
104+
private function isCanBeParalleled()
105+
{
106+
return function_exists('pcntl_fork');
107+
}
108+
109+
/**
110+
* Start child process
111+
*
112+
* @param callable $userFunction
113+
* @SuppressWarnings(PHPMD.ExitExpression)
114+
*/
115+
private function startChildProcess($userFunction)
116+
{
117+
$status = call_user_func($userFunction);
118+
$status = is_integer($status) ? $status : 0;
119+
exit($status);
120+
}
121+
122+
/**
123+
* Execute parent process
124+
*
125+
* @param int $threadNumber
126+
*/
127+
private function executeParentProcess(&$threadNumber)
128+
{
129+
$threadNumber++;
130+
if ($threadNumber >= $this->threadsCount) {
131+
pcntl_wait($status);
132+
if (pcntl_wexitstatus($status) !== 0) {
133+
$this->failInChildProcess = true;
134+
}
135+
$threadNumber--;
136+
}
137+
}
138+
}

app/code/Magento/Indexer/etc/di.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@
4242
<plugin name="page-cache-indexer-reindex-clean-cache"
4343
type="Magento\Indexer\Model\Processor\CleanCache" sortOrder="10"/>
4444
</type>
45-
45+
<type name="\Magento\Indexer\Model\ProcessManager">
46+
<arguments>
47+
<argument name="threadsCount" xsi:type="init_parameter">Magento\Indexer\Model\ProcessManager::THREADS_COUNT</argument>
48+
</arguments>
49+
</type>
4650
<type name="Magento\Framework\Console\CommandListInterface">
4751
<arguments>
4852
<argument name="commands" xsi:type="array">

dev/tests/integration/testsuite/Magento/Search/Block/TermTest.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ function ($object) {
4444
$result
4545
);
4646

47+
foreach ($actual as &$value) {
48+
unset($value['query_id']);
49+
}
50+
51+
foreach ($expected as &$value) {
52+
unset($value['query_id']);
53+
}
54+
4755
self::assertEquals(
4856
$expected,
4957
$actual
@@ -62,7 +70,6 @@ public function getTermsDataProvider()
6270
[
6371
'1st query' =>
6472
[
65-
'query_id' => '1',
6673
'query_text' => '1st query',
6774
'num_results' => '1',
6875
'popularity' => '5',
@@ -76,7 +83,6 @@ public function getTermsDataProvider()
7683
],
7784
'2nd query' =>
7885
[
79-
'query_id' => '2',
8086
'query_text' => '2nd query',
8187
'num_results' => '1',
8288
'popularity' => '10',
@@ -90,7 +96,6 @@ public function getTermsDataProvider()
9096
],
9197
'3rd query' =>
9298
[
93-
'query_id' => '3',
9499
'query_text' => '3rd query',
95100
'num_results' => '1',
96101
'popularity' => '1',

lib/internal/Magento/Framework/App/ResourceConnection.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Magento\Framework\App\ResourceConnection\ConfigInterface as ResourceConfigInterface;
1010
use Magento\Framework\Model\ResourceModel\Type\Db\ConnectionFactoryInterface;
1111
use Magento\Framework\Config\ConfigOptionsListConstants;
12+
use Magento\Framework\DB\Adapter\AdapterInterface;
1213

1314
/**
1415
* Application provides ability to configure multiple connections to persistent storage.
@@ -102,9 +103,21 @@ public function getConnection($resourceName = self::DEFAULT_CONNECTION)
102103
*/
103104
public function closeConnection($resourceName = self::DEFAULT_CONNECTION)
104105
{
105-
$processConnectionName = $this->getProcessConnectionName($this->config->getConnectionName($resourceName));
106-
if (isset($this->connections[$processConnectionName])) {
107-
$this->connections[$processConnectionName] = null;
106+
if ($resourceName === null) {
107+
foreach ($this->connections as $processConnection) {
108+
if ($processConnection !== null) {
109+
$processConnection->closeConnection();
110+
}
111+
}
112+
$this->connections = [];
113+
} else {
114+
$processConnectionName = $this->getProcessConnectionName($this->config->getConnectionName($resourceName));
115+
if (isset($this->connections[$processConnectionName])) {
116+
if ($this->connections[$processConnectionName] !== null) {
117+
$this->connections[$processConnectionName]->closeConnection();
118+
}
119+
$this->connections[$processConnectionName] = null;
120+
}
108121
}
109122
}
110123

0 commit comments

Comments
 (0)