@@ -81,13 +81,15 @@ public function walk(
81
81
throw new ChangelogTableNotExistsException (new Phrase ("Table %1 does not exist " , [$ changelogTableName ]));
82
82
}
83
83
84
+ $ processID = getmypid ();
85
+
84
86
$ idsTable = $ this ->idsTableBuilder ->build ($ changelog );
87
+ $ idsColumns = $ this ->getIdsColumns ($ idsTable );
85
88
86
89
try {
90
+ # Prepare list of changed entries to return
87
91
$ connection ->createTable ($ idsTable );
88
92
89
- $ columns = $ this ->getIdsColumns ($ idsTable );
90
-
91
93
$ select = $ this ->idsSelectBuilder ->build ($ changelog );
92
94
$ select
93
95
->distinct (true )
@@ -98,11 +100,12 @@ public function walk(
98
100
$ connection ->insertFromSelect (
99
101
$ select ,
100
102
$ idsTable ->getName (),
101
- $ columns ,
103
+ $ idsColumns ,
102
104
AdapterInterface::INSERT_IGNORE
103
105
)
104
106
);
105
107
108
+ # Provide list of changed entries
106
109
$ select = $ connection ->select ()
107
110
->from ($ idsTable ->getName ());
108
111
@@ -115,7 +118,7 @@ public function walk(
115
118
foreach ($ queries as $ query ) {
116
119
$ idsQuery = (clone $ query )
117
120
->reset (Select::COLUMNS )
118
- ->columns ($ columns );
121
+ ->columns ($ idsColumns );
119
122
120
123
$ ids = $ this ->idsFetcher ->fetch ($ idsQuery );
121
124
@@ -124,32 +127,51 @@ public function walk(
124
127
}
125
128
126
129
yield $ ids ;
130
+
131
+ if ($ this ->isChildProcess ($ processID )) {
132
+ return ;
133
+ }
127
134
}
128
135
} finally {
129
- $ connection ->dropTable ($ idsTable ->getName ());
136
+ # Cleanup list of changed entries
137
+ if (!$ this ->isChildProcess ($ processID )) {
138
+ $ connection ->dropTable ($ idsTable ->getName ());
139
+ }
130
140
}
131
141
}
132
142
133
143
/**
134
144
* Collect columns used as ID of changed entries
135
145
*
136
- * @param \Magento\Framework\DB\Ddl\Table $table
146
+ * @param \Magento\Framework\DB\Ddl\Table $idsTable
137
147
* @return array
138
148
*/
139
- private function getIdsColumns (Table $ table ): array
149
+ private function getIdsColumns (Table $ idsTable ): array
140
150
{
141
151
return array_values (
142
152
array_map (
143
153
static function (array $ column ) {
144
154
return $ column ['COLUMN_NAME ' ];
145
155
},
146
156
array_filter (
147
- $ table ->getColumns (),
157
+ $ idsTable ->getColumns (),
148
158
static function (array $ column ) {
149
159
return $ column ['PRIMARY ' ] === false ;
150
160
}
151
161
)
152
162
)
153
163
);
154
164
}
165
+
166
+ /**
167
+ * Check if the process was forked
168
+ *
169
+ * @param int $processID
170
+ * @return bool
171
+ */
172
+ private function isChildProcess (
173
+ int $ processID
174
+ ): bool {
175
+ return $ processID !== getmypid ();
176
+ }
155
177
}
0 commit comments