Skip to content

Commit 74ac0b2

Browse files
committed
feat: e2e update devices on failed test
1 parent daa9664 commit 74ac0b2

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

src/e2e/main.rs

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ mod structs;
2222
pub static UNIX_SENDER: OnceCell<tokio::sync::mpsc::UnboundedSender<UnixResponse>> =
2323
OnceCell::const_new();
2424

25+
pub static SEND_DEVICES: OnceCell<tokio::sync::mpsc::UnboundedSender<()>> = OnceCell::const_new();
26+
2527
#[tokio::main]
2628
async fn main() -> Result<()> {
2729
_ = dotenvy::dotenv();
@@ -38,19 +40,22 @@ async fn main() -> Result<()> {
3840
let tests_root: TestsRoot = serde_json::from_slice(&tests_root)?;
3941

4042
let mut state = State {
41-
devices: vec![],
43+
devices: Arc::new(RwLock::new(vec![])),
4244
senders: Arc::new(RwLock::new(HashMap::new())),
4345
tests: Arc::new(RwLock::new(tests_root)),
4446
};
4547

4648
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
4749
UNIX_SENDER.set(tx)?;
4850

51+
let (tx, mut send_dev_rx) = tokio::sync::mpsc::unbounded_channel();
52+
SEND_DEVICES.set(tx)?;
53+
4954
let listener = UnixListener::bind(&socket_path)?;
5055
tracing::info!("Unix listener started on path {socket_path}!");
5156
loop {
5257
let (mut stream, _) = listener.accept().await?;
53-
let res = handle_stream(&mut stream, &mut state, &mut rx).await;
58+
let res = handle_stream(&mut stream, &mut state, &mut rx, &mut send_dev_rx).await;
5459
if let Err(e) = res {
5560
tracing::error!("Handle stream error: {e:?}");
5661
}
@@ -61,8 +66,11 @@ async fn handle_stream(
6166
stream: &mut UnixStream,
6267
state: &mut State,
6368
rx: &mut UnboundedReceiver<UnixResponse>,
69+
send_dev_rx: &mut UnboundedReceiver<()>,
6470
) -> Result<()> {
65-
send_status_resp(stream, &state.devices).await?;
71+
{
72+
send_status_resp(stream, &state.devices.read().await.to_vec()).await?;
73+
}
6674

6775
let mut buf = Vec::with_capacity(512);
6876
loop {
@@ -73,12 +81,17 @@ async fn handle_stream(
7381

7482
match packet.data {
7583
UnixRequestData::RequestToConnectDevice { esp_id, .. } => {
76-
state.devices.push(esp_id);
77-
send_status_resp(stream, &state.devices).await?;
84+
{
85+
let mut devices = state.devices.write().await;
86+
devices.push(esp_id);
87+
88+
send_status_resp(stream, &devices.to_vec()).await?;
89+
}
90+
7891
send_resp(stream, UnixResponseData::Empty, packet.tag, false).await?;
7992

8093
let tests = state.tests.read().await.clone();
81-
new_test_sender(&esp_id, state.senders.clone(), tests).await?;
94+
new_test_sender(&esp_id, state.devices.clone(), state.senders.clone(), tests).await?;
8295
}
8396
UnixRequestData::PersonInfo { ref card_id } => {
8497
let card_id: u64 = card_id.parse()?;
@@ -134,17 +147,25 @@ async fn handle_stream(
134147
Some(recv) = rx.recv() => {
135148
send_raw_resp(stream, recv).await?;
136149
}
150+
_ = send_dev_rx.recv() => {
151+
send_status_resp(stream, &state.devices.read().await.to_vec()).await?;
152+
}
137153
}
138154
}
139155
}
140156

141-
async fn new_test_sender(esp_id: &u32, senders: SharedSenders, tests: TestsRoot) -> Result<()> {
157+
async fn new_test_sender(
158+
esp_id: &u32,
159+
devices: Arc<RwLock<Vec<u32>>>,
160+
senders: SharedSenders,
161+
tests: TestsRoot,
162+
) -> Result<()> {
142163
let esp_id = *esp_id;
143164

144165
tokio::task::spawn(async move {
145166
tracing::info!("Starting new test sender for esp with id: {esp_id}");
146167

147-
let res = test_sender(esp_id, senders, tests).await;
168+
let res = test_sender(esp_id, devices, senders, tests).await;
148169
if let Err(e) = res {
149170
tracing::error!("Test sender error: {e:?}");
150171
}
@@ -153,7 +174,12 @@ async fn new_test_sender(esp_id: &u32, senders: SharedSenders, tests: TestsRoot)
153174
Ok(())
154175
}
155176

156-
async fn test_sender(esp_id: u32, senders: SharedSenders, tests: TestsRoot) -> Result<()> {
177+
async fn test_sender(
178+
esp_id: u32,
179+
devices: Arc<RwLock<Vec<u32>>>,
180+
senders: SharedSenders,
181+
tests: TestsRoot,
182+
) -> Result<()> {
157183
let unix_tx = UNIX_SENDER.get().expect("UNIX_SENDER not set!");
158184
let mut rx = spawn_new_sender(&senders, esp_id).await?;
159185

@@ -174,6 +200,21 @@ async fn test_sender(esp_id: u32, senders: SharedSenders, tests: TestsRoot) -> R
174200
let res = run_test(&unix_tx, &mut rx, esp_id, &tests, next_idx, &mut last_time).await;
175201
if let Err(e) = res {
176202
tracing::error!("Run test error: {e:?}");
203+
{
204+
let mut dev = devices.write().await;
205+
let index = dev
206+
.iter()
207+
.enumerate()
208+
.find(|(_, e)| **e == esp_id)
209+
.map(|(i, _)| i);
210+
211+
if let Some(index) = index {
212+
dev.remove(index);
213+
}
214+
215+
_ = SEND_DEVICES.get().unwrap().send(());
216+
}
217+
177218
break Ok(());
178219
}
179220

src/e2e/structs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use unix_utils::request::UnixRequestData;
55

66
pub type SharedSenders = Arc<RwLock<HashMap<u32, UnboundedSender<UnixRequestData>>>>;
77
pub struct State {
8-
pub devices: Vec<u32>,
8+
pub devices: Arc<RwLock<Vec<u32>>>,
99
pub senders: SharedSenders,
1010
pub tests: Arc<RwLock<TestsRoot>>,
1111
}

0 commit comments

Comments
 (0)