Skip to content

Commit 2097285

Browse files
authored
Merge pull request #493 from anmenaga/issue_491
Async resource process invocation
2 parents 6483333 + 4760b7e commit 2097285

File tree

5 files changed

+144
-72
lines changed

5 files changed

+144
-72
lines changed

dsc/tests/dsc.exit_code.tests.ps1

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
Describe 'exit code tests' {
55
It 'non-zero exit code in manifest has corresponding message' {
6-
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2>&1
7-
$result | Should -Match 'ERROR.*?[Exit code 8].*?manifest description: Placeholder from manifest for exit code 8'
6+
dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2> $TestDrive/tracing.txt
7+
"$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Placeholder from manifest for exit code 8'
88
}
99
It 'non-zero exit code not in manifest has generic message' {
10-
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2>&1
11-
$result | Should -Match 'ERROR.*?Error.*?[Exit code 1]'
10+
dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2> $TestDrive/tracing.txt
11+
"$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Exit code 1'
1212
}
1313
It 'success exit code executes without error' {
1414
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 0 }" | ConvertFrom-Json

dsc_lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ serde_yaml = { version = "0.9.3" }
1818
thiserror = "1.0"
1919
security_context_lib = { path = "../security_context_lib" }
2020
semver = "1.0"
21+
tokio = { version = "1.38.1", features = ["full"] }
2122
tracing = "0.1.37"
2223
tracing-indicatif = { version = "0.3.6" }
2324
tree-sitter = "0.22"

dsc_lib/src/dscresources/command_resource.rs

Lines changed: 123 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
use jsonschema::JSONSchema;
55
use serde_json::Value;
6-
use std::{collections::HashMap, env, io::{Read, Write}, process::{Command, Stdio}};
6+
use std::{collections::HashMap, env, process::Stdio};
77
use crate::{configure::{config_doc::ExecutionKind, {config_result::ResourceGetResult, parameters, Configurator}}, util::parse_input_to_json};
88
use crate::dscerror::DscError;
99
use super::{dscresource::get_diff, invoke_result::{ExportResult, GetResult, ResolveResult, SetResult, TestResult, ValidateResult, ResourceGetResponse, ResourceSetResponse, ResourceTestResponse, get_in_desired_state}, resource_manifest::{ArgKind, InputKind, Kind, ResourceManifest, ReturnKind, SchemaKind}};
1010
use tracing::{error, warn, info, debug, trace};
11+
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command};
1112

1213
pub const EXIT_PROCESS_TERMINATED: i32 = 0x102;
1314

@@ -553,21 +554,27 @@ pub fn invoke_resolve(resource: &ResourceManifest, cwd: &str, input: &str) -> Re
553554
Ok(result)
554555
}
555556

