Skip to content

[KratosBio] - Adding Windkessel feature #13448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import math
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import math

I think you don't need this.

import KratosMultiphysics
from KratosMultiphysics.assign_scalar_variable_process import AssignScalarVariableProcess
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from KratosMultiphysics.assign_scalar_variable_process import AssignScalarVariableProcess

I think you don't need this.


import KratosMultiphysics.FluidDynamicsApplication as KratosFluid

# Import applications
import KratosMultiphysics.FluidDynamicsBiomedicalApplication as KratosBio


def Factory(settings, Model):
if(type(settings) != KratosMultiphysics.Parameters):
raise Exception("expected input shall be a Parameters object, encapsulating a json string")
return ApplyWindkesselOutletProcess(Model, settings["Parameters"])


class ApplyWindkesselOutletProcess(KratosMultiphysics.Process):
def __init__(self, Model, settings ):

KratosMultiphysics.Process.__init__(self)

default_settings = KratosMultiphysics.Parameters("""
{
"mesh_id" : 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"mesh_id" : 0,

This is no longer needed.

"model_part_name" : "",
"variable_name" : "PRESSURE",
"constrained" : true,
"value" : 0.0,
"interval" : [0.0,"End"],
"resistance1" : 0.0,
"resistance2" : 0.0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"resistance1" : 0.0,
"resistance2" : 0.0,
"resistance_1" : 0.0,
"resistance_2" : 0.0,

To follow our standard. Besides, if these are not customary ones in de literature (IDK), I'd try to use a more descriptive name for these two values to ease the use.

"compliance" : 0.0,
"venous_pressure" : 0.0,
"initial_pressure" : 0.0,
"pressure_in_mmHg" : true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, I'd suggest a pressure_unit field that can take string values as Pa and mmHg (otherwise an error is thrown). By doing so you leave the room open to extend it to other units if required in the future.

"echo_level" : 0
}
""")

# Trick: allows "value" to be a double, a string or a table value (otherwise the ValidateAndAssignDefaults might fail)
if(settings.Has("value")):
if(settings["value"].IsString()):
default_settings["value"].SetString("0.0")
elif settings["value"].IsNumber():
default_settings["value"].SetDouble(0.0)
else:
err_msg = "Provided settings have no 'value'. This needs to be provided."
raise Exception(err_msg)

settings.ValidateAndAssignDefaults(default_settings)

# Set a Kratos parameters suitable for the core processes to set the PRESSURE
pres_settings = settings.Clone()
pres_settings.RemoveValue("resistance1")
pres_settings.RemoveValue("resistance2")
pres_settings.RemoveValue("compliance")
pres_settings.RemoveValue("initial_pressure")
pres_settings.RemoveValue("venous_pressure")
pres_settings.RemoveValue("echo_level")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is the purpose of pres_settings? You're using it nowhere.


# Create a copy of the PRESSURE settings to set the EXTERNAL_PRESSURE
ext_pres_settings = pres_settings.Clone()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is the purpose of ext_pres_settings? You're using it nowhere.

ext_pres_settings["constrained"].SetBool(False)
ext_pres_settings["variable_name"].SetString("EXTERNAL_PRESSURE")

# Check the core processes input data
if (pres_settings["model_part_name"].GetString() == ""):
raise Exception("Empty outlet pressure model part name. Set a valid model part name.")
elif (ext_pres_settings["model_part_name"].GetString() == ""):
raise Exception("Empty outlet external pressure model part name. Set a valid model part name.")
elif (pres_settings["variable_name"].GetString() != "PRESSURE"):
raise Exception("Outlet pressure settings variable_name is not PRESSURE.")
elif (ext_pres_settings["variable_name"].GetString() != "EXTERNAL_PRESSURE"):
raise Exception("Outlet external pressure settings variable_name is not EXTERNAL_PRESSURE.")
elif (pres_settings["value"].IsString()):
if (pres_settings["value"].GetString == ""):
raise Exception("Outlet pressure function sting is empty.")
elif (ext_pres_settings["value"].IsString()):
if (ext_pres_settings["value"].GetString == ""):
raise Exception("Outlet external pressure function sting is empty.")

self.R1 = settings["resistance1"].GetDouble()
self.R2 = settings["resistance2"].GetDouble()
self.C = settings["compliance"].GetDouble()
p0_mmHg = settings["initial_pressure"].GetDouble()
pv_mmHg = settings["venous_pressure"].GetDouble()
self.echo = settings["echo_level"].GetInt()
self.pressure_in_mmHg = settings["pressure_in_mmHg"].GetBool()

