-
Notifications
You must be signed in to change notification settings - Fork 261
[KratosBio] - Adding Windkessel feature #13448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,138 @@ | ||||||||||
import math | ||||||||||
import KratosMultiphysics | ||||||||||
from KratosMultiphysics.assign_scalar_variable_process import AssignScalarVariableProcess | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think you don't need this. |
||||||||||
|
||||||||||
import KratosMultiphysics.FluidDynamicsApplication as KratosFluid | ||||||||||
|
||||||||||
# Import applications | ||||||||||
import KratosMultiphysics.FluidDynamicsBiomedicalApplication as KratosBio | ||||||||||
|
||||||||||
|
||||||||||
def Factory(settings, Model): | ||||||||||
if(type(settings) != KratosMultiphysics.Parameters): | ||||||||||
raise Exception("expected input shall be a Parameters object, encapsulating a json string") | ||||||||||
return ApplyWindkesselOutletProcess(Model, settings["Parameters"]) | ||||||||||
|
||||||||||
|
||||||||||
class ApplyWindkesselOutletProcess(KratosMultiphysics.Process): | ||||||||||
def __init__(self, Model, settings ): | ||||||||||
|
||||||||||
KratosMultiphysics.Process.__init__(self) | ||||||||||
|
||||||||||
default_settings = KratosMultiphysics.Parameters(""" | ||||||||||
{ | ||||||||||
"mesh_id" : 0, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This is no longer needed. |
||||||||||
"model_part_name" : "", | ||||||||||
"variable_name" : "PRESSURE", | ||||||||||
"constrained" : true, | ||||||||||
"value" : 0.0, | ||||||||||
"interval" : [0.0,"End"], | ||||||||||
"resistance1" : 0.0, | ||||||||||
"resistance2" : 0.0, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
To follow our standard. Besides, if these are not customary ones in de literature (IDK), I'd try to use a more descriptive name for these two values to ease the use. |
||||||||||
"compliance" : 0.0, | ||||||||||
"venous_pressure" : 0.0, | ||||||||||
"initial_pressure" : 0.0, | ||||||||||
"pressure_in_mmHg" : true, | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead, I'd suggest a |
||||||||||
"echo_level" : 0 | ||||||||||
} | ||||||||||
""") | ||||||||||
|
||||||||||
# Trick: allows "value" to be a double, a string or a table value (otherwise the ValidateAndAssignDefaults might fail) | ||||||||||
if(settings.Has("value")): | ||||||||||
if(settings["value"].IsString()): | ||||||||||
default_settings["value"].SetString("0.0") | ||||||||||
elif settings["value"].IsNumber(): | ||||||||||
default_settings["value"].SetDouble(0.0) | ||||||||||
else: | ||||||||||
err_msg = "Provided settings have no 'value'. This needs to be provided." | ||||||||||
raise Exception(err_msg) | ||||||||||
|
||||||||||
settings.ValidateAndAssignDefaults(default_settings) | ||||||||||
|
||||||||||
# Set a Kratos parameters suitable for the core processes to set the PRESSURE | ||||||||||
pres_settings = settings.Clone() | ||||||||||
pres_settings.RemoveValue("resistance1") | ||||||||||
pres_settings.RemoveValue("resistance2") | ||||||||||
pres_settings.RemoveValue("compliance") | ||||||||||
pres_settings.RemoveValue("initial_pressure") | ||||||||||
pres_settings.RemoveValue("venous_pressure") | ||||||||||
pres_settings.RemoveValue("echo_level") | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which is the purpose of |
||||||||||
|
||||||||||
# Create a copy of the PRESSURE settings to set the EXTERNAL_PRESSURE | ||||||||||
ext_pres_settings = pres_settings.Clone() | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which is the purpose of |
||||||||||
ext_pres_settings["constrained"].SetBool(False) | ||||||||||
ext_pres_settings["variable_name"].SetString("EXTERNAL_PRESSURE") | ||||||||||
|
||||||||||
# Check the core processes input data | ||||||||||
if (pres_settings["model_part_name"].GetString() == ""): | ||||||||||
raise Exception("Empty outlet pressure model part name. Set a valid model part name.") | ||||||||||
elif (ext_pres_settings["model_part_name"].GetString() == ""): | ||||||||||
raise Exception("Empty outlet external pressure model part name. Set a valid model part name.") | ||||||||||
elif (pres_settings["variable_name"].GetString() != "PRESSURE"): | ||||||||||
raise Exception("Outlet pressure settings variable_name is not PRESSURE.") | ||||||||||
elif (ext_pres_settings["variable_name"].GetString() != "EXTERNAL_PRESSURE"): | ||||||||||
raise Exception("Outlet external pressure settings variable_name is not EXTERNAL_PRESSURE.") | ||||||||||
elif (pres_settings["value"].IsString()): | ||||||||||
if (pres_settings["value"].GetString == ""): | ||||||||||
raise Exception("Outlet pressure function sting is empty.") | ||||||||||
elif (ext_pres_settings["value"].IsString()): | ||||||||||
if (ext_pres_settings["value"].GetString == ""): | ||||||||||
raise Exception("Outlet external pressure function sting is empty.") | ||||||||||
|
||||||||||
self.R1 = settings["resistance1"].GetDouble() | ||||||||||
self.R2 = settings["resistance2"].GetDouble() | ||||||||||
self.C = settings["compliance"].GetDouble() | ||||||||||
p0_mmHg = settings["initial_pressure"].GetDouble() | ||||||||||
pv_mmHg = settings["venous_pressure"].GetDouble() | ||||||||||
self.echo = settings["echo_level"].GetInt() | ||||||||||
self.pressure_in_mmHg = settings["pressure_in_mmHg"].GetBool() | ||||||||||
|
||||||||||
self.conv = 13.545*9.81 # Pressure conversion factor. It is used if pressure is provided in mmHg | ||||||||||
if self.pressure_in_mmHg is True: | ||||||||||
self.pv = pv_mmHg*self.conv | ||||||||||
p0 = p0_mmHg*self.conv | ||||||||||
else: | ||||||||||
self.pv = pv_mmHg | ||||||||||
p0 = p0_mmHg # The pressure variable passed from the json file is given in Pa | ||||||||||
|
||||||||||
|
||||||||||
self.previous_q1 = 0.0 | ||||||||||
self.current_p1 = p0 # in Pa | ||||||||||
|
||||||||||
# Set the OUTLET flag in the outlet model part nodes and conditions | ||||||||||
self.outlet_model_part = Model[pres_settings["model_part_name"].GetString()] | ||||||||||
for node in self.outlet_model_part.Nodes: | ||||||||||
node.Set(KratosMultiphysics.OUTLET, True) | ||||||||||
for condition in self.outlet_model_part.Conditions: | ||||||||||
condition.Set(KratosMultiphysics.OUTLET, True) | ||||||||||
Comment on lines
+82
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use the |
||||||||||
|
||||||||||
def ExecuteInitializeSolutionStep(self): | ||||||||||
|
||||||||||
# Here the value to be provided to the outlet pressure is computed as the result of an ODE: | ||||||||||
delta_t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.DELTA_TIME] | ||||||||||
t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.TIME] | ||||||||||
|
||||||||||
self.current_q1 = KratosFluid.FluidAuxiliaryUtilities.CalculateFlowRate(self.outlet_model_part) | ||||||||||
|
||||||||||
self.modified_p1 = (1/self.C*(self.current_q1*( 1 + self.R1/self.R2) + self.R1*self.C*(self.current_q1 - self.previous_q1)/delta_t - (self.current_p1 - self.pv)/self.R2))*delta_t + self.current_p1 | ||||||||||
|
||||||||||
if self.echo > 0: | ||||||||||
print('Current flow rate', self.current_q1) | ||||||||||
if self.pressure_in_mmHg: | ||||||||||
print('Outlet new pressure:', self.modified_p1/self.conv, " mmHg") | ||||||||||
else: | ||||||||||
print('Outlet new pressure:', self.modified_p1, " Pa") | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the |
||||||||||
|
||||||||||
for node in self.outlet_model_part.Nodes: | ||||||||||
# Setting new solution on the nodes | ||||||||||
node.Fix(KratosMultiphysics.PRESSURE) | ||||||||||
node.SetSolutionStepValue(KratosMultiphysics.PRESSURE,self.modified_p1) | ||||||||||
node.SetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE,self.modified_p1) | ||||||||||
|
||||||||||
def ExecuteFinalizeSolutionStep(self): | ||||||||||
|
||||||||||
self.previous_q1 = self.current_q1 | ||||||||||
self.current_p1 = self.modified_p1 | ||||||||||
|
||||||||||
|
||||||||||
# Private methods section |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,126 @@ | ||||
import csv | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think you don't need this. |
||||
|
||||
import KratosMultiphysics | ||||
import KratosMultiphysics.KratosUnittest as KratosUnittest | ||||
import KratosMultiphysics.kratos_utilities as KratosUtilities | ||||
from KratosMultiphysics.FluidDynamicsApplication import apply_outlet_process | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think you don't need this. |
||||
from KratosMultiphysics.FluidDynamicsBiomedicalApplication import apply_windkessel_outlet_process | ||||
|
||||
class ApplyWindkesselOutletProcessTest(KratosUnittest.TestCase): | ||||
@classmethod | ||||
def __CreateModel(self): | ||||
model = KratosMultiphysics.Model() | ||||
outlet_model_part = model.CreateModelPart("OutletModelPart") | ||||
properties = outlet_model_part.CreateNewProperties(0) | ||||
outlet_model_part.ProcessInfo[KratosMultiphysics.DOMAIN_SIZE] = 2 | ||||
outlet_model_part.ProcessInfo.SetValue(KratosMultiphysics.DELTA_TIME, 0.01) | ||||
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.NORMAL) | ||||
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.VELOCITY) | ||||
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.PRESSURE) | ||||
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.EXTERNAL_PRESSURE) | ||||
outlet_model_part.CreateNewNode(1, 0.0, 1.0, 0.0) | ||||
outlet_model_part.CreateNewNode(2, 2.0, 0.0, 0.0) | ||||
outlet_model_part.CreateNewCondition("LineCondition2D2N", 1, [1, 2], properties) | ||||
for node in outlet_model_part.Nodes: | ||||
node.AddDof(KratosMultiphysics.PRESSURE) | ||||
node.AddDof(KratosMultiphysics.VELOCITY_X) | ||||
node.AddDof(KratosMultiphysics.VELOCITY_Y) | ||||
node.AddDof(KratosMultiphysics.VELOCITY_Z) | ||||
node.SetSolutionStepValue(KratosMultiphysics.VELOCITY,[0.0,0.0,0.0]) | ||||
return model | ||||
|
||||
@classmethod | ||||
def __GetBlankParameters(cls): | ||||
return KratosMultiphysics.Parameters("""{ | ||||
"name" : "Processes.KratosMultiphysics.FluidDynamicsBiomedicalApplication.apply_windkessel_outlet_process", | ||||
"Parameters" : { | ||||
"model_part_name" : "OutletModelPart" | ||||
} | ||||
}""") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which is the purpose of these? |
||||
|
||||
@classmethod | ||||
def tearDown(self): | ||||
KratosUtilities.DeleteFileIfExisting("test_outlet_table.csv") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is doing nothing. |
||||
|
||||
def testApplyWindkesselOutletProcessConversionFalse(self): | ||||
# Set up test - Check the behaviour of Windkessel update when values are written in Pa | ||||
model = self.__CreateModel() | ||||
settings = KratosMultiphysics.Parameters("""{ | ||||
"Parameters" : { | ||||
"model_part_name" : "OutletModelPart", | ||||
"mesh_id" : 0, | ||||
"variable_name" : "PRESSURE", | ||||
"constrained" : false, | ||||
"value" : 0.0, | ||||
"interval" : [0.0,"End"], | ||||
"resistance1" : 0.02, | ||||
"resistance2" : 0.98, | ||||
"compliance" : 1.02, | ||||
"venous_pressure" : 0.0, | ||||
"initial_pressure" : 1.0, | ||||
"pressure_in_mmHg" : false, | ||||
"echo_level" : 1 | ||||
} | ||||
}""") | ||||
|
||||
|
||||
# Create the outlet process | ||||
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"]) | ||||
|
||||
# Execute and check the outlet process | ||||
self.__ExecuteAndCheckOutletProcessFalse(model, outlet_process) | ||||
|
||||
def testApplyWindkesselOutletProcessConversionTrue(self): | ||||
# Set up test - Check the behaviour of Windkessel update when values are written in mmHg and converted in Pa | ||||
model = self.__CreateModel() | ||||
settings = KratosMultiphysics.Parameters("""{ | ||||
"Parameters" : { | ||||
"model_part_name" : "OutletModelPart", | ||||
"mesh_id" : 0, | ||||
"variable_name" : "PRESSURE", | ||||
"constrained" : false, | ||||
"value" : 0.0, | ||||
"interval" : [0.0,"End"], | ||||
"resistance1" : 0.02, | ||||
"resistance2" : 0.98, | ||||
"compliance" : 1.02, | ||||
"venous_pressure" : 0.0, | ||||
"initial_pressure" : 1.0, | ||||
"pressure_in_mmHg" : true, | ||||
"echo_level" : 1 | ||||
} | ||||
}""") | ||||
|
||||
|
||||
# Create the outlet process | ||||
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"]) | ||||
|
||||
# Execute and check the outlet process | ||||
self.__ExecuteAndCheckOutletProcessTrue(model, outlet_process) | ||||
|
||||
def __ExecuteAndCheckOutletProcessFalse(self, model, outlet_process): | ||||
# Execute and check ExecuteInitializeSolutionStep | ||||
outlet_process.ExecuteInitializeSolutionStep() | ||||
outlet_process.ExecuteFinalizeSolutionStep() | ||||
for node in model.GetModelPart("OutletModelPart").Nodes: | ||||
self.assertTrue(node.Is(KratosMultiphysics.OUTLET)) | ||||
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE)) | ||||
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 0.99,5) | ||||
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 0.99,5) | ||||
for condition in model.GetModelPart("OutletModelPart").Conditions: | ||||
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET)) | ||||
|
||||
def __ExecuteAndCheckOutletProcessTrue(self, model, outlet_process): | ||||
# Execute and check ExecuteInitializeSolutionStep | ||||
outlet_process.ExecuteInitializeSolutionStep() | ||||
outlet_process.ExecuteFinalizeSolutionStep() | ||||
for node in model.GetModelPart("OutletModelPart").Nodes: | ||||
self.assertTrue(node.Is(KratosMultiphysics.OUTLET)) | ||||
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE)) | ||||
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 131.5472,1) | ||||
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 131.5472,1) | ||||
for condition in model.GetModelPart("OutletModelPart").Conditions: | ||||
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET)) | ||||
|
||||
if __name__ == '__main__': | ||||
KratosUnittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you don't need this.