Skip to content

Commit febf5a0

Browse files
committed
Add more control config
Signed-off-by: koushiro <koushiro.cqx@gmail.com>
1 parent a3eaa11 commit febf5a0

File tree

6 files changed

+125
-50
lines changed

6 files changed

+125
-50
lines changed

bin/node-template-archive/archive.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,25 @@ std = "DEBUG"
4444
#name = "archive.log"
4545

4646
[control]
47+
# Number of database actors to be spawned.
48+
# Optional, default: 4
49+
db_actor_pool_size = 4
50+
4751
# Number of threads to dedicate for executing tasks
4852
# Optional, default: the number of logical system threads
4953
task_workers = 8
5054

55+
# Timeout to wait for a task to start execution.
56+
# Optional, default: 20 seconds
57+
task_timeout = 20
58+
59+
# Maximum number of tasks to queue in the threadpool.
60+
# Optional, default: 64
61+
max_tasks = 64
62+
5163
# Maximium number of blocks to load and insert into database at a time.
5264
# Useful for controlling memory usage.
53-
# Optional, defaults: 100,000.
65+
# Optional, defaults: 100,000
5466
max_block_load = 100000
5567

5668
[wasm_tracing]

bin/polkadot-archive/archive.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,25 @@ std = "DEBUG"
5151
#name = "archive.log"
5252

5353
[control]
54+
# Number of database actors to be spawned.
55+
# Optional, default: 4
56+
db_actor_pool_size = 4
57+
5458
# Number of threads to dedicate for executing tasks
5559
# Optional, default: the number of logical system threads
5660
task_workers = 8
5761

62+
# Timeout to wait for a task to start execution.
63+
# Optional, default: 20 seconds
64+
task_timeout = 20
65+
66+
# Maximum number of tasks to queue in the threadpool.
67+
# Optional, default: 64
68+
max_tasks = 64
69+
5870
# Maximium number of blocks to load and insert into database at a time.
5971
# Useful for controlling memory usage.
60-
# Optional, defaults: 100,000.
72+
# Optional, defaults: 100,000
6173
max_block_load = 100000
6274

6375
[wasm_tracing]

substrate-archive/src/actors.rs

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::{marker::PhantomData, panic::AssertUnwindSafe, sync::Arc, time::Duratio
2424
use coil::Job as _;
2525
use futures::{future::BoxFuture, FutureExt};
2626
use hashbrown::HashSet;
27-
use serde::de::DeserializeOwned;
27+
use serde::{de::DeserializeOwned, Deserialize};
2828
use xtra::{prelude::*, spawn::Smol, Disconnected};
2929

3030
use sc_client_api::backend;
@@ -60,8 +60,7 @@ where
6060
pub backend: Arc<ReadOnlyBackend<B, D>>,
6161
pub pg_url: String,
6262
pub meta: Meta<B>,
63-
pub workers: usize,
64-
pub max_block_load: u32,
63+
pub control: ControlConfig,
6564
pub tracing_targets: Option<String>,
6665
}
6766

@@ -74,13 +73,64 @@ where
7473
backend: Arc::clone(&self.backend),
7574
pg_url: self.pg_url.clone(),
7675
meta: self.meta.clone(),
77-
workers: self.workers,
78-
max_block_load: self.max_block_load,
76+
control: self.control,
7977
tracing_targets: self.tracing_targets.clone(),
8078
}
8179
}
8280
}
8381

