Skip to content

Commit d8525e2

Browse files
committed
refactor: flatten rpc server types
1 parent 00d31d1 commit d8525e2

File tree

4 files changed

+122
-156
lines changed

4 files changed

+122
-156
lines changed

src/agent.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,12 @@ impl Agent {
175175
));
176176

177177
// Spawn the Pythd API Server
178-
jhs.push(rpc::spawn_server(
178+
jhs.push(tokio::spawn(rpc::run(
179179
self.config.pythd_api_server.clone(),
180+
logger.clone(),
180181
pythd_adapter_tx,
181182
shutdown_rx,
182-
logger.clone(),
183-
));
183+
)));
184184

185185
// Spawn the metrics server
186186
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(

src/agent/market_schedule.rs

Lines changed: 83 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ use {
1717
Utc,
1818
},
1919
chrono_tz::Tz,
20-
proptest::{
21-
arbitrary::any,
22-
prop_compose,
23-
proptest,
24-
},
2520
std::{
2621
fmt::Display,
2722
str::FromStr,
@@ -42,7 +37,6 @@ use {
4237
},
4338
};
4439

45-
4640
/// Helper time value representing 24:00:00 as 00:00:00 minus 1
4741
/// nanosecond (underflowing to 23:59:59.999(...) ). While chrono
4842
/// has this value internally exposed as NaiveTime::MAX, it is not
@@ -51,7 +45,6 @@ const MAX_TIME_INSTANT: NaiveTime = NaiveTime::MIN
5145
.overflowing_sub_signed(Duration::nanoseconds(1))
5246
.0;
5347

54-
5548
#[derive(Clone, Debug, Eq, PartialEq)]
5649
pub struct MarketSchedule {
5750
pub timezone: Tz,
@@ -154,7 +147,6 @@ impl From<LegacySchedule> for MarketSchedule {
154147
}
155148
}
156149

157-
158150
#[derive(Clone, Debug, Eq, PartialEq)]
159151
pub struct HolidayDaySchedule {
160152
pub month: u32,
@@ -198,7 +190,6 @@ impl Display for HolidayDaySchedule {
198190
}
199191
}
200192

201-
202193
#[derive(Clone, Debug, Eq, PartialEq, Copy)]
203194
pub enum ScheduleDayKind {
204195
Open,
@@ -286,6 +277,11 @@ mod tests {
286277
use {
287278
super::*,
288279
chrono::NaiveDateTime,
280+
proptest::{
281+
arbitrary::any,
282+
prop_compose,
283+
proptest,
284+
},
289285
};
290286

291287
#[test]
@@ -478,98 +474,98 @@ mod tests {
478474

479475
Ok(())
480476
}
481-
}
482477

483-
prop_compose! {
484-
fn schedule_day_kind()(
485-
r in any::<u8>(),
486-
t1 in any::<u32>(),
487-
t2 in any::<u32>(),
488-
) -> ScheduleDayKind {
489-
match r % 3 {
490-
0 => ScheduleDayKind::Open,
491-
1 => ScheduleDayKind::Closed,
492-
_ => ScheduleDayKind::TimeRange(
493-
NaiveTime::from_hms_opt(t1 % 24, t1 / 24 % 60, 0).unwrap(),
494-
NaiveTime::from_hms_opt(t2 % 24, t2 / 24 % 60, 0).unwrap(),
495-
),
478+
prop_compose! {
479+
fn schedule_day_kind()(
480+
r in any::<u8>(),
481+
t1 in any::<u32>(),
482+
t2 in any::<u32>(),
483+
) -> ScheduleDayKind {
484+
match r % 3 {
485+
0 => ScheduleDayKind::Open,
486+
1 => ScheduleDayKind::Closed,
487+
_ => ScheduleDayKind::TimeRange(
488+
NaiveTime::from_hms_opt(t1 % 24, t1 / 24 % 60, 0).unwrap(),
489+
NaiveTime::from_hms_opt(t2 % 24, t2 / 24 % 60, 0).unwrap(),
490+
),
491+
}
496492
}
497493
}
498-
}
499494

500-
prop_compose! {
501-
fn holiday_day_schedule()(
502-
m in 1..=12u32,
503-
d in 1..=31u32,
504-
s in schedule_day_kind(),
505-
) -> HolidayDaySchedule {
506-
HolidayDaySchedule {
507-
month: m,
508-
day: d,
509-
kind: s,
495+
prop_compose! {
496+
fn holiday_day_schedule()(
497+
m in 1..=12u32,
498+
d in 1..=31u32,
499+
s in schedule_day_kind(),
500+
) -> HolidayDaySchedule {
501+
HolidayDaySchedule {
502+
month: m,
503+
day: d,
504+
kind: s,
505+
}
510506
}
511507
}
512-
}
513508

514-
prop_compose! {
515-
fn market_schedule()(
516-
tz in proptest::sample::select(vec![
517-
Tz::UTC,
518-
Tz::America__New_York,
519-
Tz::America__Los_Angeles,
520-
Tz::America__Chicago,
521-
Tz::Singapore,
522-
Tz::Australia__Sydney,
523-
]),
524-
weekly_schedule in proptest::collection::vec(schedule_day_kind(), 7..=7),
525-
holidays in proptest::collection::vec(holiday_day_schedule(), 0..12),
526-
) -> MarketSchedule {
527-
MarketSchedule {
528-
timezone: tz,
529-
weekly_schedule,
530-
holidays,
509+
prop_compose! {
510+
fn market_schedule()(
511+
tz in proptest::sample::select(vec![
512+
Tz::UTC,
513+
Tz::America__New_York,
514+
Tz::America__Los_Angeles,
515+
Tz::America__Chicago,
516+
Tz::Singapore,
517+
Tz::Australia__Sydney,
518+
]),
519+
weekly_schedule in proptest::collection::vec(schedule_day_kind(), 7..=7),
520+
holidays in proptest::collection::vec(holiday_day_schedule(), 0..12),
521+
) -> MarketSchedule {
522+
MarketSchedule {
523+
timezone: tz,
524+
weekly_schedule,
525+
holidays,
526+
}
531527
}
532528
}
533-
}
534529

535-
// Matches C or O or hhmm-hhmm with 24-hour time
536-
const VALID_SCHEDULE_DAY_KIND_REGEX: &str =
537-
"C|O|([01][1-9]|2[0-3])([0-5][0-9])-([01][1-9]|2[0-3])([0-5][0-9])";
530+
// Matches C or O or hhmm-hhmm with 24-hour time
531+
const VALID_SCHEDULE_DAY_KIND_REGEX: &str =
532+
"C|O|([01][1-9]|2[0-3])([0-5][0-9])-([01][1-9]|2[0-3])([0-5][0-9])";
538533

539-
// Matches MMDD with MM and DD being 01-12 and 01-31 respectively
540-
const VALID_MONTH_DAY_REGEX: &str = "(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])";
534+
// Matches MMDD with MM and DD being 01-12 and 01-31 respectively
535+
const VALID_MONTH_DAY_REGEX: &str = "(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])";
541536

542-
proptest!(
543-
#[test]
544-
fn doesnt_crash(s in "\\PC*") {
545-
_ = s.parse::<MarketSchedule>();
546-
_ = s.parse::<HolidayDaySchedule>();
547-
_ = s.parse::<ScheduleDayKind>();
548-
}
537+
proptest!(
538+
#[test]
539+
fn doesnt_crash(s in "\\PC*") {
540+
_ = s.parse::<MarketSchedule>();
541+
_ = s.parse::<HolidayDaySchedule>();
542+
_ = s.parse::<ScheduleDayKind>();
543+
}
549544

550-
#[test]
551-
fn parse_valid_schedule_day_kind(s in VALID_SCHEDULE_DAY_KIND_REGEX) {
552-
assert!(s.parse::<ScheduleDayKind>().is_ok());
553-
}
545+
#[test]
546+
fn parse_valid_schedule_day_kind(s in VALID_SCHEDULE_DAY_KIND_REGEX) {
547+
assert!(s.parse::<ScheduleDayKind>().is_ok());
548+
}
554549

555-
#[test]
556-
fn test_valid_schedule_day_kind(s in schedule_day_kind()) {
557-
assert_eq!(s, s.to_string().parse::<ScheduleDayKind>().unwrap());
558-
}
550+
#[test]
551+
fn test_valid_schedule_day_kind(s in schedule_day_kind()) {
552+
assert_eq!(s, s.to_string().parse::<ScheduleDayKind>().unwrap());
553+
}
559554

560-
#[test]
561-
fn parse_valid_holiday_day_schedule(s in VALID_SCHEDULE_DAY_KIND_REGEX, d in VALID_MONTH_DAY_REGEX) {
562-
let valid_holiday_day = format!("{}/{}", d, s);
563-
assert!(valid_holiday_day.parse::<HolidayDaySchedule>().is_ok());
564-
}
555+
#[test]
556+
fn parse_valid_holiday_day_schedule(s in VALID_SCHEDULE_DAY_KIND_REGEX, d in VALID_MONTH_DAY_REGEX) {
557+
let valid_holiday_day = format!("{}/{}", d, s);
558+
assert!(valid_holiday_day.parse::<HolidayDaySchedule>().is_ok());
559+
}
565560

566-
#[test]
567-
fn test_valid_holiday_day_schedule(s in holiday_day_schedule()) {
568-
assert_eq!(s, s.to_string().parse::<HolidayDaySchedule>().unwrap());
569-
}
561+
#[test]
562+
fn test_valid_holiday_day_schedule(s in holiday_day_schedule()) {
563+
assert_eq!(s, s.to_string().parse::<HolidayDaySchedule>().unwrap());
564+
}
570565

571-
#[test]
572-
fn test_valid_market_schedule(s in market_schedule()) {
573-
assert_eq!(s, s.to_string().parse::<MarketSchedule>().unwrap());
574-
}
575-
);
566+
#[test]
567+
fn test_valid_market_schedule(s in market_schedule()) {
568+
assert_eq!(s, s.to_string().parse::<MarketSchedule>().unwrap());
569+
}
570+
);
571+
}

src/agent/pythd/api.rs

Lines changed: 35 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,10 @@ pub mod rpc {
136136
fmt::Debug,
137137
net::SocketAddr,
138138
},
139-
tokio::{
140-
sync::{
141-
broadcast,
142-
mpsc,
143-
oneshot,
144-
},
145-
task::JoinHandle,
139+
tokio::sync::{
140+
broadcast,
141+
mpsc,
142+
oneshot,
146143
},
147144
warp::{
148145
ws::{
@@ -584,53 +581,33 @@ pub mod rpc {
584581
}
585582
}
586583

587-
pub fn spawn_server(
584+
pub async fn run(
588585
config: Config,
586+
logger: Logger,
589587
adapter_tx: mpsc::Sender<adapter::Message>,
590588
shutdown_rx: broadcast::Receiver<()>,
591-
logger: Logger,
592-
) -> JoinHandle<()> {
593-
tokio::spawn(async move {
594-
Server::new(adapter_tx, config, logger)
595-
.run(shutdown_rx)
596-
.await
597-
})
589+
) {
590+
if let Err(err) = serve(config, &logger, adapter_tx, shutdown_rx).await {
591+
error!(logger, "{}", err);
592+
debug!(logger, "error context"; "context" => format!("{:?}", err));
593+
}
598594
}
599595

600-
pub struct Server {
596+
async fn serve(
597+
config: Config,
598+
logger: &Logger,
601599
adapter_tx: mpsc::Sender<adapter::Message>,
602-
config: Config,
603-
logger: Logger,
604-
}
605-
606-
impl Server {
607-
pub fn new(
608-
adapter_tx: mpsc::Sender<adapter::Message>,
609-
config: Config,
610-
logger: Logger,
611-
) -> Self {
612-
Server {
613-
adapter_tx,
614-
config,
615-
logger,
616-
}
617-
}
618-
619-
pub async fn run(&self, shutdown_rx: broadcast::Receiver<()>) {
620-
if let Err(err) = self.serve(shutdown_rx).await {
621-
error!(self.logger, "{}", err);
622-
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
623-
}
624-
}
625-
626-
async fn serve(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
627-
let adapter_tx = self.adapter_tx.clone();
628-
let config = self.config.clone();
629-
let with_logger = WithLogger {
630-
logger: self.logger.clone(),
631-
};
600+
mut shutdown_rx: broadcast::Receiver<()>,
601+
) -> Result<()> {
602+
let adapter_tx = adapter_tx.clone();
603+
let config = config.clone();
604+
let with_logger = WithLogger {
605+
logger: logger.clone(),
606+
};
632607

633-
let index = warp::path::end()
608+
let index = {
609+
let config = config.clone();
610+
warp::path::end()
634611
.and(warp::ws())
635612
.and(warp::any().map(move || adapter_tx.clone()))
636613
.and(warp::any().map(move || with_logger.clone()))
@@ -654,19 +631,19 @@ pub mod rpc {
654631
.await
655632
})
656633
},
657-
);
634+
)
635+
};
658636

659-
let (_, serve) = warp::serve(index).bind_with_graceful_shutdown(
660-
self.config.listen_address.as_str().parse::<SocketAddr>()?,
661-
async move {
662-
let _ = shutdown_rx.recv().await;
663-
},
664-
);
637+
let (_, serve) = warp::serve(index).bind_with_graceful_shutdown(
638+
config.listen_address.as_str().parse::<SocketAddr>()?,
639+
async move {
640+
let _ = shutdown_rx.recv().await;
641+
},
642+
);
665643

666-
info!(self.logger, "starting api server"; "listen address" => self.config.listen_address.clone());
644+
info!(logger, "starting api server"; "listen address" => config.listen_address.clone());
667645

668-
tokio::task::spawn(serve).await.map_err(|e| e.into())
669-
}
646+
tokio::task::spawn(serve).await.map_err(|e| e.into())
670647
}
671648

672649
#[cfg(test)]
@@ -685,7 +662,6 @@ pub mod rpc {
685662
SubscriptionID,
686663
},
687664
Config,
688-
Server,
689665
},
690666
crate::agent::pythd::{
691667
adapter,
@@ -819,10 +795,7 @@ pub mod rpc {
819795
listen_address: format!("127.0.0.1:{:}", listen_port),
820796
..Default::default()
821797
};
822-
let server = Server::new(adapter_tx, config, logger);
823-
let jh = tokio::spawn(async move {
824-
server.run(shutdown_rx).await;
825-
});
798+
let jh = tokio::spawn(super::run(config, logger, adapter_tx, shutdown_rx));
826799
let test_server = TestServer { shutdown_tx, jh };
827800

828801
// Create a test client to interact with the server

0 commit comments

Comments
 (0)