Worked Example (Durable functions) : Bank Account
Extending the previous example (Worked Example : Bank Account), we can add batch processes that are applied to all the bank accounts in our system in order to accrue and pay interest on each account.
To do this we will use Azure Durable Functions to perform a fan-out in order to run the process for all of the accounts in parallel.
We accrue interest daily, based on the balance of the account at that point, but we only pay the accrued interest at the end of the month.
The accrual function is kicked off by a timer trigger (the schedule "0 30 1 * * *" fires every day at 01:30) and uses a classification to get a list of all the bank accounts in the system as at the point in time it was initiated. It then passes this list to a Durable Functions orchestration so that the accounts are all processed in parallel.
[FunctionName(nameof(AccrueInterestForAllAccountsTimer))]
public static async Task AccrueInterestForAllAccountsTimer(
[TimerTrigger("0 30 1 * * *",
RunOnStartup=false,
UseMonitor =true)] TimerInfo accrueInterestTimer,
[DurableClient] IDurableOrchestrationClient accrueInterestOrchestration,
[Classification("Bank", "Account", "ALL", @"")] Classification clsAllAccounts
)
{
// Get all the account numbers
IEnumerable<string> allAccounts = await clsAllAccounts.GetAllInstanceKeys();
await accrueInterestOrchestration.StartNewAsync(nameof(AccrueInterestForAllAccounts), allAccounts);
}
The orchestration starts a task for each of these accounts and then awaits Task.WhenAll, which lets them all run in parallel and returns when they have all finished.
[FunctionName(nameof(AccrueInterestForAllAccounts))]
public static async Task AccrueInterestForAllAccounts
([OrchestrationTrigger] IDurableOrchestrationContext context)
{
IEnumerable<string> allAccounts = context.GetInput<IEnumerable<string>>();
if (null != allAccounts)
{
var accrualTasks = new List<Task<Tuple<string, bool>>>();
foreach (string accountNumber in allAccounts)
{
Task<Tuple<string, bool>> accrualTask = context.CallActivityAsync<Tuple<string, bool>>(nameof(AccrueInterestForSpecificAccount), accountNumber);
accrualTasks.Add(accrualTask);
}
// Perform all the accruals in parallel
await Task.WhenAll(accrualTasks);
// - - - 8< - - - - - - -
Because each individual accrual may fail, and we want to know about this without stopping the entire orchestration, each activity returns a boolean value indicating whether the accrual for that account number was successful.
List<string> failedAccruals = new List<string>();
foreach (var accrualTask in accrualTasks)
{
if (!accrualTask.Result.Item2)
{
failedAccruals.Add(accrualTask.Result.Item1);
}
}
We can then log the failures, retry them, or apply whatever business process is appropriate.
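The fan-out and failure collection described above can be tried outside of Durable Functions. The following is a minimal sketch using plain tasks; `FanOutSketch`, `AccrueAsync` and the "BAD" prefix rule are hypothetical stand-ins for the real activity and are not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical stand-in for the AccrueInterestForSpecificAccount activity:
    // it reports failure through the boolean flag instead of throwing.
    public static Task<Tuple<string, bool>> AccrueAsync(string accountNumber)
    {
        bool succeeded = !accountNumber.StartsWith("BAD", StringComparison.Ordinal);
        return Task.FromResult(Tuple.Create(accountNumber, succeeded));
    }

    // Starts one task per account, waits for all of them, and returns the
    // account numbers whose accrual reported failure.
    public static async Task<List<string>> RunAllAsync(IEnumerable<string> accounts)
    {
        // Fan-out: every task is started before any of them is awaited
        List<Task<Tuple<string, bool>>> tasks =
            accounts.Select(account => AccrueAsync(account)).ToList();

        // Fan-in: results come back in the same order as the tasks
        Tuple<string, bool>[] results = await Task.WhenAll(tasks);

        return results.Where(r => !r.Item2).Select(r => r.Item1).ToList();
    }

    public static void Main()
    {
        List<string> failed = RunAllAsync(new[] { "A-001", "BAD-002", "A-003" })
            .GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", failed)); // BAD-002
    }
}
```

Returning a flag rather than throwing means one bad account does not fault the whole `Task.WhenAll`, and the list of failed account numbers can be passed straight to a retry or an alert.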
The work for accruing the interest for one account is done inside a durable functions activity. In order for the bank to make money we charge a higher interest rate for accounts that are in debit than we pay for accounts that are in credit. In order for the accrual process to be idempotent we only accrue interest for an account if it does not already have an accrual event for today.
[FunctionName(nameof(AccrueInterestForSpecificAccount))]
public static async Task<Tuple<string, bool>> AccrueInterestForSpecificAccount
([ActivityTrigger] IDurableActivityContext accrueInterestContext)
{
const decimal DEBIT_INTEREST_RATE = 0.001M;
const decimal CREDIT_INTEREST_RATE = 0.0005M;
string accountNumber = accrueInterestContext.GetInput<string>();
if (!string.IsNullOrEmpty(accountNumber))
{
EventStream bankAccountEvents = new EventStream(new EventStreamAttribute("Bank", "Account", accountNumber));
if (await bankAccountEvents.Exists())
{
// Has the accrual been done today for this account?
Classification clsAccruedToday = new Classification(new ClassificationAttribute("Bank", "Account", accountNumber, nameof(InterestAccruedToday)));
ClassificationResponse isAccrued = await clsAccruedToday.Classify<InterestAccruedToday>();
if (isAccrued.Result != ClassificationResponse.ClassificationResults.Include)
{
// Get the account balance
Projection prjBankAccountBalance = new Projection(new ProjectionAttribute("Bank", "Account", accountNumber, nameof(Balance)));
// Get the current account balance, as at midnight
Balance projectedBalance = await prjBankAccountBalance.Process<Balance>(DateTime.Today);
if (null != projectedBalance)
{
Account.Events.InterestAccrued evAccrued = new Account.Events.InterestAccrued()
{
Commentary = "Daily scheduled interest accrual",
AccrualEffectiveDate = DateTime.Today // set the accrual to midnight today
};
// calculate the accrual amount
if (projectedBalance.CurrentBalance >= 0)
{
// Using the credit rate
evAccrued.AmountAccrued = CREDIT_INTEREST_RATE * projectedBalance.CurrentBalance;
evAccrued.InterestRateInEffect = CREDIT_INTEREST_RATE;
}
else
{
// Use the debit rate
evAccrued.AmountAccrued = DEBIT_INTEREST_RATE * projectedBalance.CurrentBalance;
evAccrued.InterestRateInEffect = DEBIT_INTEREST_RATE;
}
try
{
await bankAccountEvents.AppendEvent(evAccrued, isAccrued.AsOfSequence);
}
catch (EventSourcingOnAzureFunctions.Common.EventSourcing.Exceptions.EventStreamWriteException exWrite)
{
// We can't be sure this hasn't already run...
return new Tuple<string, bool>(accountNumber, false);
}
}
}
}
}
return new Tuple<string, bool>(accountNumber, true);
}
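To make the two rates concrete, the rate selection in the activity above can be isolated into a small calculation. This is a sketch only: `AccrualSketch` and `DailyAccrual` are hypothetical names, and the rates are copied from the constants in the activity.

```csharp
using System;

public static class AccrualSketch
{
    // Rates copied from the activity above
    const decimal DEBIT_INTEREST_RATE = 0.001M;
    const decimal CREDIT_INTEREST_RATE = 0.0005M;

    // Returns the interest accrued for one day on the given balance.
    // A negative result is interest the account holder owes the bank.
    public static decimal DailyAccrual(decimal balance)
    {
        decimal rate = balance >= 0 ? CREDIT_INTEREST_RATE : DEBIT_INTEREST_RATE;
        return rate * balance;
    }

    public static void Main()
    {
        // An account 1000.00 in credit accrues 0.50 for the day
        Console.WriteLine(DailyAccrual(1000.00M));
        // An account 1000.00 in debit accrues -1.00 for the day
        Console.WriteLine(DailyAccrual(-1000.00M));
    }
}
```

Because the accrual keeps the sign of the balance, an account in debit builds up negative interest, which is what the payment step later checks for when deciding whether an overdraft extension is needed.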
The classification to decide if an accrual has already been performed for a given day is just concerned with any interest accrued event that occurred today:
/// <summary>
/// A classification of a bank account to state whether that account has had interest
/// accrued today
/// </summary>
public class InterestAccruedToday
: ClassificationBase,
IClassifyEventType<InterestAccrued>
{
public ClassificationResponse.ClassificationResults ClassifyEventInstance(InterestAccrued eventInstance)
{
// if it happened today set it to true
if (eventInstance.AccrualEffectiveDate.Date == DateTime.Today )
{
return ClassificationResponse.ClassificationResults.Include;
}
return ClassificationResponse.ClassificationResults.Unchanged;
}
public override void SetParameter(string parameterName, object parameterValue)
{
// This classifier has no parameters
}
}
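The way this check makes the accrual idempotent can be shown with a simplified model. The sketch below is not the library's classification mechanism: it reduces the event stream to a list of accrual effective dates, and `AccruedTodaySketch`, `HasAccrualFor` and `AccrueOnce` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class AccruedTodaySketch
{
    // Simplified model of the classification: the account counts as accrued
    // as soon as any accrual event carries the given day's effective date.
    public static bool HasAccrualFor(IEnumerable<DateTime> accrualEffectiveDates, DateTime today)
    {
        foreach (DateTime effectiveDate in accrualEffectiveDates)
        {
            if (effectiveDate.Date == today.Date)
            {
                return true;
            }
        }
        return false;
    }

    // Appends an accrual for the given day only if none exists yet
    public static void AccrueOnce(List<DateTime> accrualEffectiveDates, DateTime today)
    {
        if (!HasAccrualFor(accrualEffectiveDates, today))
        {
            accrualEffectiveDates.Add(today.Date);
        }
    }

    public static void Main()
    {
        DateTime today = new DateTime(2020, 3, 15);
        var stream = new List<DateTime> { new DateTime(2020, 3, 13), new DateTime(2020, 3, 14) };

        AccrueOnce(stream, today); // first run appends today's accrual
        AccrueOnce(stream, today); // second run finds it and does nothing

        Console.WriteLine(stream.Count); // 3
    }
}
```

Passing the date in explicitly, rather than reading `DateTime.Today` inside the check, also makes the behaviour easy to test around midnight.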
Because interest is paid as part of an end-of-month process, the trigger for this in Azure Functions is a standard HTTP trigger rather than a timer. It uses the same classification as the interest accrual function to get the list of accounts.
[FunctionName(nameof(ApplyInterestForAllAccountsTrigger))]
public static async Task ApplyInterestForAllAccountsTrigger(
[HttpTrigger(AuthorizationLevel.Function, "POST", Route = @"ApplyInterestForAllAccounts")]HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient accrueInterestOrchestration,
[Classification("Bank", "Account", "ALL", @"")] Classification clsAllAccounts)
{
// Get all the account numbers
IEnumerable<string> allAccounts = await req.Content.ReadAsAsync<IEnumerable<string>>();
if ( (null == allAccounts) || !allAccounts.Any() )
{
// If no account list was passed in, get all the accounts
allAccounts = await clsAllAccounts.GetAllInstanceKeys();
}
await accrueInterestOrchestration.StartNewAsync(nameof(ApplyInterestForAllAccounts), allAccounts);
}
There is also the option to explicitly pass in account numbers if we want the process to be applied just to these (you might do this for accounts being closed).
The actual process of paying interest is in two parts: first we see whether we need to extend the overdraft in order to pay the interest, and once that is done we go ahead and pay it. To perform this two-step process for each account, the orchestration calls a sub-orchestration:
[FunctionName(nameof(ApplyInterestForAllAccounts))]
public static async Task ApplyInterestForAllAccounts(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
IEnumerable<string> allAccounts = context.GetInput<IEnumerable<string>>();
if (null != allAccounts)
{
var overdraftForInterestTasks = new List<Task>();
foreach (string accountNumber in allAccounts)
{
Task overdraftTask = context.CallSubOrchestratorAsync(nameof(ApplyInterestForSpecificAccount), accountNumber);
overdraftForInterestTasks.Add(overdraftTask);
}
// Perform all the overdraft extension operations in parallel
await Task.WhenAll(overdraftForInterestTasks);
}
}
The sub-orchestration then does the two steps in sequence:
[FunctionName(nameof(ApplyInterestForSpecificAccount)) ]
public static async Task ApplyInterestForSpecificAccount(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string accountNumber = context.GetInput<string>();
if (! string.IsNullOrEmpty(accountNumber ) )
{
Tuple<string, bool> overdraftTask = await context.CallActivityAsync<Tuple<string, bool>>(nameof(SetOverdraftForInterestForSpecificAccount), accountNumber);
if (overdraftTask.Item2 )
{
// ok to pay the interest
await context.CallActivityAsync(nameof(PayInterestForSpecificAccount), accountNumber );
}
}
}
Deciding whether the account needs its overdraft extended in order to pay the accrued interest requires three projections to be run (interest due, balance and overdraft limit). If an extension is required, the activity appends an overdraft limit set event to the bank account's event stream.
[FunctionName(nameof(SetOverdraftForInterestForSpecificAccount))]
public static async Task<Tuple<string, bool >> SetOverdraftForInterestForSpecificAccount
([ActivityTrigger] IDurableActivityContext interestOverdraftContext)
{
string accountNumber = interestOverdraftContext.GetInput<string>();
bool success = true;
Command cmdPayInterest = new Command(
new CommandAttribute("Bank",
"Pay Interest",
interestOverdraftContext.InstanceId )
);
if (!string.IsNullOrWhiteSpace(accountNumber))
{
string result = "No overdraft required";
await cmdPayInterest.InitiateStep(AccountCommands.COMMAND_STEP_OVERDRAFT ,
"Bank",
"Account",
accountNumber );
// run the "set overdraft limit for interest" function
// 1- Get interest due...
Projection prjInterestDue = new Projection(
new ProjectionAttribute(
"Bank",
"Account",
accountNumber,
nameof(InterestDue)
)
);
// get the interest owed / due as now
InterestDue interestDue = await prjInterestDue.Process<InterestDue>();
if (null != interestDue)
{
// if the interest due is negative we need to make sure the account has sufficient balance
if (interestDue.Due < 0.00M)
{
Projection prjBankAccountBalance = new Projection(
new ProjectionAttribute(
"Bank",
"Account",
accountNumber,
nameof(Balance)
)
);
Balance balance = await prjBankAccountBalance.Process<Balance>();
if (null != balance)
{
decimal availableBalance = balance.CurrentBalance;
// is there an overdraft?
Projection prjBankAccountOverdraft = new Projection(
new ProjectionAttribute(
"Bank",
"Account",
accountNumber,
nameof(OverdraftLimit)
)
);
OverdraftLimit overdraft = await prjBankAccountOverdraft.Process<OverdraftLimit>();
if (null != overdraft)
{
availableBalance += overdraft.CurrentOverdraftLimit;
}
if (availableBalance < interestDue.Due)
{
// Force an overdraft extension
EventStream bankAccountEvents = new EventStream(
new EventStreamAttribute(
"Bank",
"Account",
accountNumber
)
);
// overdraft is null if the account has never had an overdraft limit set
decimal newOverdraft = overdraft?.CurrentOverdraftLimit ?? 0.00M;
decimal extension = 10.00M + Math.Abs(interestDue.Due % 10.00M);
OverdraftLimitSet evNewLimit = new OverdraftLimitSet()
{
OverdraftLimit = newOverdraft + extension,
Commentary = $"Overdraft extended to pay interest of {interestDue.Due} ",
Unauthorised = true
};
result = $"Overdraft set to {evNewLimit.OverdraftLimit } ({evNewLimit.Commentary})";
try
{
await bankAccountEvents.AppendEvent(evNewLimit, balance.CurrentSequenceNumber);
}
catch (EventSourcingOnAzureFunctions.Common.EventSourcing.Exceptions.EventStreamWriteException exWrite)
{
success = false;
}
}
}
}
}
// Log the step completion if it was successful
if (success)
{
await cmdPayInterest.StepCompleted(AccountCommands.COMMAND_STEP_OVERDRAFT,
result,
"Bank",
"Account",
accountNumber);
}
}
return new Tuple<string, bool>(accountNumber, success);
}
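The size of the forced extension is easier to see with numbers. The sketch below isolates the formula used in the activity above; `OverdraftExtensionSketch` and `Extension` are hypothetical names, and the sample amounts are made up for illustration.

```csharp
using System;

public static class OverdraftExtensionSketch
{
    // Same formula as the activity above: a flat 10.00 plus the remainder of
    // the interest due when divided by 10.00.
    public static decimal Extension(decimal interestDue)
    {
        // In C# the % operator keeps the sign of the dividend, hence Math.Abs
        return 10.00M + Math.Abs(interestDue % 10.00M);
    }

    public static void Main()
    {
        foreach (decimal due in new[] { -7.20M, -23.45M })
        {
            Console.WriteLine($"Interest due {due}: overdraft extended by {Extension(due)}");
        }
    }
}
```

With these inputs, interest due of -7.20 extends the overdraft by 17.20 and interest due of -23.45 extends it by 13.45. Once the interest due reaches 20.00 or more in magnitude, the extension is smaller than the interest itself, so whether it is sufficient depends on the balance and overdraft already available.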
Actually paying the interest is a somewhat shorter process of finding out how much is due and then adding an interest paid event for it.
[FunctionName(nameof(PayInterestForSpecificAccount))]
public static async Task PayInterestForSpecificAccount
([ActivityTrigger] IDurableActivityContext payInterestContext)
{
string accountNumber = payInterestContext.GetInput<string>();
Command cmdPayInterest = new Command(
new CommandAttribute("Bank",
"Pay Interest",
payInterestContext.InstanceId)
);
if (!string.IsNullOrWhiteSpace(accountNumber))
{
string result = "";
await cmdPayInterest.InitiateStep(AccountCommands.COMMAND_STEP_PAY_INTEREST,
"Bank",
"Account",
accountNumber);
// 1- Get interest due...
Projection prjInterestDue = new Projection(
new ProjectionAttribute(
"Bank",
"Account",
accountNumber,
nameof(InterestDue)
)
);
// get the interest owed / due as now
InterestDue interestDue = await prjInterestDue.Process<InterestDue>();
if (null != interestDue)
{
// pay the interest
decimal amountToPay = decimal.Round(interestDue.Due, 2, MidpointRounding.AwayFromZero);
if (amountToPay != 0.00M)
{
EventStream bankAccountEvents = new EventStream(
new EventStreamAttribute(
"Bank",
"Account",
accountNumber
)
);
InterestPaid evInterestPaid = new InterestPaid()
{
AmountPaid = amountToPay,
Commentary = $"Interest due {interestDue.Due} as at {interestDue.CurrentSequenceNumber}"
};
await bankAccountEvents.AppendEvent(evInterestPaid);
result = $"Interest paid: {evInterestPaid.AmountPaid} ({evInterestPaid.Commentary})";
}
}
await cmdPayInterest.StepCompleted(AccountCommands.COMMAND_STEP_PAY_INTEREST,
result,
"Bank",
"Account",
accountNumber);
}
}
The amount due is simply the amount accrued less the amount paid, computed as a projection. This is handy because if this command is called twice the second run has no side effect: the amount due, once rounded, is zero, so no further interest paid event is appended.
public class InterestDue
: ProjectionBase ,
IHandleEventType<InterestAccrued >,
IHandleEventType<InterestPaid >
{
decimal _interestDue;
public decimal Due
{
get
{
return _interestDue;
}
}
public void HandleEventInstance(InterestAccrued eventInstance)
{
if (null != eventInstance )
{
_interestDue += eventInstance.AmountAccrued;
}
}
public void HandleEventInstance(InterestPaid eventInstance)
{
if (null != eventInstance)
{
_interestDue -= eventInstance.AmountPaid;
}
}
}
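The effect of running the payment step twice can be checked with a stripped-down version of this projection. This is a sketch under simplifying assumptions: `InterestDueSketch` and `PayIfDue` are hypothetical names, and updating the projection directly stands in for appending an interest paid event and re-running the projection over the stream.

```csharp
using System;

public class InterestDueSketch
{
    private decimal _interestDue;

    public decimal Due => _interestDue;

    public void HandleAccrued(decimal amountAccrued) => _interestDue += amountAccrued;

    public void HandlePaid(decimal amountPaid) => _interestDue -= amountPaid;

    // Mirrors the payment step: returns true only if a payment was recorded
    public static bool PayIfDue(InterestDueSketch projection)
    {
        decimal amountToPay = decimal.Round(projection.Due, 2, MidpointRounding.AwayFromZero);
        if (amountToPay == 0.00M)
        {
            return false; // nothing payable, so no event would be appended
        }
        projection.HandlePaid(amountToPay);
        return true;
    }

    public static void Main()
    {
        var due = new InterestDueSketch();
        due.HandleAccrued(0.5005M);
        due.HandleAccrued(0.7525M);

        Console.WriteLine(PayIfDue(due)); // True: 1.25 is paid
        Console.WriteLine(PayIfDue(due)); // False: the 0.003 left over rounds to 0.00
    }
}
```

The sub-cent remainder stays in the projection and is carried into the next period's interest due rather than being lost.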