556-
/// Invoke a command and return the exit code, stdout, and stderr.
557+
/// Asynchronously invoke a command and return the exit code, stdout, and stderr.
557558
///
558559
/// # Arguments
559560
///
560561
/// * `executable` - The command to execute
561562
/// * `args` - Optional arguments to pass to the command
562563
/// * `input` - Optional input to pass to the command
563564
/// * `cwd` - Optional working directory to execute the command in
565+
/// * `env` - Optional environment variable mappings to add or update
566+
/// * `exit_codes` - Optional descriptions of exit codes
564567
///
565568
/// # Errors
566569
///
567570
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
568-
#[allow(clippy::implicit_hasher)]
569-
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
570-
debug!("Invoking command '{}' with args {:?}", executable, args);
571+
///
572+
async fn run_process_async(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
573+
574+
// use somewhat large initial buffer to avoid early string reallocations;
575+
// the value is based on list result of largest of built-in adapters - WMI adapter ~500KB
576+
const INITIAL_BUFFER_CAPACITY: usize = 1024*1024;
577+
571578
let mut command = Command::new(executable);
572579
if input.is_some() {
573580
command.stdin(Stdio::piped());
@@ -583,62 +590,110 @@ pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option
583590
if let Some(env) = env {
584591
command.envs(env);
585592
}
586-
587593
if executable == "dsc" && env::var("DEBUG_DSC").is_ok() {
588594
// remove this env var from child process as it will fail reading from keyboard to allow attaching
589595
command.env_remove("DEBUG_DSC");
590596
}
591597

592-
let mut child = command.spawn()?;
598+
let mut child = match command.spawn() {
599+
Ok(c) => c,
600+
Err(e) => {
601+
return Err(DscError::CommandOperation(e.to_string(), executable.to_string()))
602+
}
603+
};
604+
605+
let stdout = child.stdout.take().expect("child did not have a handle to stdout");
606+
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
607+
let mut stdout_reader = BufReader::new(stdout).lines();
608+
let mut stderr_reader = BufReader::new(stderr).lines();
609+
593610
if let Some(input) = input {
594611
trace!("Writing to command STDIN: {input}");
595-
// pipe to child stdin in a scope so that it is dropped before we wait
596-
// otherwise the pipe isn't closed and the child process waits forever
597-
let Some(mut child_stdin) = child.stdin.take() else {
598-
return Err(DscError::CommandOperation("Failed to open stdin".to_string(), executable.to_string()));
599-
};
600-
child_stdin.write_all(input.as_bytes())?;
601-
child_stdin.flush()?;
612+
let mut stdin = child.stdin.take().expect("child did not have a handle to stdin");
613+
stdin.write_all(input.as_bytes()).await.expect("could not write to stdin");
614+
drop(stdin);
602615
}
603616

604-
let Some(mut child_stdout) = child.stdout.take() else {
605-
return Err(DscError::CommandOperation("Failed to open stdout".to_string(), executable.to_string()));
617+
let Some(child_id) = child.id() else {
618+
return Err(DscError::CommandOperation("Can't get child process id".to_string(), executable.to_string()));
606619
};
607-
let mut stdout_buf = Vec::new();
608-
child_stdout.read_to_end(&mut stdout_buf)?;
609620

610-
let Some(mut child_stderr) = child.stderr.take() else {
611-
return Err(DscError::CommandOperation("Failed to open stderr".to_string(), executable.to_string()));
612-
};
613-
let mut stderr_buf = Vec::new();
614-
child_stderr.read_to_end(&mut stderr_buf)?;
615-
616-
let exit_status = child.wait()?;
617-
let exit_code = exit_status.code().unwrap_or(EXIT_PROCESS_TERMINATED);
618-
let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
619-
let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
620-
if !stdout.is_empty() {
621-
trace!("STDOUT returned: {}", &stdout);
622-
}
623-
let cleaned_stderr = if stderr.is_empty() {
624-
stderr
625-
} else {
626-
trace!("STDERR returned data to be traced");
627-
log_resource_traces(executable, &child.id(), &stderr);
628-
// TODO: remove logged traces from STDERR
629-
String::new()
630-
};
621+
let child_task = tokio::spawn(async move {
622+
child.wait().await
623+
});
631624

632-
if exit_code != 0 {
633-
if let Some(exit_codes) = exit_codes {
634-
if let Some(error_message) = exit_codes.get(&exit_code) {
635-
return Err(DscError::CommandExitFromManifest(executable.to_string(), exit_code, error_message.to_string()));
625+
let stdout_task = tokio::spawn(async move {
626+
let mut stdout_result = String::with_capacity(INITIAL_BUFFER_CAPACITY);
627+
while let Ok(Some(line)) = stdout_reader.next_line().await {
628+
stdout_result.push_str(&line);
629+
stdout_result.push('\n');
630+
}
631+
stdout_result
632+
});
633+
634+
let stderr_task = tokio::spawn(async move {
635+
let mut filtered_stderr = String::with_capacity(INITIAL_BUFFER_CAPACITY);
636+
while let Ok(Some(stderr_line)) = stderr_reader.next_line().await {
637+
let filtered_stderr_line = log_stderr_line(&child_id, &stderr_line);
638+
if !filtered_stderr_line.is_empty() {
639+
filtered_stderr.push_str(filtered_stderr_line);
640+
filtered_stderr.push('\n');
641+
}
642+
}
643+
filtered_stderr
644+
});
645+
646+
let exit_code = child_task.await.unwrap()?.code();
647+
let stdout_result = stdout_task.await.unwrap();
648+
let stderr_result = stderr_task.await.unwrap();
649+
650+
if let Some(code) = exit_code {
651+
debug!("Process '{executable}' id {child_id} exited with code {code}");
652+
653+
if code != 0 {
654+
if let Some(exit_codes) = exit_codes {
655+
if let Some(error_message) = exit_codes.get(&code) {
656+
return Err(DscError::CommandExitFromManifest(executable.to_string(), code, error_message.to_string()));
657+
}
636658
}
659+
return Err(DscError::Command(executable.to_string(), code, stderr_result));
637660
}
638-
return Err(DscError::Command(executable.to_string(), exit_code, cleaned_stderr));
661+
662+
Ok((code, stdout_result, stderr_result))
663+
} else {
664+
debug!("Process '{executable}' id {child_id} terminated by signal");
665+
Err(DscError::CommandOperation("Process terminated by signal".to_string(), executable.to_string()))
639666
}
667+
}
668+
669+
/// Invoke a command and return the exit code, stdout, and stderr.
670+
///
671+
/// # Arguments
672+
///
673+
/// * `executable` - The command to execute
674+
/// * `args` - Optional arguments to pass to the command
675+
/// * `input` - Optional input to pass to the command
676+
/// * `cwd` - Optional working directory to execute the command in
677+
/// * `env` - Optional environment variable mappings to add or update
678+
/// * `exit_codes` - Optional descriptions of exit codes
679+
///
680+
/// # Errors
681+
///
682+
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
683+
///
684+
/// # Panics
685+
///
686+
/// Will panic if tokio runtime can't be created.
687+
///
688+
#[allow(clippy::implicit_hasher)]
689+
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
690+
debug!("Invoking command '{}' with args {:?}", executable, args);
640691

641-
Ok((exit_code, stdout, cleaned_stderr))
692+
tokio::runtime::Builder::new_multi_thread()
693+
.enable_all()
694+
.build()
695+
.unwrap()
696+
.block_on(run_process_async(executable, args, input, cwd, env, exit_codes))
642697
}
643698

644699
fn process_args(args: &Option<Vec<ArgKind>>, value: &str) -> Option<Vec<String>> {
@@ -784,30 +839,31 @@ fn json_to_hashmap(json: &str) -> Result<HashMap<String, String>, DscError> {
784839
///
785840
/// * `process_name` - The name of the process
786841
/// * `process_id` - The ID of the process
787-
/// * `stderr` - The stderr output from the process
788-
pub fn log_resource_traces(process_name: &str, process_id: &u32, stderr: &str)
842+
/// * `trace_line` - The stderr line from the process
843+
pub fn log_stderr_line<'a>(process_id: &u32, trace_line: &'a str) -> &'a str
789844
{
790-
if !stderr.is_empty()
845+
if !trace_line.is_empty()
791846
{
792-
for trace_line in stderr.lines() {
793-
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
794-
if let Some(msg) = json_obj.get("Error") {
795-
error!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
796-
} else if let Some(msg) = json_obj.get("Warning") {
797-
warn!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
798-
} else if let Some(msg) = json_obj.get("Info") {
799-
info!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
800-
} else if let Some(msg) = json_obj.get("Debug") {
801-
debug!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
802-
} else if let Some(msg) = json_obj.get("Trace") {
803-
trace!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
804-
} else {
805-
// TODO: deserialize tracing JSON to have better presentation
806-
trace!("Process '{process_name}' id {process_id} : {trace_line}");
807-
};
847+
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
848+
if let Some(msg) = json_obj.get("Error") {
849+
error!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
850+
} else if let Some(msg) = json_obj.get("Warning") {
851+
warn!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
852+
} else if let Some(msg) = json_obj.get("Info") {
853+
info!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
854+
} else if let Some(msg) = json_obj.get("Debug") {
855+
debug!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
856+
} else if let Some(msg) = json_obj.get("Trace") {
857+
trace!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
808858
} else {
809-
trace!("Process '{process_name}' id {process_id} : {trace_line}");
810-
}
859+
// the line is a valid json, but not one of standard trace lines - return it as filtered stderr_line
860+
return trace_line;
861+
};
862+
} else {
863+
// the line is not a valid json - return it as filtered stderr_line
864+
return trace_line;
811865
}
812-
}
866+
};
867+
868+
""
813869
}

powershell-adapter/Tests/TestClassResource/0.0.1/TestClassResource.psm1

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ class TestClassResource : BaseTestClass
5959
static [TestClassResource[]] Export()
6060
{
6161
$resultList = [List[TestClassResource]]::new()
62-
1..5 | %{
62+
$resultCount = 5
63+
if ($env:TestClassResourceResultCount) {
64+
$resultCount = $env:TestClassResourceResultCount
65+
}
66+
1..$resultCount | %{
6367
$obj = New-Object TestClassResource
6468
$obj.Name = "Object$_"
6569
$obj.Prop1 = "Property of object$_"

powershell-adapter/Tests/powershellgroup.resource.tests.ps1

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,15 @@ Describe 'PowerShell adapter resource tests' {
241241
$env:PATH = $oldPath
242242
}
243243
}
244+
245+
It 'Dsc can process large resource output' -Tag z1{
246+
$env:TestClassResourceResultCount = 5000 # with sync resource invocations this was not possible
247+
248+
$r = dsc resource export -r TestClassResource/TestClassResource
249+
$LASTEXITCODE | Should -Be 0
250+
$res = $r | ConvertFrom-Json
251+
$res.resources[0].properties.result.count | Should -Be 5000
252+
253+
$env:TestClassResourceResultCount = $null
254+
}
244255
}

0 commit comments

Comments
 (0)