Skip to content

[KratosBio] - Adding Windkessel feature #13448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import KratosMultiphysics

import KratosMultiphysics.FluidDynamicsApplication as KratosFluid

# Import applications
import KratosMultiphysics.FluidDynamicsBiomedicalApplication as KratosBio


def Factory(settings, Model):
if(type(settings) != KratosMultiphysics.Parameters):
raise Exception("expected input shall be a Parameters object, encapsulating a json string")
return ApplyWindkesselOutletProcess(Model, settings["Parameters"])


class ApplyWindkesselOutletProcess(KratosMultiphysics.Process):
def __init__(self, Model, settings ):

KratosMultiphysics.Process.__init__(self)

default_settings = KratosMultiphysics.Parameters("""
{
"model_part_name" : "",
"variable_name" : "PRESSURE",
"constrained" : true,
"value" : 0.0,
"interval" : [0.0,"End"],
"characteristic_resistance" : 0.0,
"peripheral_resistance" : 0.0,
"arterial_compliance" : 0.0,
"venous_pressure" : 0.0,
"initial_pressure" : 0.0,
"pressure_unit" : "mmHg",
"echo_level" : 0
}
""")

# Trick: allows "value" to be a double, a string or a table value (otherwise the ValidateAndAssignDefaults might fail)
if(settings.Has("value")):
if(settings["value"].IsString()):
default_settings["value"].SetString("0.0")
elif settings["value"].IsNumber():
default_settings["value"].SetDouble(0.0)
else:
err_msg = "Provided settings have no 'value'. This needs to be provided."
raise Exception(err_msg)

settings.ValidateAndAssignDefaults(settings)

# Check the core processes input data
if (settings["model_part_name"].GetString() == ""):
raise Exception("Empty outlet pressure model part name. Set a valid model part name.")
elif (settings["variable_name"].GetString() != "PRESSURE"):
raise Exception("Outlet pressure settings variable_name is not PRESSURE.")
elif (settings["value"].IsString()):
if (settings["value"].GetString == ""):
raise Exception("Outlet pressure function sting is empty.")
raise Exception("Outlet external pressure function sting is empty.")

self.R1 = settings["characteristic_resistance"].GetDouble()
self.R2 = settings["peripheral_resistance"].GetDouble()
self.C = settings["arterial_compliance"].GetDouble()
p0_mmHg = settings["initial_pressure"].GetDouble()
pv_mmHg = settings["venous_pressure"].GetDouble()
self.echo = settings["echo_level"].GetInt()
self.pressure_unit = settings["pressure_unit"].GetString()

self.conv = 13.545*9.81 # Pressure conversion factor. It is used if pressure is provided in mmHg
if self.pressure_unit == "mmHg" :
self.pv = pv_mmHg*self.conv
p0 = p0_mmHg*self.conv
elif self.pressure_unit == "Pa" :
self.pv = pv_mmHg
p0 = p0_mmHg # The pressure variable passed from the json file is given in Pa
else :
raise Exception("Pressure unit measyre can be given in mmHg or in Pa")

self.previous_q1 = 0.0
self.current_p1 = p0 # in Pa

# Set the OUTLET flag in the outlet model part nodes and conditions
self.outlet_model_part = Model[settings["model_part_name"].GetString()]
for node in self.outlet_model_part.Nodes:
node.Set(KratosMultiphysics.OUTLET, True)
for condition in self.outlet_model_part.Conditions:
condition.Set(KratosMultiphysics.OUTLET, True)
Comment on lines +82 to +85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the SetFlag method from the VariableUtils (it is exported to Python) so these two loops are done in parallel.


def ExecuteInitializeSolutionStep(self):

# Here the value to be provided to the outlet pressure is computed as the result of an ODE:
delta_t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.DELTA_TIME]
t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.TIME]

self.current_q1 = KratosFluid.FluidAuxiliaryUtilities.CalculateFlowRate(self.outlet_model_part)

self.modified_p1 = (1/self.C*(self.current_q1*( 1 + self.R1/self.R2) + self.R1*self.C*(self.current_q1 - self.previous_q1)/delta_t - (self.current_p1 - self.pv)/self.R2))*delta_t + self.current_p1

if self.echo > 0:
KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Current flow rate: {self.current_q1}")
if self.pressure_unit == "mmHg" :
KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Outlet new pressure: {self.modified_p1/self.conv} mmHg")
elif self.pressure_unit == "Pa" :
KratosMultiphysics.Logger.PrintInfo("Windkessel", f"Outlet new pressure: {self.modified_p1} Pa")
else :
raise Exception("Pressure unit measure can be given in mmHg or in Pa")

for node in self.outlet_model_part.Nodes:
# Setting new solution on the nodes
node.Fix(KratosMultiphysics.PRESSURE)
node.SetSolutionStepValue(KratosMultiphysics.PRESSURE,self.modified_p1)
node.SetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE,self.modified_p1)

def ExecuteFinalizeSolutionStep(self):

self.previous_q1 = self.current_q1
self.current_p1 = self.modified_p1


