Skip to content

Commit 78f6bd9

Browse files
committed
Fixed effect upsert bug for postgres store
1 parent d2fc7b6 commit 78f6bd9

File tree

6 files changed

+48
-2
lines changed

6 files changed

+48
-2
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,8 @@ public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7777
[TestMethod]
7878
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
7979
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
80+
81+
[TestMethod]
82+
public override Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects()
83+
=> UpsertingExistingEffectDoesNotAffectOtherExistingEffects(FunctionStoreFactory.Create());
8084
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,5 +706,34 @@ async Task (workflow) =>
706706
effectIds.Single().Id.ShouldBe("SomeEffectId");
707707
(await cp.Effects.GetValue<Guid>("SomeEffectId")).ShouldBe(someEffectIdValue);
708708
}
709-
709+
710+
public abstract Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects();
711+
public async Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects(Task<IFunctionStore> storeTask)
712+
{
713+
var store = await storeTask;
714+
var storedId = TestStoredId.Create();
715+
716+
var effectStore = store.EffectsStore;
717+
var effectResults = new EffectResults(
718+
TestFlowId.Create(),
719+
storedId,
720+
lazyExistingEffects: new Lazy<Task<IReadOnlyList<StoredEffect>>>(
721+
() => new List<StoredEffect>().CastTo<IReadOnlyList<StoredEffect>>().ToTask()
722+
),
723+
effectStore,
724+
DefaultSerializer.Instance
725+
);
726+
var effect = new Effect(effectResults);
727+
728+
await effect.Capture("1", () => "hello world");
729+
await effect.Capture("2", () => "hello universe");
730+
await effect.Flush();
731+
732+
await effect.Upsert("1", "hello world again");
733+
734+
var storedEffects = await effectStore.GetEffectResults(storedId);
735+
storedEffects.Count.ShouldBe(2);
736+
storedEffects.Single(se => se.EffectId.Id == "1").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo<string>().ShouldBe("hello world again");
737+
storedEffects.Single(se => se.EffectId.Id == "2").Result!.ToStringFromUtf8Bytes().DeserializeFromJsonTo<string>().ShouldBe("hello universe");
738+
}
710739
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,8 @@ public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7676
[TestMethod]
7777
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
7878
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
79+
80+
[TestMethod]
81+
public override Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects()
82+
=> UpsertingExistingEffectDoesNotAffectOtherExistingEffects(FunctionStoreFactory.Create());
7983
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,8 @@ public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7777
[TestMethod]
7878
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
7979
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
80+
81+
[TestMethod]
82+
public override Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects()
83+
=> UpsertingExistingEffectDoesNotAffectOtherExistingEffects(FunctionStoreFactory.Create());
8084
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ INSERT INTO {tablePrefix}_effects
107107
var sql= $@"
108108
UPDATE {tablePrefix}_effects
109109
SET status = $1, result = $2, exception = $3
110-
WHERE type = $4 AND instance = $5;";
110+
WHERE type = $4 AND instance = $5 AND id_hash = $6;";
111111

112112
foreach (var (storedId, _, _, storedEffect) in changes.Where(s => s.Operation == CrudOperation.Update))
113113
{
@@ -117,6 +117,7 @@ INSERT INTO {tablePrefix}_effects
117117
command.AddParameter(JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value);
118118
command.AddParameter(storedId.Type.Value);
119119
command.AddParameter(storedId.Instance.Value);
120+
command.AddParameter(storedEffect.StoredEffectId.Value);
120121

121122
commands.Add(command);
122123
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,8 @@ public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush()
7777
[TestMethod]
7878
public override Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow()
7979
=> CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlushInFlow(FunctionStoreFactory.Create());
80+
81+
[TestMethod]
82+
public override Task UpsertingExistingEffectDoesNotAffectOtherExistingEffects()
83+
=> UpsertingExistingEffectDoesNotAffectOtherExistingEffects(FunctionStoreFactory.Create());
8084
}

0 commit comments

Comments
 (0)