Skip to content

Commit 8417383

Browse files
committed
store: Several improvements to Writable
- Check that the writer thread is still running when accepting items to be queued. It makes no sense to accept requests when the writer thread is not running; such requests should therefore fail early.
- Change the mechanism by which tests can single-step a deployment so that it is specific to that deployment, to avoid problems when tests run at the same time. This was a huge footgun, and getting caught by it looked like a deadlock.
- Implement Debug for queued requests.
1 parent 04ba48f commit 8417383

File tree

2 files changed

+109
-28
lines changed

2 files changed

+109
-28
lines changed

store/postgres/src/writable.rs

Lines changed: 107 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ use std::{collections::BTreeMap, sync::Arc};
55

66
use graph::blockchain::block_stream::FirehoseCursor;
77
use graph::components::store::{DeploymentCursorTracker, DerivedEntityQuery, EntityKey, ReadStore};
8+
use graph::constraint_violation;
89
use graph::data::subgraph::schema;
910
use graph::data_source::CausalityRegion;
1011
use graph::prelude::{
1112
BlockNumber, Entity, MetricsRegistry, Schema, SubgraphDeploymentEntity, SubgraphStore as _,
1213
BLOCK_NUMBER_MAX,
1314
};
1415
use graph::slog::info;
16+
use graph::tokio::task::JoinHandle;
1517
use graph::util::bounded_queue::BoundedQueue;
1618
use graph::{
1719
cheap_clone::CheapClone,
@@ -448,6 +450,29 @@ enum Request {
448450
Stop,
449451
}
450452

453+
impl std::fmt::Debug for Request {
454+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455+
match self {
456+
Self::Write {
457+
block_ptr,
458+
mods,
459+
store,
460+
..
461+
} => write!(
462+
f,
463+
"write[{}, {:p}, {} entities]",
464+
block_ptr.number,
465+
store.as_ref(),
466+
mods.len()
467+
),
468+
Self::RevertTo {
469+
block_ptr, store, ..
470+
} => write!(f, "revert[{}, {:p}]", block_ptr.number, store.as_ref()),
471+
Self::Stop => write!(f, "stop"),
472+
}
473+
}
474+
}
475+
451476
enum ExecResult {
452477
Continue,
453478
Stop,
@@ -520,28 +545,56 @@ struct Queue {
520545
/// allowed to process as many requests as it can
521546
#[cfg(debug_assertions)]
522547
pub(crate) mod test_support {
523-
use std::sync::atomic::{AtomicBool, Ordering};
548+
use std::{
549+
collections::HashMap,
550+
sync::{Arc, Mutex},
551+
};
524552

525-
use graph::{prelude::lazy_static, util::bounded_queue::BoundedQueue};
553+
use graph::{
554+
components::store::{DeploymentId, DeploymentLocator},
555+
prelude::lazy_static,
556+
util::bounded_queue::BoundedQueue,
557+
};
526558

527559
lazy_static! {
528-
static ref DO_STEP: AtomicBool = AtomicBool::new(false);
529-
static ref ALLOWED_STEPS: BoundedQueue<()> = BoundedQueue::with_capacity(1_000);
560+
static ref STEPS: Mutex<HashMap<DeploymentId, Arc<BoundedQueue<()>>>> =
561+
Mutex::new(HashMap::new());
530562
}
531563

532-
pub(super) async fn take_step() {
533-
if DO_STEP.load(Ordering::SeqCst) {
534-
ALLOWED_STEPS.pop().await
564+
pub(super) async fn take_step(deployment: &DeploymentLocator) {
565+
let steps = STEPS.lock().unwrap().get(&deployment.id).cloned();
566+
if let Some(steps) = steps {
567+
steps.pop().await;
535568
}
536569
}
537570

538571
/// Allow the writer to process `steps` requests. After calling this,
539572
/// the writer will only process the number of requests it is allowed to
540-
pub async fn allow_steps(steps: usize) {
573+
pub async fn allow_steps(deployment: &DeploymentLocator, steps: usize) {
574+
let queue = {
575+
let mut map = STEPS.lock().unwrap();
576+
map.entry(deployment.id)
577+
.or_insert_with(|| Arc::new(BoundedQueue::with_capacity(1_000)))
578+
.clone()
579+
};
541580
for _ in 0..steps {
542-
ALLOWED_STEPS.push(()).await
581+
queue.push(()).await
543582
}
544-
DO_STEP.store(true, Ordering::SeqCst);
583+
}
584+
}
585+
586+
impl std::fmt::Debug for Queue {
587+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588+
let reqs = self.queue.fold(vec![], |mut reqs, req| {
589+
reqs.push(req.clone());
590+
reqs
591+
});
592+
593+
write!(f, "reqs[{} : ", self.store.site)?;
594+
for req in reqs {
595+
write!(f, " {:?}", req)?;
596+
}
597+
writeln!(f, "]")
545598
}
546599
}
547600

@@ -552,11 +605,11 @@ impl Queue {
552605
store: Arc<SyncStore>,
553606
capacity: usize,
554607
registry: Arc<MetricsRegistry>,
555-
) -> Arc<Self> {
608+
) -> (Arc<Self>, JoinHandle<()>) {
556609
async fn start_writer(queue: Arc<Queue>, logger: Logger) {
557610
loop {
558611
#[cfg(debug_assertions)]
559-
test_support::take_step().await;
612+
test_support::take_step(&queue.store.site.as_ref().into()).await;
560613

561614
// We peek at the front of the queue, rather than pop it
562615
// right away, so that query methods like `get` have access
@@ -623,9 +676,9 @@ impl Queue {
623676
};
624677
let queue = Arc::new(queue);
625678

626-
graph::spawn(start_writer(queue.cheap_clone(), logger));
679+
let handle = graph::spawn(start_writer(queue.cheap_clone(), logger));
627680

628-
queue
681+
(queue, handle)
629682
}
630683

631684
/// Add a write request to the queue
@@ -637,6 +690,7 @@ impl Queue {
637690

638691
/// Wait for the background writer to finish processing queued entries
639692
async fn flush(&self) -> Result<(), StoreError> {
693+
self.check_err()?;
640694
self.queue.wait_empty().await;
641695
self.check_err()
642696
}
@@ -880,7 +934,10 @@ impl Queue {
880934
/// A shim to allow bypassing any pipelined store handling if need be
881935
enum Writer {
882936
Sync(Arc<SyncStore>),
883-
Async(Arc<Queue>),
937+
Async {
938+
queue: Arc<Queue>,
939+
join_handle: JoinHandle<()>,
940+
},
884941
}
885942

886943
impl Writer {
@@ -894,7 +951,24 @@ impl Writer {
894951
if capacity == 0 {
895952
Self::Sync(store)
896953
} else {
897-
Self::Async(Queue::start(logger, store, capacity, registry))
954+
let (queue, join_handle) = Queue::start(logger, store.clone(), capacity, registry);
955+
Self::Async { queue, join_handle }
956+
}
957+
}
958+
959+
fn check_queue_running(&self) -> Result<(), StoreError> {
960+
match self {
961+
Writer::Sync(_) => Ok(()),
962+
Writer::Async { join_handle, queue } => {
963+
if join_handle.is_finished() {
964+
Err(constraint_violation!(
965+
"Subgraph writer for {} is not running",
966+
queue.store.site
967+
))
968+
} else {
969+
Ok(())
970+
}
971+
}
898972
}
899973
}
900974

@@ -920,7 +994,8 @@ impl Writer {
920994
&manifest_idx_and_name,
921995
&processed_data_sources,
922996
),
923-
Writer::Async(queue) => {
997+
Writer::Async { queue, .. } => {
998+
self.check_queue_running()?;
924999
let req = Request::Write {
9251000
store: queue.store.cheap_clone(),
9261001
stopwatch: queue.stopwatch.cheap_clone(),
@@ -944,7 +1019,8 @@ impl Writer {
9441019
) -> Result<(), StoreError> {
9451020
match self {
9461021
Writer::Sync(store) => store.revert_block_operations(block_ptr_to, &firehose_cursor),
947-
Writer::Async(queue) => {
1022+
Writer::Async { queue, .. } => {
1023+
self.check_queue_running()?;
9481024
let req = Request::RevertTo {
9491025
store: queue.store.cheap_clone(),
9501026
block_ptr: block_ptr_to,
@@ -958,14 +1034,17 @@ impl Writer {
9581034
async fn flush(&self) -> Result<(), StoreError> {
9591035
match self {
9601036
Writer::Sync { .. } => Ok(()),
961-
Writer::Async(queue) => queue.flush().await,
1037+
Writer::Async { queue, .. } => {
1038+
self.check_queue_running()?;
1039+
queue.flush().await
1040+
}
9621041
}
9631042
}
9641043

9651044
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError> {
9661045
match self {
9671046
Writer::Sync(store) => store.get(key, BLOCK_NUMBER_MAX),
968-
Writer::Async(queue) => queue.get(key),
1047+
Writer::Async { queue, .. } => queue.get(key),
9691048
}
9701049
}
9711050

@@ -975,7 +1054,7 @@ impl Writer {
9751054
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
9761055
match self {
9771056
Writer::Sync(store) => store.get_many(keys, BLOCK_NUMBER_MAX),
978-
Writer::Async(queue) => queue.get_many(keys),
1057+
Writer::Async { queue, .. } => queue.get_many(keys),
9791058
}
9801059
}
9811060

@@ -985,7 +1064,7 @@ impl Writer {
9851064
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
9861065
match self {
9871066
Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, vec![]),
988-
Writer::Async(queue) => queue.get_derived(key),
1067+
Writer::Async { queue, .. } => queue.get_derived(key),
9891068
}
9901069
}
9911070

@@ -999,28 +1078,30 @@ impl Writer {
9991078
.load_dynamic_data_sources(BLOCK_NUMBER_MAX, manifest_idx_and_name)
10001079
.await
10011080
}
1002-
Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await,
1081+
Writer::Async { queue, .. } => {
1082+
queue.load_dynamic_data_sources(manifest_idx_and_name).await
1083+
}
10031084
}
10041085
}
10051086

10061087
fn poisoned(&self) -> bool {
10071088
match self {
10081089
Writer::Sync(_) => false,
1009-
Writer::Async(queue) => queue.poisoned(),
1090+
Writer::Async { queue, .. } => queue.poisoned(),
10101091
}
10111092
}
10121093

10131094
async fn stop(&self) -> Result<(), StoreError> {
10141095
match self {
10151096
Writer::Sync(_) => Ok(()),
1016-
Writer::Async(queue) => queue.stop().await,
1097+
Writer::Async { queue, .. } => queue.stop().await,
10171098
}
10181099
}
10191100

10201101
fn deployment_synced(&self) {
10211102
match self {
10221103
Writer::Sync(_) => {}
1023-
Writer::Async(queue) => queue.deployment_synced(),
1104+
Writer::Async { queue, .. } => queue.deployment_synced(),
10241105
}
10251106
}
10261107
}

store/test-store/tests/postgres/writable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,11 @@ async fn insert_count(store: &Arc<DieselSubgraphStore>, deployment: &DeploymentL
121121

122122
async fn pause_writer(deployment: &DeploymentLocator) {
123123
flush(deployment).await.unwrap();
124-
writable::allow_steps(0).await;
124+
writable::allow_steps(deployment, 0).await;
125125
}
126126

127127
async fn resume_writer(deployment: &DeploymentLocator, steps: usize) {
128-
writable::allow_steps(steps).await;
128+
writable::allow_steps(deployment, steps).await;
129129
flush(deployment).await.unwrap();
130130
}
131131

0 commit comments

Comments (0)