Skip to content

Commit acb50da

Browse files
wt0530githubgxll
authored andcommitted
[fix][dingo-exec] Fix insert into... on duplicate key
1 parent 46abbb0 commit acb50da

File tree

2 files changed

+137
-17
lines changed

2 files changed

+137
-17
lines changed

dingo-exec/src/main/java/io/dingodb/exec/operator/DistributeOperator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,11 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
6464
Optional.ofNullable(param.getTable().getPartitionStrategy())
6565
.orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME));
6666
CommonId partId;
67+
CommonId tablePartId = null;
6768
if (param.getTableId().type.code == CommonId.CommonType.INDEX.code
6869
&& indexTable != null) {
6970
context.setIndexId(param.getTableId());
70-
ps = PartitionService.getService(
71+
PartitionService indexPs = PartitionService.getService(
7172
Optional.ofNullable(indexTable.getPartitionStrategy())
7273
.orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME));
7374
Object[] indexTuple = new Object[indexTable.columns.size()];
@@ -80,12 +81,16 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
8081
KeyValueCodec indexCodec = CodecService.getDefault()
8182
.createKeyValueCodec(indexTable.getCodecVersion(), indexTable.version,
8283
indexTable.tupleType(), indexTable.keyMapping());
83-
partId = ps.calcPartId(indexTuple, wrap(indexCodec::encodeKey), param.getDistributions());
84+
partId = indexPs.calcPartId(indexTuple, wrap(indexCodec::encodeKey), param.getDistributions());
85+
NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> distribution =
86+
MetaService.root().getRangeDistribution(param.getTable().tableId);
87+
tablePartId = ps.calcPartId(newTuple, wrap(param.getCodec()::encodeKey), distribution);
8488
} else {
8589
partId = ps.calcPartId(newTuple, wrap(param.getCodec()::encodeKey), param.getDistributions());
8690
}
8791
RangeDistribution distribution = RangeDistribution.builder().id(partId).build();
8892
context.setDistribution(distribution);
93+
context.setTablePartId(tablePartId);
8994