82+
#[derive(Copy, Clone, Debug, Deserialize)]
83+
pub struct ControlConfig {
84+
/// number of database actors to be spawned in the actor pool.
85+
#[serde(default = "default_db_actor_pool_size")]
86+
pub(crate) db_actor_pool_size: usize,
87+
/// number of threads to spawn for task execution.
88+
#[serde(default = "default_task_workers")]
89+
pub(crate) task_workers: usize,
90+
/// maximum amount of time coil will wait for a task to begin.
91+
/// times out if tasks don't start execution in the threadpool within `task_timeout` seconds.
92+
#[serde(default = "default_task_timeout")]
93+
pub(crate) task_timeout: u64,
94+
/// maximum tasks to queue in the threadpool.
95+
#[serde(default = "default_task_workers")]
96+
pub(crate) max_tasks: usize,
97+
/// maximum amount of blocks to index at once
98+
#[serde(default = "default_max_block_load")]
99+
pub(crate) max_block_load: u32,
100+
}
101+
102+
impl Default for ControlConfig {
103+
fn default() -> Self {
104+
Self {
105+
db_actor_pool_size: default_db_actor_pool_size(),
106+
task_workers: default_task_workers(),
107+
task_timeout: default_task_timeout(),
108+
max_tasks: default_max_tasks(),
109+
max_block_load: default_max_block_load(),
110+
}
111+
}
112+
}
113+
114+
const fn default_db_actor_pool_size() -> usize {
115+
4
116+
}
117+
118+
fn default_task_workers() -> usize {
119+
num_cpus::get()
120+
}
121+
122+
const fn default_task_timeout() -> u64 {
123+
20
124+
}
125+
126+
const fn default_max_tasks() -> usize {
127+
64
128+
}
129+
130+
const fn default_max_block_load() -> u32 {
131+
100_000
132+
}
133+
84134
impl<B: BlockT + Unpin, D: ReadOnlyDB> SystemConfig<B, D>
85135
where
86136
B::Hash: Unpin,
@@ -89,11 +139,10 @@ where
89139
backend: Arc<ReadOnlyBackend<B, D>>,
90140
pg_url: String,
91141
meta: Meta<B>,
92-
workers: usize,
93-
max_block_load: u32,
142+
control: ControlConfig,
94143
tracing_targets: Option<String>,
95144
) -> Self {
96-
Self { backend, pg_url, meta, workers, max_block_load, tracing_targets }
145+
Self { backend, pg_url, meta, control, tracing_targets }
97146
}
98147

99148
pub fn backend(&self) -> &Arc<ReadOnlyBackend<B, D>> {
@@ -207,10 +256,10 @@ where
207256

208257
let runner = coil::Runner::builder(env, TaskExecutor, &pool)
209258
.register_job::<crate::tasks::execute_block::Job<B, R, C, D>>()
210-
.num_threads(conf.workers)
259+
.num_threads(conf.control.task_workers)
211260
// times out if tasks don't start execution on the threadpool within 20 seconds.
212-
.timeout(Duration::from_secs(20))
213-
.max_tasks(64)
261+
.timeout(Duration::from_secs(conf.control.task_timeout))
262+
.max_tasks(conf.control.max_tasks)
214263
.build()?;
215264

216265
loop {
@@ -237,7 +286,8 @@ where
237286

238287
async fn spawn_actors(conf: SystemConfig<B, D>) -> Result<Actors<B, D>> {
239288
let db = workers::DatabaseActor::<B>::new(conf.pg_url().into()).await?;
240-
let db_pool = actor_pool::ActorPool::new(db, 4).create(None).spawn(&mut Smol::Global);
289+
let db_pool =
290+
actor_pool::ActorPool::new(db, conf.control.db_actor_pool_size).create(None).spawn(&mut Smol::Global);
241291
let storage = workers::StorageAggregator::new(db_pool.clone()).create(None).spawn(&mut Smol::Global);
242292
let metadata = workers::MetadataActor::new(db_pool.clone(), conf.meta().clone())
243293
.await?
@@ -345,7 +395,7 @@ where
345395
Ok(())
346396
}
347397

348-
fn context(&self) -> Result<super::actors::SystemConfig<B, D>> {
349-
Ok(self.config.clone())
398+
fn context(&self) -> &SystemConfig<B, D> {
399+
&self.config
350400
}
351401
}

substrate-archive/src/actors/workers/blocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ where
7070
backend: conf.backend().clone(),
7171
db,
7272
meta,
73-
max_block_load: conf.max_block_load,
73+
max_block_load: conf.control.max_block_load,
7474
}
7575
}
7676

substrate-archive/src/archive.rs

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use substrate_archive_backend::{
3434
};
3535

