diff --git a/applications/FluidDynamicsBiomedicalApplication/python_scripts/apply_windkessel_outlet_process.py b/applications/FluidDynamicsBiomedicalApplication/python_scripts/apply_windkessel_outlet_process.py new file mode 100644 index 000000000000..b34d0b3adf1d --- /dev/null +++ b/applications/FluidDynamicsBiomedicalApplication/python_scripts/apply_windkessel_outlet_process.py @@ -0,0 +1,118 @@ +import KratosMultiphysics + +import KratosMultiphysics.FluidDynamicsApplication as KratosFluid + +# Import applications +import KratosMultiphysics.FluidDynamicsBiomedicalApplication as KratosBio + + +def Factory(settings, Model): + if(type(settings) != KratosMultiphysics.Parameters): + raise Exception("expected input shall be a Parameters object, encapsulating a json string") + return ApplyWindkesselOutletProcess(Model, settings["Parameters"]) + + +class ApplyWindkesselOutletProcess(KratosMultiphysics.Process): + def __init__(self, Model, settings ): + + KratosMultiphysics.Process.__init__(self) + + default_settings = KratosMultiphysics.Parameters(""" + { + "model_part_name" : "", + "variable_name" : "PRESSURE", + "constrained" : true, + "value" : 0.0, + "interval" : [0.0,"End"], + "characteristic_resistance" : 0.0, + "peripheral_resistance" : 0.0, + "arterial_compliance" : 0.0, + "venous_pressure" : 0.0, + "initial_pressure" : 0.0, + "pressure_unit" : "mmHg", + "echo_level" : 0 + } + """) + + # Trick: allows "value" to be a double, a string or a table value (otherwise the ValidateAndAssignDefaults might fail) + if(settings.Has("value")): + if(settings["value"].IsString()): + default_settings["value"].SetString("0.0") + elif settings["value"].IsNumber(): + default_settings["value"].SetDouble(0.0) + else: + err_msg = "Provided settings have no 'value'. This needs to be provided." + raise Exception(err_msg) + + settings.ValidateAndAssignDefaults(settings) + + # Check the core processes input data + if (settings["model_part_name"].GetString() == ""): + raise Exception("Empty outlet pressure model part name. Set a valid model part name.") + elif (settings["variable_name"].GetString() != "PRESSURE"): + raise Exception("Outlet pressure settings variable_name is not PRESSURE.") + elif (settings["value"].IsString()): + if (settings["value"].GetString == ""): + raise Exception("Outlet pressure function sting is empty.") + raise Exception("Outlet external pressure function sting is empty.") + + self.R1 = settings["characteristic_resistance"].GetDouble() + self.R2 = settings["peripheral_resistance"].GetDouble() + self.C = settings["arterial_compliance"].GetDouble() + p0_mmHg = settings["initial_pressure"].GetDouble() + pv_mmHg = settings["venous_pressure"].GetDouble() + self.echo = settings["echo_level"].GetInt() + self.pressure_unit = settings["pressure_unit"].GetString() + + self.conv = 13.545*9.81 # Pressure conversion factor. It is used if pressure is provided in mmHg + if self.pressure_unit == "mmHg" : + self.pv = pv_mmHg*self.conv + p0 = p0_mmHg*self.conv + elif self.pressure_unit == "Pa" : + self.pv = pv_mmHg + p0 = p0_mmHg # The pressure variable passed from the json file is given in Pa + else : + raise Exception("Pressure unit measyre can be given in mmHg or in Pa") + + self.previous_q1 = 0.0 + self.current_p1 = p0 # in Pa + + # Set the OUTLET flag in the outlet model part nodes and conditions + self.outlet_model_part = Model[settings["model_part_name"].GetString()] + for node in self.outlet_model_part.Nodes: + node.Set(KratosMultiphysics.OUTLET, True) + for condition in self.outlet_model_part.Conditions: + condition.Set(KratosMultiphysics.OUTLET, True) + + def ExecuteInitializeSolutionStep(self): + + # Here the value to be provided to the outlet pressure is computed as the result of an ODE: + delta_t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.DELTA_TIME] + t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.TIME] + + self.current_q1 = KratosFluid.FluidAuxiliaryUtilities.CalculateFlowRate(self.outlet_model_part) + + self.modified_p1 = (1/self.C*(self.current_q1*( 1 + self.R1/self.R2) + self.R1*self.C*(self.current_q1 - self.previous_q1)/delta_t - (self.current_p1 - self.pv)/self.R2))*delta_t + self.current_p1 + + if self.echo > 0: + KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Current flow rate: {self.current_q1}") + if self.pressure_unit == "mmHg" : + KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Outlet new pressure: {self.modified_p1/self.conv} mmHg") + elif self.pressure_unit == "Pa" : + KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Outlet new pressure: {self.modified_p1} Pa") + else : + raise Exception("Pressure unit measure can be given in mmHg or in Pa") + + for node in self.outlet_model_part.Nodes: + # Setting new solution on the nodes + node.Fix(KratosMultiphysics.PRESSURE) + node.SetSolutionStepValue(KratosMultiphysics.PRESSURE,self.modified_p1) + node.SetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE,self.modified_p1) + + def ExecuteFinalizeSolutionStep(self): + + self.previous_q1 = self.current_q1 + self.current_p1 = self.modified_p1 + + + # Private methods section diff --git a/applications/FluidDynamicsBiomedicalApplication/tests/apply_windkessel_outlet_process_test.py b/applications/FluidDynamicsBiomedicalApplication/tests/apply_windkessel_outlet_process_test.py new file mode 100644 index 000000000000..35f93e4cdd42 --- /dev/null +++ b/applications/FluidDynamicsBiomedicalApplication/tests/apply_windkessel_outlet_process_test.py @@ -0,0 +1,108 @@ +import KratosMultiphysics +import KratosMultiphysics.KratosUnittest as KratosUnittest +import KratosMultiphysics.kratos_utilities as KratosUtilities +from KratosMultiphysics.FluidDynamicsBiomedicalApplication import apply_windkessel_outlet_process + +class ApplyWindkesselOutletProcessTest(KratosUnittest.TestCase): + @classmethod + def __CreateModel(self): + model = KratosMultiphysics.Model() + outlet_model_part = model.CreateModelPart("OutletModelPart") + properties = outlet_model_part.CreateNewProperties(0) + outlet_model_part.ProcessInfo[KratosMultiphysics.DOMAIN_SIZE] = 2 + outlet_model_part.ProcessInfo.SetValue(KratosMultiphysics.DELTA_TIME, 0.01) + outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.NORMAL) + outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.VELOCITY) + outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.PRESSURE) + outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.EXTERNAL_PRESSURE) + outlet_model_part.CreateNewNode(1, 0.0, 1.0, 0.0) + outlet_model_part.CreateNewNode(2, 2.0, 0.0, 0.0) + outlet_model_part.CreateNewCondition("LineCondition2D2N", 1, [1, 2], properties) + for node in outlet_model_part.Nodes: + node.AddDof(KratosMultiphysics.PRESSURE) + node.AddDof(KratosMultiphysics.VELOCITY_X) + node.AddDof(KratosMultiphysics.VELOCITY_Y) + node.AddDof(KratosMultiphysics.VELOCITY_Z) + node.SetSolutionStepValue(KratosMultiphysics.VELOCITY,[0.0,0.0,0.0]) + return model + + def testApplyWindkesselOutletProcessConversionFalse(self): + # Set up test - Check the behaviour of Windkessel update when values are written in Pa + model = self.__CreateModel() + settings = KratosMultiphysics.Parameters("""{ + "Parameters" : { + "model_part_name" : "OutletModelPart", + "variable_name" : "PRESSURE", + "constrained" : false, + "value" : 0.0, + "interval" : [0.0,"End"], + "characteristic_resistance" : 0.02, + "peripheral_resistance" : 0.98, + "arterial_compliance" : 1.02, + "venous_pressure" : 0.0, + "initial_pressure" : 1.0, + "pressure_unit" : "Pa", + "echo_level" : 1 + } + }""") + + + # Create the outlet process + outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"]) + + # Execute and check the outlet process + self.__ExecuteAndCheckOutletProcessFalse(model, outlet_process) + + def testApplyWindkesselOutletProcessConversionTrue(self): + # Set up test - Check the behaviour of Windkessel update when values are written in mmHg and converted in Pa + model = self.__CreateModel() + settings = KratosMultiphysics.Parameters("""{ + "Parameters" : { + "model_part_name" : "OutletModelPart", + "variable_name" : "PRESSURE", + "constrained" : false, + "value" : 0.0, + "interval" : [0.0,"End"], + "characteristic_resistance" : 0.02, + "peripheral_resistance" : 0.98, + "arterial_compliance" : 1.02, + "venous_pressure" : 0.0, + "initial_pressure" : 1.0, + "pressure_unit" : "mmHg", + "echo_level" : 1 + } + }""") + + + # Create the outlet process + outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"]) + + # Execute and check the outlet process + self.__ExecuteAndCheckOutletProcessTrue(model, outlet_process) + + def __ExecuteAndCheckOutletProcessFalse(self, model, outlet_process): + # Execute and check ExecuteInitializeSolutionStep + outlet_process.ExecuteInitializeSolutionStep() + outlet_process.ExecuteFinalizeSolutionStep() + for node in model.GetModelPart("OutletModelPart").Nodes: + self.assertTrue(node.Is(KratosMultiphysics.OUTLET)) + self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE)) + self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 0.99,5) + self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 0.99,5) + for condition in model.GetModelPart("OutletModelPart").Conditions: + self.assertTrue(condition.Is(KratosMultiphysics.OUTLET)) + + def __ExecuteAndCheckOutletProcessTrue(self, model, outlet_process): + # Execute and check ExecuteInitializeSolutionStep + outlet_process.ExecuteInitializeSolutionStep() + outlet_process.ExecuteFinalizeSolutionStep() + for node in model.GetModelPart("OutletModelPart").Nodes: + self.assertTrue(node.Is(KratosMultiphysics.OUTLET)) + self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE)) + self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 131.5472,1) + self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 131.5472,1) + for condition in model.GetModelPart("OutletModelPart").Conditions: + self.assertTrue(condition.Is(KratosMultiphysics.OUTLET)) + +if __name__ == '__main__': + KratosUnittest.main() diff --git a/applications/FluidDynamicsBiomedicalApplication/tests/test_FluidDynamicsBiomedicalApplication.py b/applications/FluidDynamicsBiomedicalApplication/tests/test_FluidDynamicsBiomedicalApplication.py index 40488b1f5c51..104fdf29df64 100644 --- a/applications/FluidDynamicsBiomedicalApplication/tests/test_FluidDynamicsBiomedicalApplication.py +++ b/applications/FluidDynamicsBiomedicalApplication/tests/test_FluidDynamicsBiomedicalApplication.py @@ -11,6 +11,7 @@ # Import the tests o test_classes to create the suites from apply_parabolic_inlet_process_test import ApplyParabolicInletProcessTest +from apply_windkessel_outlet_process_test import ApplyWindkesselOutletProcessTest def AssembleTestSuites(): ''' Populates the test suites to run. @@ -29,6 +30,7 @@ def AssembleTestSuites(): # Create a test suite with the selected tests (Small tests): smallSuite = suites['small'] smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyParabolicInletProcessTest])) + smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyWindkesselOutletProcessTest])) # Create a test suite with the selected tests plus all small tests nightSuite = suites['nightly']