@@ -81,75 +81,164 @@ public function walk(
81
81
throw new ChangelogTableNotExistsException (new Phrase ("Table %1 does not exist " , [$ changelogTableName ]));
82
82
}
83
83
84
+ $ processID = getmypid ();
85
+
84
86
$ idsTable = $ this ->idsTableBuilder ->build ($ changelog );
87
+ $ idsColumns = $ this ->getIdsColumns ($ idsTable );
85
88
86
89
try {
87
- $ connection ->createTable ($ idsTable );
88
-
89
- $ columns = $ this ->getIdsColumns ($ idsTable );
90
-
91
- $ select = $ this ->idsSelectBuilder ->build ($ changelog );
92
- $ select
93
- ->distinct (true )
94
- ->where ('version_id > ? ' , $ fromVersionId )
95
- ->where ('version_id <= ? ' , $ lastVersionId );
96
-
97
- $ connection ->query (
98
- $ connection ->insertFromSelect (
99
- $ select ,
100
- $ idsTable ->getName (),
101
- $ columns ,
102
- AdapterInterface::INSERT_IGNORE
103
- )
90
+ $ this ->createList (
91
+ $ idsTable ,
92
+ $ idsColumns ,
93
+ $ changelog ,
94
+ $ fromVersionId ,
95
+ $ lastVersionId
104
96
);
105
97
106
- $ select = $ connection ->select ()
107
- ->from ($ idsTable ->getName ());
108
-
109
- $ queries = $ this ->generator ->generate (
110
- IdsTableBuilderInterface::FIELD_ID ,
111
- $ select ,
112
- $ batchSize
98
+ yield from $ this ->iterateList (
99
+ $ idsTable ,
100
+ $ idsColumns ,
101
+ $ batchSize ,
102
+ $ processID
113
103
);
114
-
115
- foreach ($ queries as $ query ) {
116
- $ idsQuery = (clone $ query )
117
- ->reset (Select::COLUMNS )
118
- ->columns ($ columns );
119
-
120
- $ ids = $ this ->idsFetcher ->fetch ($ idsQuery );
121
-
122
- if (empty ($ ids )) {
123
- continue ;
124
- }
125
-
126
- yield $ ids ;
127
- }
128
104
} finally {
129
- $ connection ->dropTable ($ idsTable ->getName ());
105
+ $ this ->removeList (
106
+ $ idsTable ,
107
+ $ processID
108
+ );
130
109
}
131
110
}
132
111
133
112
/**
134
113
* Collect columns used as ID of changed entries
135
114
*
136
- * @param \Magento\Framework\DB\Ddl\Table $table
115
+ * @param \Magento\Framework\DB\Ddl\Table $idsTable
137
116
* @return array
138
117
*/
139
- private function getIdsColumns (Table $ table ): array
118
+ private function getIdsColumns (Table $ idsTable ): array
140
119
{
141
120
return array_values (
142
121
array_map (
143
122
static function (array $ column ) {
144
123
return $ column ['COLUMN_NAME ' ];
145
124
},
146
125
array_filter (
147
- $ table ->getColumns (),
126
+ $ idsTable ->getColumns (),
148
127
static function (array $ column ) {
149
128
return $ column ['PRIMARY ' ] === false ;
150
129
}
151
130
)
152
131
)
153
132
);
154
133
}
134
+
135
+ /**
136
+ * Prepare list of changed entries to return
137
+ *
138
+ * @param \Magento\Framework\DB\Ddl\Table $idsTable
139
+ * @param array $idsColumns
140
+ * @param \Magento\Framework\Mview\View\ChangelogInterface $changelog
141
+ * @param int $fromVersionId
142
+ * @param int $lastVersionId
143
+ * @return void
144
+ * @throws \Zend_Db_Exception
145
+ */
146
+ private function createList (
147
+ Table $ idsTable ,
148
+ array $ idsColumns ,
149
+ ChangelogInterface $ changelog ,
150
+ int $ fromVersionId ,
151
+ int $ lastVersionId
152
+ ): void
153
+ {
154
+ $ connection = $ this ->resourceConnection ->getConnection ();
155
+ $ connection ->createTable ($ idsTable );
156
+
157
+ $ select = $ this ->idsSelectBuilder ->build ($ changelog );
158
+ $ select
159
+ ->distinct (true )
160
+ ->where ('version_id > ? ' , $ fromVersionId )
161
+ ->where ('version_id <= ? ' , $ lastVersionId );
162
+
163
+ $ connection ->query (
164
+ $ connection ->insertFromSelect (
165
+ $ select ,
166
+ $ idsTable ->getName (),
167
+ $ idsColumns ,
168
+ AdapterInterface::INSERT_IGNORE
169
+ )
170
+ );
171
+ }
172
+
173
+ /**
174
+ * Provide list of changed entries
175
+ *
176
+ * @param \Magento\Framework\DB\Ddl\Table $idsTable
177
+ * @param array $idsColumns
178
+ * @param int $batchSize
179
+ * @param int $processID
180
+ * @return iterable
181
+ * @throws \Magento\Framework\Exception\LocalizedException
182
+ * @throws \Zend_Db_Exception
183
+ */
184
+ private function iterateList (Table $ idsTable , array $ idsColumns , int $ batchSize , int $ processID ): iterable
185
+ {
186
+ $ connection = $ this ->resourceConnection ->getConnection ();
187
+
188
+ $ select = $ connection ->select ()
189
+ ->from ($ idsTable ->getName ());
190
+
191
+ $ queries = $ this ->generator ->generate (
192
+ IdsTableBuilderInterface::FIELD_ID ,
193
+ $ select ,
194
+ $ batchSize
195
+ );
196
+
197
+ foreach ($ queries as $ query ) {
198
+ $ idsQuery = (clone $ query )
199
+ ->reset (Select::COLUMNS )
200
+ ->columns ($ idsColumns );
201
+
202
+ $ ids = $ this ->idsFetcher ->fetch ($ idsQuery );
203
+
204
+ if (empty ($ ids )) {
205
+ continue ;
206
+ }
207
+
208
+ yield $ ids ;
209
+
210
+ if ($ this ->isChildProcess ($ processID )) {
211
+ return ;
212
+ }
213
+ }
214
+ }
215
+
216
+ /**
217
+ * Cleanup list of changed entries
218
+ *
219
+ * @param \Magento\Framework\DB\Ddl\Table $idsTable
220
+ * @param int $processID
221
+ * @return void
222
+ * @throws \Zend_Db_Exception
223
+ */
224
+ private function removeList (Table $ idsTable , int $ processID ): void
225
+ {
226
+ if ($ this ->isChildProcess ($ processID )) {
227
+ return ;
228
+ }
229
+
230
+ $ connection = $ this ->resourceConnection ->getConnection ();
231
+ $ connection ->dropTable ($ idsTable ->getName ());
232
+ }
233
+
234
+ /**
235
+ * Check if the process was forked
236
+ *
237
+ * @param int $processID
238
+ * @return bool
239
+ */
240
+ private function isChildProcess (int $ processID ): bool
241
+ {
242
+ return $ processID !== getmypid ();
243
+ }
155
244
}
0 commit comments