3636
use crate::{
37-
actors::{System, SystemConfig},
37+
actors::{ControlConfig, System, SystemConfig},
3838
database::{self, DatabaseConfig},
3939
error::Result,
4040
logger::{self, FileLoggerConfig, LoggerConfig},
@@ -88,31 +88,6 @@ pub struct TracingConfig {
8888
pub folder: Option<PathBuf>,
8989
}
9090

91-
#[derive(Clone, Debug, Deserialize)]
92-
pub struct ControlConfig {
93-
/// number of threads to spawn for task execution.
94-
#[serde(default = "default_task_workers")]
95-
pub(crate) task_workers: usize,
96-
/// maximum amount of blocks to index at once
97-
#[serde(default = "default_max_block_load")]
98-
pub(crate) max_block_load: u32,
99-
}
100-
101-
impl Default for ControlConfig {
102-
fn default() -> Self {
103-
Self { task_workers: default_task_workers(), max_block_load: default_max_block_load() }
104-
}
105-
}
106-
107-
fn default_task_workers() -> usize {
108-
num_cpus::get()
109-
}
110-
111-
// 100_000 blocks to index at once
112-
const fn default_max_block_load() -> u32 {
113-
100_000
114-
}
115-
11691
#[derive(Clone, Debug, Default, Deserialize)]
11792
pub struct ArchiveConfig {
11893
/// chain spec and database
@@ -144,7 +119,7 @@ where
144119
fn drive(&mut self) -> Result<()>;
145120

146121
/// this method will block indefinitely
147-
async fn block_until_stopped(&self) -> ();
122+
async fn block_until_stopped(&self);
148123

149124
/// shutdown the system
150125
fn shutdown(self) -> Result<()>;
@@ -153,7 +128,7 @@ where
153128
fn boxed_shutdown(self: Box<Self>) -> Result<()>;
154129

155130
/// Get a reference to the context the actors are using
156-
fn context(&self) -> Result<SystemConfig<B, D>>;
131+
fn context(&self) -> &SystemConfig<B, D>;
157132
}
158133

159134
pub struct ArchiveBuilder<B, R, D, DB> {
@@ -241,7 +216,7 @@ impl<B, R, D, DB> ArchiveBuilder<B, R, D, DB> {
241216
self
242217
}
243218

244-
/// Number of 64KB Heap Pages to allocate for WASM execution
219+
/// Set the number of 64KB Heap Pages to allocate for WASM execution
245220
///
246221
/// # Default
247222
/// defaults to 64 * (number of logic cpu's)
@@ -250,6 +225,15 @@ impl<B, R, D, DB> ArchiveBuilder<B, R, D, DB> {
250225
self
251226
}
252227

228+
/// Set the number of database actors to be spawned in the actor pool.
229+
///
230+
/// # Default
231+
/// defaults to 4
232+
pub fn db_actor_pool_size(mut self, size: usize) -> Self {
233+
self.config.control.db_actor_pool_size = size;
234+
self
235+
}
236+
253237
/// Set the number of threads spawn for task execution.
254238
///
255239
/// # Default
@@ -259,6 +243,24 @@ impl<B, R, D, DB> ArchiveBuilder<B, R, D, DB> {
259243
self
260244
}
261245

246+
/// Set the timeout to wait for a task to start execution.
247+
///
248+
/// # Default
249+
/// defaults to 20 seconds
250+
pub fn task_timeout(mut self, timeout: u64) -> Self {
251+
self.config.control.task_timeout = timeout;
252+
self
253+
}
254+
255+
/// Set the maximum tasks to queue in the threadpool.
256+
///
257+
/// # Default
258+
/// defaults to 64
259+
pub fn max_tasks(mut self, max: usize) -> Self {
260+
self.config.control.max_tasks = max;
261+
self
262+
}
263+
262264
/// Set the number of blocks to index at once
263265
///
264266
/// # Default
@@ -392,8 +394,7 @@ where
392394
backend,
393395
pg_url,
394396
client.clone(),
395-
self.config.control.task_workers,
396-
self.config.control.max_block_load,
397+
self.config.control,
397398
self.config.wasm_tracing.map(|t| t.targets),
398399
);
399400
let sys = System::<_, R, _, _>::new(client, config)?;

substrate-archive/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ mod tasks;
3232
mod types;
3333
mod wasm_tracing;
3434

35-
pub use self::actors::System;
36-
pub use self::archive::{Archive, ArchiveBuilder, ArchiveConfig, ChainConfig, ControlConfig, TracingConfig};
35+
pub use self::actors::{ControlConfig, System};
36+
pub use self::archive::{Archive, ArchiveBuilder, ArchiveConfig, ChainConfig, TracingConfig};
3737
pub use self::database::{queries, DatabaseConfig};
3838
pub use self::error::ArchiveError;
3939

0 commit comments

Comments
 (0)