Skip to content

Commit cd549bb

Browse files
authored
[feat] add with optimistic locking in BaseLocalDAO (#543)
* [feat] add with optimistic locking in BaseLocalDAO * fix * update unit test
1 parent 30835e2 commit cd549bb

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
import lombok.NonNull;
7777
import lombok.Value;
7878
import lombok.extern.slf4j.Slf4j;
79+
import pegasus.com.linkedin.metadata.events.IngestionAspectETag;
80+
import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray;
7981

8082
import static com.linkedin.metadata.dao.utils.IngestionUtils.*;
8183
import static com.linkedin.metadata.dao.utils.ModelUtils.*;
@@ -484,7 +486,6 @@ public void setEmitAuditEvent(boolean emitAuditEvent) {
484486
_emitAuditEvent = emitAuditEvent;
485487
}
486488

487-
488489
/**
489490
* Logic common to both {@link #add(Urn, Class, Function, AuditStamp)} and {@link #delete(Urn, Class, AuditStamp, int)} methods.
490491
*
@@ -549,17 +550,44 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
549550
return new AddResult<>(oldValue, oldValue, aspectClass);
550551
}
551552

553+
final AuditStamp optimisticLockAuditStamp = extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, aspectClass);
554+
552555
// Save the newValue as the latest version
553556
long largestVersion =
554-
saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted,
555-
trackingContext, ingestionParams.isTestMode());
557+
saveLatest(urn, aspectClass, oldValue,
558+
optimisticLockAuditStamp != null ? optimisticLockAuditStamp : oldAuditStamp,
559+
newValue, auditStamp, latest.isSoftDeleted, trackingContext, ingestionParams.isTestMode());
556560

557561
// Apply retention policy
558562
applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion);
559563

560564
return new AddResult<>(oldValue, newValue, aspectClass);
561565
}
562566

567+
@VisibleForTesting
568+
protected <ASPECT extends RecordTemplate> AuditStamp extractOptimisticLockForAspectFromIngestionParamsIfPossible(
569+
@Nullable IngestionParams ingestionParams, @Nonnull Class<ASPECT> aspectClass) {
570+
if (ingestionParams == null) {
571+
return null;
572+
}
573+
574+
AuditStamp optimisticLockAuditStamp = null;
575+
576+
final IngestionAspectETagArray ingestionAspectETags = ingestionParams.getIngestionETags();
577+
578+
if (ingestionAspectETags != null) {
579+
for (IngestionAspectETag ingestionAspectETag: ingestionAspectETags) {
580+
if (aspectClass.getSimpleName().equalsIgnoreCase(ingestionAspectETag.getAspect_name())
581+
&& ingestionAspectETag.getETag() != null) {
582+
optimisticLockAuditStamp = new AuditStamp();
583+
optimisticLockAuditStamp.setTime(ingestionAspectETag.getETag());
584+
break;
585+
}
586+
}
587+
}
588+
return optimisticLockAuditStamp;
589+
}
590+
563591
/**
564592
* Adds a new version of several aspects for an entity.
565593
*
@@ -1313,15 +1341,15 @@ private <ASPECT extends RecordTemplate> void applyRetention(@Nonnull URN urn, @N
13131341
* @param urn the URN for the entity the aspect is attached to
13141342
* @param aspectClass the aspectClass of the aspect being saved
13151343
* @param oldEntry {@link RecordTemplate} of the previous latest value of aspect, null if new value is the first version
1316-
* @param oldAuditStamp the audit stamp of the previous latest aspect, null if new value is the first version
1344+
* @param optimisticLockAuditStamp the audit stamp of the previous latest aspect, null if new value is the first version. Used for optimistic locking.
13171345
* @param newEntry {@link RecordTemplate} of the new latest value of aspect
13181346
* @param newAuditStamp the audit stamp for the operation
13191347
* @param isSoftDeleted flag to indicate if the previous latest value of aspect was soft deleted
13201348
* @param isTestMode whether the test mode is enabled or not
13211349
* @return the largest version
13221350
*/
13231351
protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn,
1324-
@Nonnull Class<ASPECT> aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp,
1352+
@Nonnull Class<ASPECT> aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp optimisticLockAuditStamp,
13251353
@Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted,
13261354
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode);
13271355

dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/events/IngestionAspectETag.pdl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace pegasus.com.linkedin.metadata.events
66
record IngestionAspectETag {
77

88
/**
9-
* aspect FQCN
9+
* aspect field name, e.g. "status"
1010
*/
1111
aspect_name: optional string = ""
1212

dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import org.testng.annotations.BeforeMethod;
5353
import org.testng.annotations.DataProvider;
5454
import org.testng.annotations.Test;
55+
import pegasus.com.linkedin.metadata.events.IngestionAspectETag;
56+
import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray;
5557

5658
import static com.linkedin.common.AuditStamps.*;
5759
import static org.mockito.Mockito.*;
@@ -87,7 +89,7 @@ public DummyLocalDAO(Class<ENTITY_ASPECT_UNION> aspectClass, BiFunction<FooUrn,
8789

8890
@Override
8991
protected <ASPECT extends RecordTemplate> long saveLatest(FooUrn urn, Class<ASPECT> aspectClass, ASPECT oldEntry,
90-
AuditStamp oldAuditStamp, ASPECT newEntry, AuditStamp newAuditStamp, boolean isSoftDeleted,
92+
AuditStamp optimisticLockAuditStamp, ASPECT newEntry, AuditStamp newAuditStamp, boolean isSoftDeleted,
9193
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
9294
return 0;
9395
}
@@ -802,4 +804,39 @@ public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxExc
802804
// Verify that the result is the same as the input aspect since it's not registered
803805
assertEquals(result.getUpdatedAspect(), foo);
804806
}
807+
808+
@Test
809+
public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossible() {
810+
IngestionAspectETag ingestionAspectETag = new IngestionAspectETag();
811+
ingestionAspectETag.setAspect_name("aspectFoo");
812+
ingestionAspectETag.setETag(1234L);
813+
814+
IngestionParams ingestionParams = new IngestionParams();
815+
ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag));
816+
817+
AuditStamp result = _dummyLocalDAO.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class);
818+
819+
assertEquals(result.getTime(), Long.valueOf(1234L));
820+
}
821+
822+
@Test
823+
public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossibleIngestionParamsIsNull() {
824+
AuditStamp result = _dummyLocalDAO.extractOptimisticLockForAspectFromIngestionParamsIfPossible(null, AspectFoo.class);
825+
826+
assertNull(result);
827+
}
828+
829+
@Test
830+
public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossibleAspectNameDoesntMatch() {
831+
IngestionAspectETag ingestionAspectETag = new IngestionAspectETag();
832+
ingestionAspectETag.setAspect_name("aspectBar");
833+
ingestionAspectETag.setETag(1234L);
834+
835+
IngestionParams ingestionParams = new IngestionParams();
836+
ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag));
837+
838+
AuditStamp result = _dummyLocalDAO.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class);
839+
840+
assertNull(result);
841+
}
805842
}

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -598,12 +598,12 @@ protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTra
598598

599599
@Override
600600
protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
601-
@Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nullable ASPECT newValue,
601+
@Nullable ASPECT oldValue, @Nullable AuditStamp optimisticLockAuditStamp, @Nullable ASPECT newValue,
602602
@Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext,
603603
boolean isTestMode) {
604604
// Save oldValue as the largest version + 1
605605
long largestVersion = 0;
606-
if ((isSoftDeleted || oldValue != null) && oldAuditStamp != null && _changeLogEnabled) {
606+
if ((isSoftDeleted || oldValue != null) && optimisticLockAuditStamp != null && _changeLogEnabled) {
607607
// When saving on entity which has history version (including being soft deleted), and changeLog is enabled,
608608
// the saveLatest will process the following steps:
609609

@@ -620,11 +620,11 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
620620
}
621621
// Move latest version to historical version by insert a new record only if we are not overwriting the latest version.
622622
if (!_overwriteLatestVersionEnabled) {
623-
insert(urn, oldValue, aspectClass, oldAuditStamp, largestVersion, trackingContext, isTestMode);
623+
insert(urn, oldValue, aspectClass, optimisticLockAuditStamp, largestVersion, trackingContext, isTestMode);
624624
}
625625
// update latest version
626626
updateWithOptimisticLocking(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION,
627-
new Timestamp(oldAuditStamp.getTime()), trackingContext, isTestMode);
627+
new Timestamp(optimisticLockAuditStamp.getTime()), trackingContext, isTestMode);
628628
} else {
629629
// When for fresh ingestion or with changeLog disabled
630630
// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards

0 commit comments

Comments
 (0)