@@ -205,14 +205,11 @@ CommitNumber TipCache::cacheState(TraNumber number)
205
205
if (number < oldest)
206
206
return CN_PREHISTORIC;
207
207
208
- // It is possible to amortize barrier in getTransactionStatusBlock
209
- // over large number of operations, if our callers are made aware of
210
- // TransactionStatusBlock granularity and iterate over transactions
211
- // directly. But since this function is not really called too frequently,
212
- // it should not matter and we leave interface "as is" for now.
213
208
TpcBlockNumber blockNumber = number / m_transactionsPerBlock;
214
209
ULONG offset = number % m_transactionsPerBlock;
215
- TransactionStatusBlock* block = getTransactionStatusBlock (header, blockNumber);
210
+
211
+ Sync sync (&m_sync_status, FB_FUNCTION);
212
+ TransactionStatusBlock* block = getTransactionStatusBlock (header, blockNumber, sync);
216
213
217
214
// This should not really happen ever
218
215
fb_assert (block);
@@ -334,7 +331,9 @@ void TipCache::loadInventoryPages(thread_db* tdbb, GlobalTpcHeader* header)
334
331
335
332
TpcBlockNumber blockNumber = hdr_oldest_transaction / m_transactionsPerBlock;
336
333
ULONG transOffset = hdr_oldest_transaction % m_transactionsPerBlock;
337
- TransactionStatusBlock* statusBlock = getTransactionStatusBlock (header, blockNumber);
334
+
335
+ SyncLockGuard sync (&m_sync_status, SYNC_EXCLUSIVE, FB_FUNCTION);
336
+ TransactionStatusBlock* statusBlock = createTransactionStatusBlock (header->tpc_block_size , blockNumber);
338
337
339
338
for (TraNumber t = hdr_oldest_transaction; ; )
340
339
{
@@ -353,7 +352,7 @@ void TipCache::loadInventoryPages(thread_db* tdbb, GlobalTpcHeader* header)
353
352
{
354
353
blockNumber++;
355
354
transOffset = 0 ;
356
- statusBlock = getTransactionStatusBlock (header, blockNumber);
355
+ statusBlock = createTransactionStatusBlock (header-> tpc_block_size , blockNumber);
357
356
}
358
357
}
359
358
}
@@ -364,7 +363,10 @@ void TipCache::mapInventoryPages(GlobalTpcHeader* header)
364
363
const TpcBlockNumber lastNumber = header->latest_transaction_id / m_transactionsPerBlock;
365
364
366
365
for (; blockNumber <= lastNumber; blockNumber++)
367
- getTransactionStatusBlock (header, blockNumber);
366
+ {
367
+ Sync sync (&m_sync_status, FB_FUNCTION);
368
+ getTransactionStatusBlock (header, blockNumber, sync);
369
+ }
368
370
}
369
371
370
372
TipCache::StatusBlockData::StatusBlockData (thread_db* tdbb, TipCache* tipCache, ULONG blockSize, TpcBlockNumber blkNumber)
@@ -498,20 +500,24 @@ TipCache::TransactionStatusBlock* TipCache::createTransactionStatusBlock(ULONG b
498
500
return blockData->memory ->getHeader ();
499
501
}
500
502
501
- TipCache::TransactionStatusBlock* TipCache::getTransactionStatusBlock (GlobalTpcHeader* header, TpcBlockNumber blockNumber)
503
+ TipCache::TransactionStatusBlock* TipCache::getTransactionStatusBlock (GlobalTpcHeader* header, TpcBlockNumber blockNumber, Sync& sync )
502
504
{
505
+ fb_assert (sync.getState () == SYNC_NONE);
506
+
503
507
// This is a double-checked locking pattern. SyncLockGuard uses atomic ops internally and should be cheap
504
508
TransactionStatusBlock* block = NULL ;
505
509
{
506
- SyncLockGuard sync (&m_sync_status, SYNC_SHARED, " TipCache::getTransactionStatusBlock " );
510
+ sync. lock ( SYNC_SHARED);
507
511
BlocksMemoryMap::ConstAccessor acc (&m_blocks_memory);
508
512
if (acc.locate (blockNumber))
509
513
block = acc.current ()->memory ->getHeader ();
514
+ else
515
+ sync.unlock ();
510
516
}
511
517
512
518
if (!block)
513
519
{
514
- SyncLockGuard sync (&m_sync_status, SYNC_EXCLUSIVE, " TipCache::getTransactionStatusBlock " );
520
+ sync. lock ( SYNC_EXCLUSIVE);
515
521
BlocksMemoryMap::ConstAccessor acc (&m_blocks_memory);
516
522
if (acc.locate (blockNumber))
517
523
block = acc.current ()->memory ->getHeader ();
@@ -521,6 +527,8 @@ TipCache::TransactionStatusBlock* TipCache::getTransactionStatusBlock(GlobalTpcH
521
527
TraNumber oldest = header->oldest_transaction .load (std::memory_order_relaxed);
522
528
if (blockNumber >= oldest / m_transactionsPerBlock)
523
529
block = createTransactionStatusBlock (header->tpc_block_size , blockNumber);
530
+ else
531
+ sync.unlock ();
524
532
}
525
533
}
526
534
return block;
@@ -532,24 +540,36 @@ TraNumber TipCache::findStates(TraNumber minNumber, TraNumber maxNumber, ULONG m
532
540
fb_assert (m_tpcHeader);
533
541
GlobalTpcHeader* header = m_tpcHeader->getHeader ();
534
542
535
- TransactionStatusBlock* statusBlock;
536
- TpcBlockNumber blockNumber;
537
- ULONG transOffset;
543
+ TransactionStatusBlock* statusBlock = nullptr ;
544
+ ULONG transOffset = 0 ;
538
545
539
- do
546
+ Sync sync (&m_sync_status, FB_FUNCTION);
547
+ for (TraNumber tran = minNumber; ; tran++, transOffset++)
540
548
{
541
- TraNumber oldest = header->oldest_transaction .load (std::memory_order_relaxed);
549
+ if (transOffset == m_transactionsPerBlock)
550
+ {
551
+ sync.unlock ();
552
+ statusBlock = nullptr ;
553
+ }
554
+
555
+ while (!statusBlock)
556
+ {
557
+ const TraNumber oldest = header->oldest_transaction .load (std::memory_order_relaxed);
542
558
543
- if (minNumber < oldest)
544
- minNumber = oldest;
559
+ if (tran < oldest)
560
+ tran = oldest;
545
561
546
- blockNumber = minNumber / m_transactionsPerBlock;
547
- transOffset = minNumber % m_transactionsPerBlock;
548
- statusBlock = getTransactionStatusBlock (header, blockNumber);
549
- } while (!statusBlock);
562
+ const TpcBlockNumber blockNumber = tran / m_transactionsPerBlock;
563
+ transOffset = tran % m_transactionsPerBlock;
564
+ statusBlock = getTransactionStatusBlock (header, blockNumber, sync);
565
+
566
+ if (sync.getState () == SYNC_EXCLUSIVE)
567
+ sync.downgrade (SYNC_SHARED);
568
+ }
569
+
570
+ if (tran >= maxNumber)
571
+ break ;
550
572
551
- for (TraNumber t = minNumber; ; )
552
- {
553
573
// Barrier is not needed here. Slightly out-dated information shall be ok here.
554
574
// Such transaction shall already be considered active by our caller.
555
575
// TODO: check if this assumption is indeed correct.
@@ -577,18 +597,8 @@ TraNumber TipCache::findStates(TraNumber minNumber, TraNumber maxNumber, ULONG m
577
597
break ;
578
598
}
579
599
580
- if (((1 << state) & mask) != 0 )
581
- return t;
582
-
583
- if (++t >= maxNumber)
584
- break ;
585
-
586
- if (++transOffset == m_transactionsPerBlock)
587
- {
588
- blockNumber++;
589
- transOffset = 0 ;
590
- statusBlock = getTransactionStatusBlock (header, blockNumber);
591
- }
600
+ if (((1UL << state) & mask) != 0 )
601
+ return tran;
592
602
}
593
603
594
604
return 0 ;
@@ -600,13 +610,11 @@ CommitNumber TipCache::setState(TraNumber number, int state)
600
610
fb_assert (m_tpcHeader);
601
611
GlobalTpcHeader* header = m_tpcHeader->getHeader ();
602
612
603
- // over large number of operations, if our callers are made aware of
604
- // TransactionStatusBlock granularity and iterate over transactions
605
- // directly. But since this function is not really called too frequently,
606
- // it should not matter and we leave interface "as is" for now.
607
613
TpcBlockNumber blockNumber = number / m_transactionsPerBlock;
608
614
ULONG offset = number % m_transactionsPerBlock;
609
- TransactionStatusBlock* block = getTransactionStatusBlock (header, blockNumber);
615
+
616
+ Sync sync (&m_sync_status, FB_FUNCTION);
617
+ TransactionStatusBlock* block = getTransactionStatusBlock (header, blockNumber, sync);
610
618
611
619
// This should not really happen
612
620
if (!block)
@@ -799,7 +807,7 @@ void TipCache::releaseSharedMemory(thread_db* tdbb, TraNumber oldest_old, TraNum
799
807
if (blocksToCleanup.isEmpty ())
800
808
return ;
801
809
802
- SyncLockGuard sync (&m_sync_status, SYNC_EXCLUSIVE, " TipCache::releaseSharedMemory " );
810
+ SyncLockGuard sync (&m_sync_status, SYNC_EXCLUSIVE, FB_FUNCTION );
803
811
while (blocksToCleanup.hasData ())
804
812
{
805
813
TpcBlockNumber blockNumber = blocksToCleanup.pop ();
0 commit comments