self.conv = 13.545*9.81 # Pressure conversion factor. It is used if pressure is provided in mmHg
if self.pressure_in_mmHg is True:
self.pv = pv_mmHg*self.conv
p0 = p0_mmHg*self.conv
else:
self.pv = pv_mmHg
p0 = p0_mmHg # The pressure variable passed from the json file is given in Pa


self.previous_q1 = 0.0
self.current_p1 = p0 # in Pa

# Set the OUTLET flag in the outlet model part nodes and conditions
self.outlet_model_part = Model[pres_settings["model_part_name"].GetString()]
for node in self.outlet_model_part.Nodes:
node.Set(KratosMultiphysics.OUTLET, True)
for condition in self.outlet_model_part.Conditions:
condition.Set(KratosMultiphysics.OUTLET, True)
Comment on lines +82 to +85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the SetFlag method from the VariableUtils (it is exported to Python) so these two loops are done in parallel.


def ExecuteInitializeSolutionStep(self):

# Here the value to be provided to the outlet pressure is computed as the result of an ODE:
delta_t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.DELTA_TIME]
t = self.outlet_model_part.ProcessInfo[KratosMultiphysics.TIME]

self.current_q1 = KratosFluid.FluidAuxiliaryUtilities.CalculateFlowRate(self.outlet_model_part)

self.modified_p1 = (1/self.C*(self.current_q1*( 1 + self.R1/self.R2) + self.R1*self.C*(self.current_q1 - self.previous_q1)/delta_t - (self.current_p1 - self.pv)/self.R2))*delta_t + self.current_p1

if self.echo > 0:
print('Current flow rate', self.current_q1)
if self.pressure_in_mmHg:
print('Outlet new pressure:', self.modified_p1/self.conv, " mmHg")
else:
print('Outlet new pressure:', self.modified_p1, " Pa")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the Logger class for this. Browse for KratosMultiphysics.Logger.PrintInfo usages to see many examples.


for node in self.outlet_model_part.Nodes:
# Setting new solution on the nodes
node.Fix(KratosMultiphysics.PRESSURE)
node.SetSolutionStepValue(KratosMultiphysics.PRESSURE,self.modified_p1)
node.SetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE,self.modified_p1)

def ExecuteFinalizeSolutionStep(self):

self.previous_q1 = self.current_q1
self.current_p1 = self.modified_p1


# Private methods section
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import csv
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import csv

I think you don't need this.


import KratosMultiphysics
import KratosMultiphysics.KratosUnittest as KratosUnittest
import KratosMultiphysics.kratos_utilities as KratosUtilities
from KratosMultiphysics.FluidDynamicsApplication import apply_outlet_process
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from KratosMultiphysics.FluidDynamicsApplication import apply_outlet_process

I think you don't need this.

from KratosMultiphysics.FluidDynamicsBiomedicalApplication import apply_windkessel_outlet_process

class ApplyWindkesselOutletProcessTest(KratosUnittest.TestCase):
@classmethod
def __CreateModel(self):
model = KratosMultiphysics.Model()
outlet_model_part = model.CreateModelPart("OutletModelPart")
properties = outlet_model_part.CreateNewProperties(0)
outlet_model_part.ProcessInfo[KratosMultiphysics.DOMAIN_SIZE] = 2
outlet_model_part.ProcessInfo.SetValue(KratosMultiphysics.DELTA_TIME, 0.01)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.NORMAL)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.VELOCITY)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.PRESSURE)
outlet_model_part.AddNodalSolutionStepVariable(KratosMultiphysics.EXTERNAL_PRESSURE)
outlet_model_part.CreateNewNode(1, 0.0, 1.0, 0.0)
outlet_model_part.CreateNewNode(2, 2.0, 0.0, 0.0)
outlet_model_part.CreateNewCondition("LineCondition2D2N", 1, [1, 2], properties)
for node in outlet_model_part.Nodes:
node.AddDof(KratosMultiphysics.PRESSURE)
node.AddDof(KratosMultiphysics.VELOCITY_X)
node.AddDof(KratosMultiphysics.VELOCITY_Y)
node.AddDof(KratosMultiphysics.VELOCITY_Z)
node.SetSolutionStepValue(KratosMultiphysics.VELOCITY,[0.0,0.0,0.0])
return model