# Private methods section
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import KratosMultiphysics
import KratosMultiphysics.KratosUnittest as KratosUnittest
import KratosMultiphysics.kratos_utilities as KratosUtilities
from KratosMultiphysics.FluidDynamicsBiomedicalApplication import apply_windkessel_outlet_process

class ApplyWindkesselOutletProcessTest(KratosUnittest.TestCase):
@classmethod
def __CreateModel(self):
model = KratosMultiphysics.Model()
outlet_model_part = model.CreateModelPart("OutletModelPart")
properties = outlet_model_part.CreateNewProperties(0)
outlet_model_part.ProcessInfo[KratosMultiphysics.DOMAIN_SIZE] = 2
outlet_model_part.ProcessInfo.SetValue(KratosMultiphysics.DELTA_TIME, 0.01)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.NORMAL)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.VELOCITY)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.PRESSURE)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.EXTERNAL_PRESSURE)
outlet_model_part.CreateNewNode(1, 0.0, 1.0, 0.0)
outlet_model_part.CreateNewNode(2, 2.0, 0.0, 0.0)
outlet_model_part.CreateNewCondition("LineCondition2D2N", 1, [1, 2], properties)
for node in outlet_model_part.Nodes:
node.AddDof(KratosMultiphysics.PRESSURE)
node.AddDof(KratosMultiphysics.VELOCITY_X)
node.AddDof(KratosMultiphysics.VELOCITY_Y)
node.AddDof(KratosMultiphysics.VELOCITY_Z)
node.SetSolutionStepValue(KratosMultiphysics.VELOCITY,[0.0,0.0,0.0])
return model

def testApplyWindkesselOutletProcessConversionFalse(self):
# Set up test - Check the behaviour of Windkessel update when values are written in Pa
model = self.__CreateModel()
settings = KratosMultiphysics.Parameters("""{
"Parameters" : {
"model_part_name" : "OutletModelPart",
"variable_name" : "PRESSURE",
"constrained" : false,
"value" : 0.0,
"interval" : [0.0,"End"],
"characteristic_resistance" : 0.02,
"peripheral_resistance" : 0.98,
"arterial_compliance" : 1.02,
"venous_pressure" : 0.0,
"initial_pressure" : 1.0,
"pressure_unit" : "Pa",
"echo_level" : 1
}
}""")


# Create the outlet process
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"])

# Execute and check the outlet process
self.__ExecuteAndCheckOutletProcessFalse(model, outlet_process)

def testApplyWindkesselOutletProcessConversionTrue(self):
# Set up test - Check the behaviour of Windkessel update when values are written in mmHg and converted in Pa
model = self.__CreateModel()
settings = KratosMultiphysics.Parameters("""{
"Parameters" : {
"model_part_name" : "OutletModelPart",
"variable_name" : "PRESSURE",
"constrained" : false,
"value" : 0.0,
"interval" : [0.0,"End"],
"characteristic_resistance" : 0.02,
"peripheral_resistance" : 0.98,
"arterial_compliance" : 1.02,
"venous_pressure" : 0.0,
"initial_pressure" : 1.0,
"pressure_unit" : "mmHg",
"echo_level" : 1
}
}""")


# Create the outlet process
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"])

# Execute and check the outlet process
self.__ExecuteAndCheckOutletProcessTrue(model, outlet_process)

def __ExecuteAndCheckOutletProcessFalse(self, model, outlet_process):
# Execute and check ExecuteInitializeSolutionStep
outlet_process.ExecuteInitializeSolutionStep()
outlet_process.ExecuteFinalizeSolutionStep()
for node in model.GetModelPart("OutletModelPart").Nodes:
self.assertTrue(node.Is(KratosMultiphysics.OUTLET))
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE))
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 0.99,5)
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 0.99,5)
for condition in model.GetModelPart("OutletModelPart").Conditions:
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET))

def __ExecuteAndCheckOutletProcessTrue(self, model, outlet_process):
# Execute and check ExecuteInitializeSolutionStep
outlet_process.ExecuteInitializeSolutionStep()
outlet_process.ExecuteFinalizeSolutionStep()
for node in model.GetModelPart("OutletModelPart").Nodes:
self.assertTrue(node.Is(KratosMultiphysics.OUTLET))
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE))
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 131.5472,1)
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 131.5472,1)
for condition in model.GetModelPart("OutletModelPart").Conditions:
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET))

if __name__ == '__main__':
KratosUnittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Import the tests o test_classes to create the suites
from apply_parabolic_inlet_process_test import ApplyParabolicInletProcessTest
from apply_windkessel_outlet_process_test import ApplyWindkesselOutletProcessTest

def AssembleTestSuites():
''' Populates the test suites to run.
Expand All @@ -29,6 +30,7 @@ def AssembleTestSuites():
# Create a test suite with the selected tests (Small tests):
smallSuite = suites['small']
smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyParabolicInletProcessTest]))
smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyWindkesselOutletProcessTest]))

# Create a test suite with the selected tests plus all small tests
nightSuite = suites['nightly']
Expand Down
Loading