Skip to content

Commit 79a568f

Browse files
committed
MAGETWO-93669: [Forwardport] Implement parallelization for Category Product Indexer
1 parent 2b76b97 commit 79a568f

File tree

3 files changed

+211
-2
lines changed

3 files changed

+211
-2
lines changed

app/code/Magento/Catalog/Model/Indexer/Category/Product/Action/Full.php

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Magento\Catalog\Model\ResourceModel\Indexer\ActiveTableSwitcher;
99
use Magento\Framework\DB\Query\Generator as QueryGenerator;
1010
use Magento\Framework\App\ResourceConnection;
11+
use Magento\Indexer\Model\ProcessManager;
1112

1213
/**
1314
* Class Full reindex action
@@ -44,6 +45,11 @@ class Full extends \Magento\Catalog\Model\Indexer\Category\Product\AbstractActio
4445
*/
4546
private $activeTableSwitcher;
4647

48+
/**
49+
* @var ProcessManager
50+
*/
51+
private $processManager;
52+
4753
/**
4854
* @param ResourceConnection $resource
4955
* @param \Magento\Store\Model\StoreManagerInterface $storeManager
@@ -52,9 +58,10 @@ class Full extends \Magento\Catalog\Model\Indexer\Category\Product\AbstractActio
5258
* @param \Magento\Framework\Indexer\BatchSizeManagementInterface|null $batchSizeManagement
5359
* @param \Magento\Framework\Indexer\BatchProviderInterface|null $batchProvider
5460
* @param \Magento\Framework\EntityManager\MetadataPool|null $metadataPool
55-
* @param \Magento\Indexer\Model\Indexer\StateFactory|null $stateFactory
5661
* @param int|null $batchRowsCount
5762
* @param ActiveTableSwitcher|null $activeTableSwitcher
63+
* @param ProcessManager $processManager
64+
* @SuppressWarnings(PHPMD.ExcessiveParameterList)
5865
*/
5966
public function __construct(
6067
\Magento\Framework\App\ResourceConnection $resource,
@@ -65,7 +72,8 @@ public function __construct(
6572
\Magento\Framework\Indexer\BatchProviderInterface $batchProvider = null,
6673
\Magento\Framework\EntityManager\MetadataPool $metadataPool = null,
6774
$batchRowsCount = null,
68-
ActiveTableSwitcher $activeTableSwitcher = null
75+
ActiveTableSwitcher $activeTableSwitcher = null,
76+
ProcessManager $processManager = null
6977
) {
7078
parent::__construct(
7179
$resource,
@@ -85,6 +93,7 @@ public function __construct(
8593
);
8694
$this->batchRowsCount = $batchRowsCount;
8795
$this->activeTableSwitcher = $activeTableSwitcher ?: $objectManager->get(ActiveTableSwitcher::class);
96+
$this->processManager = $processManager ?: $objectManager->get(ProcessManager::class);
8897
}
8998

9099
/**
@@ -133,6 +142,38 @@ public function execute()
133142
return $this;
134143
}
135144

145+
/**
146+
* Run reindexation
147+
*
148+
* @return void
149+
*/
150+
protected function reindex()
151+
{
152+
$userFunctions = [];
153+
154+
foreach ($this->storeManager->getStores() as $store) {
155+
if ($this->getPathFromCategoryId($store->getRootCategoryId())) {
156+
$userFunctions[$store->getId()] = function () use ($store) {
157+
return $this->reindexStore($store);
158+
};
159+
}
160+
}
161+
162+
$this->processManager->execute($userFunctions);
163+
}
164+
165+
/**
166+
* Execute indexation by store
167+
*
168+
* @param \Magento\Store\Model\Store $store
169+
*/
170+
private function reindexStore($store)
171+
{
172+
$this->reindexRootCategory($store);
173+
$this->reindexAnchorCategories($store);
174+
$this->reindexNonAnchorCategories($store);
175+
}
176+
136177
/**
137178
* Publish data from tmp to replica table
138179
*
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
namespace Magento\Indexer\Model;
7+
8+
/**
9+
* Provide functionality for executing user functions in multi-thread mode.
10+
*/
11+
class ProcessManager
12+
{
13+
/**
14+
* Threads count environment variable name
15+
*/
16+
const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
17+
18+
/** @var bool */
19+
private $failInChildProcess = false;
20+
21+
/** @var \Magento\Framework\App\ResourceConnection */
22+
private $resource;
23+
24+
/** @var \Magento\Framework\Registry */
25+
private $registry;
26+
27+
/** @var int|null */
28+
private $threadsCount;
29+
30+
/**
31+
* @param \Magento\Framework\App\ResourceConnection $resource
32+
* @param \Magento\Framework\Registry $registry
33+
* @param int|null $threadsCount
34+
*/
35+
public function __construct(
36+
\Magento\Framework\App\ResourceConnection $resource,
37+
\Magento\Framework\Registry $registry = null,
38+
int $threadsCount = null
39+
) {
40+
$this->resource = $resource;
41+
if (null === $registry) {
42+
$registry = \Magento\Framework\App\ObjectManager::getInstance()->get(
43+
\Magento\Framework\Registry::class
44+
);
45+
}
46+
$this->registry = $registry;
47+
$this->threadsCount = (int)$threadsCount;
48+
}
49+
50+
/**
51+
* Execute user functions
52+
*
53+
* @param \Traversable $userFunctions
54+
*/
55+
public function execute($userFunctions)
56+
{
57+
if ($this->threadsCount > 1 && $this->isCanBeParalleled() && !$this->isSetupMode() && PHP_SAPI == 'cli') {
58+
$this->multiThreadsExecute($userFunctions);
59+
} else {
60+
$this->simpleThreadExecute($userFunctions);
61+
}
62+
}
63+
64+
/**
65+
* Execute user functions in singleThreads mode
66+
*
67+
* @param \Traversable $userFunctions
68+
*/
69+
private function simpleThreadExecute($userFunctions)
70+
{
71+
foreach ($userFunctions as $userFunction) {
72+
call_user_func($userFunction);
73+
}
74+
}
75+
76+
/**
77+
* Execute user functions in multiThreads mode
78+
*
79+
* @param \Traversable $userFunctions
80+
* @SuppressWarnings(PHPMD.UnusedLocalVariable)
81+
*/
82+
private function multiThreadsExecute($userFunctions)
83+
{
84+
$this->resource->closeConnection(null);
85+
$threadNumber = 0;
86+
foreach ($userFunctions as $userFunction) {
87+
$pid = pcntl_fork();
88+
if ($pid == -1) {
89+
throw new \RuntimeException('Unable to fork a new process');
90+
} elseif ($pid) {
91+
$this->executeParentProcess($threadNumber);
92+
} else {
93+
$this->startChildProcess($userFunction);
94+
}
95+
}
96+
while (pcntl_waitpid(0, $status) != -1) {
97+
//Waiting for the completion of child processes
98+
}
99+
100+
if ($this->failInChildProcess) {
101+
throw new \RuntimeException('Fail in child process');
102+
}
103+
}
104+
105+
/**
106+
* Is process can be paralleled
107+
*
108+
* @return bool
109+
*/
110+
private function isCanBeParalleled()
111+
{
112+
return function_exists('pcntl_fork');
113+
}
114+
115+
/**
116+
* Is setup mode
117+
*
118+
* @return bool
119+
*/
120+
private function isSetupMode()
121+
{
122+
return $this->registry->registry('setup-mode-enabled') ?: false;
123+
}
124+
125+
/**
126+
* Start child process
127+
*
128+
* @param callable $userFunction
129+
* @SuppressWarnings(PHPMD.ExitExpression)
130+
*/
131+
private function startChildProcess($userFunction)
132+
{
133+
$status = call_user_func($userFunction);
134+
$status = is_integer($status) ? $status : 0;
135+
exit($status);
136+
}
137+
138+
/**
139+
* Execute parent process
140+
*
141+
* @param int $threadNumber
142+
*/
143+
private function executeParentProcess(&$threadNumber)
144+
{
145+
$threadNumber++;
146+
if ($threadNumber >= $this->threadsCount) {
147+
pcntl_wait($status);
148+
if (pcntl_wexitstatus($status) !== 0) {
149+
$this->failInChildProcess = true;
150+
}
151+
$threadNumber--;
152+
}
153+
}
154+
}

setup/src/Magento/Setup/Model/Installer.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,11 @@ public function declarativeInstallSchema(array $request)
815815
*/
816816
public function installSchema(array $request)
817817
{
818+
/** @var \Magento\Framework\Registry $registry */
819+
$registry = $this->objectManagerProvider->get()->get(\Magento\Framework\Registry::class);
820+
//For backward compatibility in install and upgrade scripts with enabled parallelization.
821+
$registry->register('setup-mode-enabled', true);
822+
818823
$this->assertDbConfigExists();
819824
$this->assertDbAccessible();
820825
$setup = $this->setupFactory->create($this->context->getResources());
@@ -831,6 +836,8 @@ public function installSchema(array $request)
831836
$schemaListener->setResource('default');
832837
$this->schemaPersistor->persist($schemaListener);
833838
}
839+
840+
$registry->unregister('setup-mode-enabled');
834841
}
835842

836843
/**
@@ -853,12 +860,19 @@ private function convertationOfOldScriptsIsAllowed(array $request)
853860
*/
854861
public function installDataFixtures(array $request = [])
855862
{
863+
/** @var \Magento\Framework\Registry $registry */
864+
$registry = $this->objectManagerProvider->get()->get(\Magento\Framework\Registry::class);
865+
//For backward compatibility in install and upgrade scripts with enabled parallelization.
866+
$registry->register('setup-mode-enabled', true);
867+
856868
$this->assertDbConfigExists();
857869
$this->assertDbAccessible();
858870
$setup = $this->dataSetupFactory->create();
859871
$this->checkFilePermissionsForDbUpgrade();
860872
$this->log->log('Data install/update:');
861873
$this->handleDBSchemaData($setup, 'data', $request);
874+
875+
$registry->unregister('setup-mode-enabled');
862876
}
863877

864878
/**

0 commit comments

Comments
 (0)