9095
return vertex.getSoleEdge().transformToNext(context, tuple);
9196
} catch (RegionSplitException e) {

dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java

Lines changed: 130 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import io.dingodb.meta.entity.IndexTable;
4848
import io.dingodb.meta.entity.IndexType;
4949
import io.dingodb.meta.entity.Table;
50+
import io.dingodb.partition.DingoPartitionServiceProvider;
51+
import io.dingodb.partition.PartitionService;
5052
import io.dingodb.store.api.StoreInstance;
5153
import io.dingodb.store.api.transaction.data.Op;
5254
import io.dingodb.store.api.transaction.exception.DuplicateEntryException;
@@ -89,8 +91,11 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
8991
KeyValueCodec codec = param.getCodec();
9092
boolean isVector = false;
9193
boolean isDocument = false;
94+
Object[] primaryOldTuple = tuple;
95+
Table indexTable = null;
9296
if (context.getIndexId() != null) {
93-
Table indexTable = (Table) TransactionManager.getIndex(txnId, context.getIndexId());
97+
boolean duplicate = param.getUpdateMapping() != null && param.getUpdates() != null;
98+
indexTable = (Table) TransactionManager.getIndex(txnId, context.getIndexId());
9499
if (indexTable == null) {
95100
LogUtils.error(log, "[ddl] TxnPartInsert get index table null, indexId:{}", context.getIndexId());
96101
return false;
@@ -133,6 +138,87 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
133138
codec = CodecService.getDefault().createKeyValueCodec(
134139
indexTable.getCodecVersion(), indexTable.version, indexTable.tupleType(), indexTable.keyMapping()
135140
);
141+
// index duplicate key check
142+
if (duplicate && context.getTablePartId() != null) {
143+
Object[] primaryTuple = (Object[]) param.getSchema().convertFrom(primaryOldTuple, ValueConverter.INSTANCE);
144+
KeyValue primaryKv = wrap(param.getCodec()::encode).apply(primaryTuple);
145+
StoreInstance store = Services.KV_STORE.getInstance(param.getTable().tableId, context.getTablePartId());
146+
KeyValue getPrimaryKv = store.txnGet(txnId.seq, primaryKv.getKey(), param.getLockTimeOut());
147+
if (getPrimaryKv != null && getPrimaryKv.getValue() != null) {
148+
context.setDuplicateKey(true);
149+
Object[] getPrimaryTuple = param.getCodec().decode(getPrimaryKv);
150+
if (!param.isPessimisticTxn()) {
151+
Object defaultVal = null;
152+
if (columnIndices.contains(-1)) {
153+
Column addColumn = indexTable.getColumns().stream()
154+
.filter(column -> column.getSchemaState() != SchemaState.SCHEMA_PUBLIC)
155+
.findFirst().orElse(null);
156+
if (addColumn != null) {
157+
defaultVal = addColumn.getDefaultVal();
158+
}
159+
}
160+
Object finalDefaultVal = defaultVal;
161+
Object[] finalGetPrimaryTuple = getPrimaryTuple;
162+
getPrimaryTuple = columnIndices.stream().map(i -> {
163+
if (i == -1) {
164+
return finalDefaultVal;
165+
}
166+
return finalGetPrimaryTuple[i];
167+
}).toArray();
168+
}
169+
PartitionService ps = PartitionService.getService(
170+
Optional.ofNullable(indexTable.getPartitionStrategy())
171+
.orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME));
172+
KeyValue oldKv = wrap(codec::encode).apply(getPrimaryTuple);
173+
CommonId oldPartId = ps.calcPartId(oldKv.getKey(), MetaService.root().getRangeDistribution(tableId));
174+
StoreInstance tmpLocalStore = Services.LOCAL_STORE.getInstance(context.getIndexId(), oldPartId);
175+
CodecService.getDefault().setId(oldKv.getKey(), oldPartId.domain);
176+
byte[] key = oldKv.getKey();
177+
byte[] txnIdByte = txnId.encode();
178+
byte[] tableIdByte = tableId.encode();
179+
byte[] partIdByte = oldPartId.encode();
180+
int len = txnIdByte.length + tableIdByte.length + partIdByte.length;
181+
byte[] insertKey = ByteUtils.encode(
182+
CommonId.CommonType.TXN_CACHE_DATA,
183+
oldKv.getKey(),
184+
Op.PUTIFABSENT.getCode(),
185+
len,
186+
txnIdByte,
187+
tableIdByte,
188+
partIdByte);
189+
byte[] deleteKey = Arrays.copyOf(insertKey, insertKey.length);
190+
deleteKey[deleteKey.length - 2] = (byte) Op.DELETE.getCode();
191+
byte[] updateKey = Arrays.copyOf(insertKey, insertKey.length);
192+
updateKey[updateKey.length - 2] = (byte) Op.PUT.getCode();
193+
List<byte[]> bytes = new ArrayList<>(3);
194+
bytes.add(insertKey);
195+
bytes.add(deleteKey);
196+
bytes.add(updateKey);
197+
List<KeyValue> keyValues = tmpLocalStore.get(bytes);
198+
if (keyValues != null && !keyValues.isEmpty()) {
199+
if (keyValues.size() > 1) {
200+
throw new RuntimeException(txnId + " Key is not existed than two in local store");
201+
}
202+
KeyValue value = keyValues.get(0);
203+
byte[] oldKey = value.getKey();
204+
if (oldKey[oldKey.length - 2] == Op.PUTIFABSENT.getCode()
205+
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()) {
206+
boolean unique = index.isUnique();
207+
if (unique) {
208+
throw new DuplicateEntryException("Duplicate entry "
209+
+ TransactionUtil.duplicateEntryKey(tableId, key, txnId) + " for key 'PRIMARY'");
210+
}
211+
if (param.getUpdateMapping() != null && param.getUpdates() != null) {
212+
tmpLocalStore.delete(oldKey);
213+
}
214+
}
215+
} else {
216+
if (context.isDuplicateKey()) {
217+
tmpLocalStore.put(new KeyValue(deleteKey, Arrays.copyOf(oldKv.getValue(), oldKv.getValue().length)));
218+
}
219+
}
220+
}
221+
}
136222
}
137223
Object[] newTuple = (Object[]) schema.convertFrom(tuple, ValueConverter.INSTANCE);
138224
KeyValue keyValue = wrap(codec::encode).apply(newTuple);
@@ -172,15 +258,18 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
172258
if (context.isDuplicateKey()) {
173259
// insert into ... on duplicate key update ...
174260
Pair<KeyValue, Long> pair = generateNewKv(
175-
tuple,
261+
primaryOldTuple,
176262
param,
177263
partId,
178264
codec,
179265
newTuple,
180266
txnIdByte,
181267
tableIdByte,
182268
partIdByte,
183-
len);
269+
len,
270+
schema,
271+
context,
272+
indexTable);
184273
byte[] extraKey = ByteUtils.encode(
185274
CommonId.CommonType.TXN_CACHE_EXTRA_DATA,
186275
key,
@@ -252,15 +341,18 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
252341
Pair<KeyValue, Long> pair = null;
253342
if (context.isDuplicateKey()) {
254343
pair = generateNewKv(
255-
tuple,
344+
primaryOldTuple,
256345
param,
257346
partId,
258347
codec,
259348
newTuple,
260349
txnIdByte,
261350
tableIdByte,
262351
partIdByte,
263-
len);
352+
len,
353+
schema,
354+
context,
355+
indexTable);
264356
}
265357
// extraKeyValue [12_jobId_tableId_partId_a_none, oldValue]
266358
byte[] extraKey = ByteUtils.encode(
@@ -305,15 +397,18 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
305397
Pair<KeyValue, Long> pair = null;
306398
if (context.isDuplicateKey()) {
307399
pair = generateNewKv(
308-
tuple,
400+
primaryOldTuple,
309401
param,
310402
partId,
311403
codec,
312404
newTuple,
313405
txnIdByte,
314406
tableIdByte,
315407
partIdByte,
316-
len);
408+
len,
409+
schema,
410+
context,
411+
indexTable);
317412
}
318413
byte[] rollBackKey = ByteUtils.getKeyByOp(
319414
CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.DELETE, dataKey
@@ -390,6 +485,9 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
390485
context.setDuplicateKey(true);
391486
oldTuple = codec.decode(oldKv);
392487
}
488+
if (oldTuple == null) {
489+
oldTuple = newTuple;
490+
}
393491
}
394492
long num = 1L;
395493
if (keyValues != null && !keyValues.isEmpty()) {
@@ -402,15 +500,18 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
402500
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()) {
403501
if (param.getUpdateMapping() != null && param.getUpdates() != null) {
404502
pair = generateNewKv(
405-
tuple,
503+
primaryOldTuple,
406504
param,
407505
partId,
408506
codec,
409507
oldTuple,
410508
txnIdByte,
411509
tableIdByte,
412510
partIdByte,
413-
len);
511+
len,
512+
schema,
513+
context,
514+
indexTable);
414515
op = Op.PUT;
415516
} else {
416517
if (param.isIgnore()) {
@@ -463,15 +564,18 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
463564
}
464565
if (context.isDuplicateKey()) {
465566
pair = generateNewKv(
466-
tuple,
567+
primaryOldTuple,
467568
param,
468569
partId,
469570
codec,
470571
oldTuple,
471572
txnIdByte,
472573
tableIdByte,
473574
partIdByte,
474-
len);
575+
len,
576+
schema,
577+
context,
578+
indexTable);
475579
} else {
476580
if (!param.isReplaceInto()) {
477581
keyValue.setKey(
@@ -535,21 +639,32 @@ private static Pair<KeyValue, Long> generateNewKv(Object[] tuple,
535639
byte[] txnIdByte,
536640
byte[] tableIdByte,
537641
byte[] partIdByte,
538-
int len) {
642+
int len,
643+
DingoType schema,
644+
Context context,
645+
Table indexTable) {
539646
KeyValue insertUpKv;
540647
TupleMapping mapping = param.getUpdateMapping();
541648
List<SqlExpr> updates = param.getUpdates();
649+
Object[] revMap = mapping.revMap(tuple);
542650
long updateNum = 0L;
543651
for (int i = 0; i < mapping.size(); i++) {
544-
Object newValue = updates.get(i).eval(tuple);
545-
int index = mapping.get(i);
652+
Object newValue = updates.get(i).eval(revMap);
653+
if (newValue.equals("NULL")) {
654+
newValue = null;
655+
}
656+
int index;
657+
if (indexTable != null) {
658+
index = indexTable.mapping().get(i);
659+
} else {
660+
index = mapping.get(i);
661+
}
546662
if ((newTuple[index] == null && newValue != null)
547663
|| (newTuple[index] != null && !newTuple[index].equals(newValue))) {
548664
newTuple[index] = newValue;
549665
updateNum++;
550666
}
551667
}
552-
DingoType schema = param.getSchema();
553668
Object[] convertTuple = (Object[]) schema.convertFrom(newTuple, ValueConverter.INSTANCE);
554669
KeyValue updateKv = wrap(codec::encode).apply(convertTuple);
555670
CodecService.getDefault().setId(updateKv.getKey(), partId.domain);

0 commit comments

Comments
 (0)