@classmethod
def __GetBlankParameters(cls):
return KratosMultiphysics.Parameters("""{
"name" : "Processes.KratosMultiphysics.FluidDynamicsBiomedicalApplication.apply_windkessel_outlet_process",
"Parameters" : {
"model_part_name" : "OutletModelPart"
}
}""")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is the purpose of these?


@classmethod
def tearDown(self):
KratosUtilities.DeleteFileIfExisting("test_outlet_table.csv")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is doing nothing.


def testApplyWindkesselOutletProcessConversionFalse(self):
# Set up test - Check the behaviour of Windkessel update when values are written in Pa
model = self.__CreateModel()
settings = KratosMultiphysics.Parameters("""{
"Parameters" : {
"model_part_name" : "OutletModelPart",
"mesh_id" : 0,
"variable_name" : "PRESSURE",
"constrained" : false,
"value" : 0.0,
"interval" : [0.0,"End"],
"resistance1" : 0.02,
"resistance2" : 0.98,
"compliance" : 1.02,
"venous_pressure" : 0.0,
"initial_pressure" : 1.0,
"pressure_in_mmHg" : false,
"echo_level" : 1
}
}""")


# Create the outlet process
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"])

# Execute and check the outlet process
self.__ExecuteAndCheckOutletProcessFalse(model, outlet_process)

def testApplyWindkesselOutletProcessConversionTrue(self):
# Set up test - Check the behaviour of Windkessel update when values are written in mmHg and converted in Pa
model = self.__CreateModel()
settings = KratosMultiphysics.Parameters("""{
"Parameters" : {
"model_part_name" : "OutletModelPart",
"mesh_id" : 0,
"variable_name" : "PRESSURE",
"constrained" : false,
"value" : 0.0,
"interval" : [0.0,"End"],
"resistance1" : 0.02,
"resistance2" : 0.98,
"compliance" : 1.02,
"venous_pressure" : 0.0,
"initial_pressure" : 1.0,
"pressure_in_mmHg" : true,
"echo_level" : 1
}
}""")


# Create the outlet process
outlet_process = apply_windkessel_outlet_process.ApplyWindkesselOutletProcess(model, settings["Parameters"])

# Execute and check the outlet process
self.__ExecuteAndCheckOutletProcessTrue(model, outlet_process)

def __ExecuteAndCheckOutletProcessFalse(self, model, outlet_process):
# Execute and check ExecuteInitializeSolutionStep
outlet_process.ExecuteInitializeSolutionStep()
outlet_process.ExecuteFinalizeSolutionStep()
for node in model.GetModelPart("OutletModelPart").Nodes:
self.assertTrue(node.Is(KratosMultiphysics.OUTLET))
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE))
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 0.99,5)
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 0.99,5)
for condition in model.GetModelPart("OutletModelPart").Conditions:
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET))

def __ExecuteAndCheckOutletProcessTrue(self, model, outlet_process):
# Execute and check ExecuteInitializeSolutionStep
outlet_process.ExecuteInitializeSolutionStep()
outlet_process.ExecuteFinalizeSolutionStep()
for node in model.GetModelPart("OutletModelPart").Nodes:
self.assertTrue(node.Is(KratosMultiphysics.OUTLET))
self.assertTrue(node.IsFixed(KratosMultiphysics.PRESSURE))
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.PRESSURE), 131.5472,1)
self.assertAlmostEqual(node.GetSolutionStepValue(KratosMultiphysics.EXTERNAL_PRESSURE), 131.5472,1)
for condition in model.GetModelPart("OutletModelPart").Conditions:
self.assertTrue(condition.Is(KratosMultiphysics.OUTLET))

if __name__ == '__main__':
KratosUnittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Import the tests o test_classes to create the suites
from apply_parabolic_inlet_process_test import ApplyParabolicInletProcessTest
from apply_windkessel_outlet_process_test import ApplyWindkesselOutletProcessTest

def AssembleTestSuites():
''' Populates the test suites to run.
Expand All @@ -29,6 +30,7 @@ def AssembleTestSuites():
# Create a test suite with the selected tests (Small tests):
smallSuite = suites['small']
smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyParabolicInletProcessTest]))
smallSuite.addTests(KratosUnittest.TestLoader().loadTestsFromTestCases([ApplyWindkesselOutletProcessTest]))

# Create a test suite with the selected tests plus all small tests
nightSuite = suites['nightly']
Expand Down
Loading