Skip to content

Commit 77ff8e5

Browse files
feat(*)!: set semaphore to optional, default max permits
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
1 parent 792ba3c commit 77ff8e5

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

bin/main.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,8 @@ struct Args {
7272

7373
/// (Advanced) Tweak the maximum number of jobs to run for handling events and commands. Be
7474
/// careful how you use this as it can affect performance
75-
#[arg(
76-
short = 'j',
77-
long = "max-jobs",
78-
default_value = "256",
79-
env = "WADM_MAX_JOBS"
80-
)]
81-
max_jobs: usize,
75+
#[arg(short = 'j', long = "max-jobs", env = "WADM_MAX_JOBS")]
76+
max_jobs: Option<usize>,
8277

8378
/// The URL of the nats server you want to connect to
8479
#[arg(
@@ -193,6 +188,8 @@ async fn main() -> anyhow::Result<()> {
193188

194189
let manifest_storage = nats::ensure_kv_bucket(&context, args.manifest_bucket, 1).await?;
195190

191+
debug!("Ensuring event stream");
192+
196193
let event_stream = nats::ensure_stream(
197194
&context,
198195
EVENT_STREAM_NAME.to_owned(),
@@ -204,6 +201,8 @@ async fn main() -> anyhow::Result<()> {
204201
)
205202
.await?;
206203

204+
debug!("Ensuring command stream");
205+
207206
let command_stream = nats::ensure_stream(
208207
&context,
209208
COMMAND_STREAM_NAME.to_owned(),
@@ -222,6 +221,8 @@ async fn main() -> anyhow::Result<()> {
222221
(vec![DEFAULT_EVENTS_TOPIC.to_owned()], MIRROR_STREAM_NAME)
223222
};
224223

224+
debug!("Ensuring mirror stream");
225+
225226
let mirror_stream = nats::ensure_stream(
226227
&context,
227228
mirror_stream.to_owned(),
@@ -230,14 +231,20 @@ async fn main() -> anyhow::Result<()> {
230231
)
231232
.await?;
232233

234+
debug!("Ensuring notify stream");
235+
233236
let notify_stream = nats::ensure_notify_stream(
234237
&context,
235238
NOTIFY_STREAM_NAME.to_owned(),
236239
vec![format!("{WADM_NOTIFY_PREFIX}.*")],
237240
)
238241
.await?;
239242

240-
let permit_pool = Arc::new(Semaphore::new(args.max_jobs));
243+
debug!("Creating event consumer manager");
244+
245+
let permit_pool = Arc::new(Semaphore::new(
246+
args.max_jobs.unwrap_or(Semaphore::MAX_PERMITS),
247+
));
241248
let event_worker_creator = EventWorkerCreator {
242249
state_store: state_storage.clone(),
243250
manifest_store: manifest_storage.clone(),
@@ -254,6 +261,8 @@ async fn main() -> anyhow::Result<()> {
254261
)
255262
.await;
256263

264+
debug!("Creating command consumer manager");
265+
257266
let command_worker_creator = CommandWorkerCreator {
258267
pool: connection_pool,
259268
};
@@ -278,6 +287,8 @@ async fn main() -> anyhow::Result<()> {
278287

279288
let wadm_event_prefix = DEFAULT_WADM_EVENTS_TOPIC.trim_matches(trimmer);
280289

290+
debug!("Creating lattice observer");
291+
281292
let observer = observer::Observer {
282293
parser: LatticeIdParser::new("wasmbus", args.multitenant),
283294
command_manager: commands_manager,
@@ -289,6 +300,8 @@ async fn main() -> anyhow::Result<()> {
289300
event_worker_creator,
290301
};
291302

303+
debug!("Subscribing to API topic");
304+
292305
let server = Server::new(
293306
manifest_storage,
294307
client,

0 commit comments

Comments
 (0)