29
29
30
30
#include " QueryResult.hpp"
31
31
32
+ #include " oatpp/orm/Transaction.hpp"
33
+
32
34
#include " oatpp/core/data/stream/ChunkedBuffer.hpp"
35
+ #include " oatpp/core/macro/codegen.hpp"
33
36
34
37
#include < vector>
35
38
36
39
namespace oatpp { namespace postgresql {
37
40
41
+ namespace {
42
+
43
+ #include OATPP_CODEGEN_BEGIN(DTO)
44
+
45
+ class VersionRow : public oatpp ::DTO {
46
+
47
+ DTO_INIT (VersionRow, DTO);
48
+
49
+ DTO_FIELD (Int64, version);
50
+
51
+ };
52
+
53
+ #include OATPP_CODEGEN_END(DTO)
54
+
55
+ }
56
+
38
57
Executor::QueryParams::QueryParams (const StringTemplate& queryTemplate,
39
58
const std::unordered_map<oatpp::String, oatpp::Void>& params,
40
59
const mapping::TypeMapper& typeMapper,
@@ -44,7 +63,11 @@ Executor::QueryParams::QueryParams(const StringTemplate& queryTemplate,
44
63
auto extra = std::static_pointer_cast<ql_template::Parser::TemplateExtra>(queryTemplate.getExtraData ());
45
64
46
65
query = extra->preparedTemplate ->c_str ();
47
- queryName = extra->templateName ->c_str ();
66
+ if (extra->templateName ) {
67
+ queryName = extra->templateName ->c_str ();
68
+ } else {
69
+ queryName = nullptr ;
70
+ }
48
71
49
72
count = queryTemplate.getTemplateVariables ().size ();
50
73
@@ -214,43 +237,188 @@ std::shared_ptr<orm::QueryResult> Executor::execute(const StringTemplate& queryT
214
237
215
238
}
216
239
217
- std::shared_ptr<orm::QueryResult> Executor::begin (const std::shared_ptr<orm::Connection>& connection) {
240
+ std::shared_ptr<orm::QueryResult> Executor::exec (const oatpp::String& statement,
241
+ const std::shared_ptr<orm::Connection>& connection,
242
+ bool useExecParams)
243
+ {
218
244
219
245
std::shared_ptr<orm::Connection> conn = connection;
220
246
if (!conn) {
221
247
conn = getConnection ();
222
248
}
223
249
224
250
auto pgConnection = std::static_pointer_cast<postgresql::Connection>(conn);
225
- PGresult *qres = PQexec (pgConnection->getHandle (), " BEGIN" );
251
+
252
+ PGresult *qres;
253
+ if (useExecParams) {
254
+ qres = PQexecParams (pgConnection->getHandle (),
255
+ statement->c_str (),
256
+ 0 /* nParams */ ,
257
+ nullptr /* paramTypes */ ,
258
+ nullptr /* paramValues */ ,
259
+ nullptr /* paramLengths */ ,
260
+ nullptr /* paramFormats */ ,
261
+ 1 /* resultFormat */ );
262
+ } else {
263
+ qres = PQexec (pgConnection->getHandle (), statement->c_str ());
264
+ }
265
+
226
266
return std::make_shared<QueryResult>(qres, pgConnection, m_connectionProvider, m_resultMapper);
227
267
228
268
}
229
269
270
+ std::shared_ptr<orm::QueryResult> Executor::begin (const std::shared_ptr<orm::Connection>& connection) {
271
+ return exec (" BEGIN" , connection);
272
+ }
273
+
230
274
std::shared_ptr<orm::QueryResult> Executor::commit (const std::shared_ptr<orm::Connection>& connection) {
231
- auto pgConnection = std::static_pointer_cast<postgresql::Connection>(connection);
232
- PGresult *qres = PQexec (pgConnection->getHandle (), " END" );
233
- return std::make_shared<QueryResult>(qres, pgConnection, m_connectionProvider, m_resultMapper);
275
+ if (!connection) {
276
+ throw std::runtime_error (" [oatpp::postgresql::Executor::commit()]: "
277
+ " Error. Can't COMMIT - NULL connection." );
278
+ }
279
+ return exec (" COMMIT" , connection);
234
280
}
235
281
236
282
std::shared_ptr<orm::QueryResult> Executor::rollback (const std::shared_ptr<orm::Connection>& connection) {
237
- auto pgConnection = std::static_pointer_cast<postgresql::Connection>(connection);
238
- PGresult *qres = PQexec (pgConnection->getHandle (), " ROLLBACK" );
239
- return std::make_shared<QueryResult>(qres, pgConnection, m_connectionProvider, m_resultMapper);
283
+ if (!connection) {
284
+ throw std::runtime_error (" [oatpp::postgresql::Executor::commit()]: "
285
+ " Error. Can't ROLLBACK - NULL connection." );
286
+ }
287
+ return exec (" ROLLBACK" , connection);
288
+ }
289
+
290
+ oatpp::String Executor::getSchemaVersionTableName (const oatpp::String& suffix) {
291
+ data::stream::BufferOutputStream stream;
292
+ stream << " oatpp_schema_version" ;
293
+ if (suffix && suffix->getSize () > 0 ) {
294
+ stream << " _" << suffix;
295
+ }
296
+ return stream.toString ();
297
+ }
298
+
299
+ std::shared_ptr<orm::QueryResult> Executor::updateSchemaVersion (v_int64 newVersion,
300
+ const oatpp::String& suffix,
301
+ const std::shared_ptr<orm::Connection>& connection)
302
+ {
303
+ data::stream::BufferOutputStream stream;
304
+ stream
305
+ << " UPDATE "
306
+ << getSchemaVersionTableName (suffix) << " "
307
+ << " SET version=" << newVersion << " ;" ;
308
+ return exec (stream.toString (), connection, true );
240
309
}
241
310
242
311
v_int64 Executor::getSchemaVersion (const oatpp::String& suffix,
243
312
const std::shared_ptr<orm::Connection>& connection)
244
313
{
245
- // TODO implement me!
314
+
315
+ std::shared_ptr<orm::QueryResult> result;
316
+
317
+ {
318
+ data::stream::BufferOutputStream stream;
319
+ stream << " CREATE TABLE IF NOT EXISTS " << getSchemaVersionTableName (suffix) << " (version BIGINT)" ;
320
+ result = exec (stream.toString (), connection);
321
+ if (!result->isSuccess ()) {
322
+ throw std::runtime_error (" [oatpp::postgresql::Executor::getSchemaVersion()]: "
323
+ " Error. Can't create schema version table. " + result->getErrorMessage ()->std_str ());
324
+ }
325
+ }
326
+
327
+ data::stream::BufferOutputStream stream;
328
+ stream << " SELECT * FROM " << getSchemaVersionTableName (suffix);
329
+ result = exec (stream.toString (), result->getConnection (), true );
330
+ if (!result->isSuccess ()) {
331
+ throw std::runtime_error (" [oatpp::postgresql::Executor::getSchemaVersion()]: "
332
+ " Error. Can't get schema version. " + result->getErrorMessage ()->std_str ());
333
+ }
334
+
335
+ auto rows = result->fetch <oatpp::Vector<oatpp::Object<VersionRow>>>();
336
+
337
+ if (rows->size () == 0 ) {
338
+
339
+ stream.setCurrentPosition (0 );
340
+ stream << " INSERT INTO " << getSchemaVersionTableName (suffix) << " (version) VALUES (0)" ;
341
+ result = exec (stream.toString (), result->getConnection (), true );
342
+
343
+ if (result->isSuccess ()) {
344
+ return 0 ;
345
+ }
346
+
347
+ throw std::runtime_error (" [oatpp::postgresql::Executor::getSchemaVersion()]: "
348
+ " Error. Can't init schema version. " + result->getErrorMessage ()->std_str ());
349
+
350
+ } else if (rows->size () == 1 ) {
351
+
352
+ auto row = rows[0 ];
353
+ if (!row->version ) {
354
+ throw std::runtime_error (" [oatpp::postgresql::Executor::getSchemaVersion()]: "
355
+ " Error. The schema version table is corrupted - version is null." );
356
+ }
357
+
358
+ return row->version ;
359
+
360
+ }
361
+
362
+ throw std::runtime_error (" [oatpp::postgresql::Executor::getSchemaVersion()]: "
363
+ " Error. The schema version table is corrupted - multiple version rows." );
364
+
246
365
}
247
366
248
367
void Executor::migrateSchema (const oatpp::String& script,
249
368
v_int64 newVersion,
250
369
const oatpp::String& suffix,
251
370
const std::shared_ptr<orm::Connection>& connection)
252
371
{
253
- // TODO implement me!
372
+
373
+ if (!script) {
374
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: Error. Script is null." );
375
+ }
376
+
377
+ if (!connection) {
378
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: Error. Connection is null." );
379
+ }
380
+
381
+ auto currVersion = getSchemaVersion (suffix, connection);
382
+ if (newVersion <= currVersion) {
383
+ return ;
384
+ }
385
+
386
+ if (newVersion > currVersion + 1 ) {
387
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: Error. +1 version increment is allowed only." );
388
+ }
389
+
390
+ if (script->getSize () == 0 ) {
391
+ OATPP_LOGW (" [oatpp::postgresql::Executor::migrateSchema()]" , " Warning. Executing empty script for version %d" , newVersion);
392
+ }
393
+
394
+ {
395
+
396
+ orm::Transaction transaction (this , connection);
397
+
398
+ std::shared_ptr<orm::QueryResult> result;
399
+
400
+ result = exec (script, connection);
401
+ if (!result->isSuccess ()) {
402
+ OATPP_LOGE (" [oatpp::postgresql::Executor::migrateSchema()]" ,
403
+ " Error. Migration failed for version %d. %s" , newVersion, result->getErrorMessage ()->c_str ());
404
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: "
405
+ " Error. Migration failed. " + result->getErrorMessage ()->std_str ());
406
+
407
+ }
408
+
409
+ result = updateSchemaVersion (newVersion, suffix, connection);
410
+
411
+ if (!result->isSuccess () || result->hasMoreToFetch () > 0 ) {
412
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: Error. Migration failed. Can't set new version." );
413
+ }
414
+
415
+ result = transaction.commit ();
416
+ if (!result->isSuccess ()) {
417
+ throw std::runtime_error (" [oatpp::postgresql::Executor::migrateSchema()]: Error. Migration failed. Can't commit." );
418
+ }
419
+
420
+ }
421
+
254
422
}
255
423
256
424
}}
0 commit comments