|
48 | 48 | import java.util.LinkedHashMap;
|
49 | 49 | import java.util.List;
|
50 | 50 | import java.util.Map;
|
| 51 | +import java.util.Objects; |
51 | 52 | import java.util.Set;
|
52 | 53 | import java.util.function.Function;
|
53 | 54 | import java.util.stream.Collectors;
|
54 | 55 | import javax.annotation.Nonnull;
|
55 | 56 | import javax.annotation.Nullable;
|
| 57 | +import lombok.extern.slf4j.Slf4j; |
56 | 58 |
|
57 | 59 | import static com.linkedin.metadata.dao.BaseReadDAO.*;
|
58 | 60 | import static com.linkedin.metadata.dao.utils.IngestionUtils.*;
|
|
73 | 75 | * @param <INTERNAL_ASPECT_UNION> must be a valid internal aspect union type supported by the internal snapshot
|
74 | 76 | * @param <ASSET> must be a valid asset type defined in com.linkedin.metadata.asset
|
75 | 77 | */
|
| 78 | +@Slf4j |
76 | 79 | public abstract class BaseEntityResource<
|
77 | 80 | // @formatter:off
|
78 | 81 | KEY,
|
@@ -172,6 +175,11 @@ protected BaseLocalDAO<INTERNAL_ASPECT_UNION, URN> getShadowLocalDAO() {
|
172 | 175 | return null; // override in resource class only if needed
|
173 | 176 | }
|
174 | 177 |
|
| 178 | + @Nullable |
| 179 | + protected BaseLocalDAO<INTERNAL_ASPECT_UNION, URN> getShadowReadLocalDAO() { |
| 180 | + return null; // override in resource class only if needed |
| 181 | + } |
| 182 | + |
175 | 183 | /**
|
176 | 184 | * Creates an URN from its string representation.
|
177 | 185 | */
|
@@ -1142,18 +1150,70 @@ private Map<URN, List<UnionTemplate>> getUrnAspectMap(@Nonnull Collection<URN> u
|
1142 | 1150 | final Map<URN, List<UnionTemplate>> urnAspectsMap =
|
1143 | 1151 | urns.stream().collect(Collectors.toMap(Function.identity(), urn -> new ArrayList<>()));
|
1144 | 1152 |
|
1145 |
| - if (isInternalModelsEnabled) { |
1146 |
| - getLocalDAO().get(keys) |
1147 |
| - .forEach((key, aspect) -> aspect.ifPresent(metadata -> urnAspectsMap.get(key.getUrn()) |
1148 |
| - .add(ModelUtils.newAspectUnion(_internalAspectUnionClass, metadata)))); |
| 1153 | + if (getShadowReadLocalDAO() == null) { |
| 1154 | + if (isInternalModelsEnabled) { |
| 1155 | + getLocalDAO().get(keys) |
| 1156 | + .forEach((key, aspect) -> aspect.ifPresent(metadata -> urnAspectsMap.get(key.getUrn()) |
| 1157 | + .add(ModelUtils.newAspectUnion(_internalAspectUnionClass, metadata)))); |
| 1158 | + } else { |
| 1159 | + getLocalDAO().get(keys) |
| 1160 | + .forEach((key, aspect) -> aspect.ifPresent(metadata -> urnAspectsMap.get(key.getUrn()) |
| 1161 | + .add(ModelUtils.newAspectUnion(_aspectUnionClass, metadata)))); |
| 1162 | + } |
| 1163 | + return urnAspectsMap; |
1149 | 1164 | } else {
|
1150 |
| - getLocalDAO().get(keys) |
1151 |
| - .forEach((key, aspect) -> aspect.ifPresent( |
1152 |
| - metadata -> urnAspectsMap.get(key.getUrn()).add(ModelUtils.newAspectUnion(_aspectUnionClass, metadata)))); |
| 1165 | + return getUrnAspectMapFromShadowDao(urns, keys, isInternalModelsEnabled); |
1153 | 1166 | }
|
| 1167 | + } |
| 1168 | + |
| 1169 | + @Nonnull |
| 1170 | + private Map<URN, List<UnionTemplate>> getUrnAspectMapFromShadowDao( |
| 1171 | + @Nonnull Collection<URN> urns, |
| 1172 | + @Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys, |
| 1173 | + boolean isInternalModelsEnabled) { |
| 1174 | + |
| 1175 | + Map<AspectKey<URN, ? extends RecordTemplate>, java.util.Optional<? extends RecordTemplate>> localResults = |
| 1176 | + getLocalDAO().get(keys); |
| 1177 | + |
| 1178 | + BaseLocalDAO<INTERNAL_ASPECT_UNION, URN> shadowDao = getShadowReadLocalDAO(); |
| 1179 | + Map<AspectKey<URN, ? extends RecordTemplate>, java.util.Optional<? extends RecordTemplate>> shadowResults = |
| 1180 | + shadowDao.get(keys); |
| 1181 | + |
| 1182 | + final Map<URN, List<UnionTemplate>> urnAspectsMap = |
| 1183 | + urns.stream().collect(Collectors.toMap(Function.identity(), urn -> new ArrayList<>())); |
| 1184 | + |
| 1185 | + keys.forEach(key -> { |
| 1186 | + java.util.Optional<? extends RecordTemplate> localValue = localResults.getOrDefault(key, java.util.Optional.empty()); |
| 1187 | + java.util.Optional<? extends RecordTemplate> shadowValue = shadowResults.getOrDefault(key, java.util.Optional.empty()); |
| 1188 | + |
| 1189 | + RecordTemplate valueToUse = null; |
| 1190 | + |
| 1191 | + if (localValue.isPresent() && shadowValue.isPresent()) { |
| 1192 | + if (!Objects.equals(localValue.get(), shadowValue.get())) { |
| 1193 | + log.warn("Aspect mismatch for URN {} and aspect {}: local = {}, shadow = {}", |
| 1194 | + key.getUrn(), key.getAspectClass().getSimpleName(), |
| 1195 | + localValue.get(), shadowValue.get()); |
| 1196 | + valueToUse = localValue.get(); // fallback to local if there's mismatch |
| 1197 | + } else { |
| 1198 | + valueToUse = shadowValue.get(); // match → use shadow |
| 1199 | + } |
| 1200 | + } else if (shadowValue.isPresent()) { |
| 1201 | + valueToUse = shadowValue.get(); |
| 1202 | + } else if (localValue.isPresent()) { |
| 1203 | + valueToUse = localValue.get(); |
| 1204 | + } |
| 1205 | + |
| 1206 | + if (valueToUse != null) { |
| 1207 | + urnAspectsMap.get(key.getUrn()).add(ModelUtils.newAspectUnion( |
| 1208 | + (Class<? extends ASPECT_UNION>) (isInternalModelsEnabled ? _internalAspectUnionClass : _aspectUnionClass), |
| 1209 | + valueToUse)); |
| 1210 | + } |
| 1211 | + }); |
| 1212 | + |
1154 | 1213 | return urnAspectsMap;
|
1155 | 1214 | }
|
1156 | 1215 |
|
| 1216 | + |
1157 | 1217 | @Nonnull
|
1158 | 1218 | private SNAPSHOT newSnapshot(@Nonnull URN urn, @Nonnull List<UnionTemplate> aspects) {
|
1159 | 1219 | return ModelUtils.newSnapshot(_snapshotClass, urn, aspects);
|
|
0 commit comments