|
50 | 50 | import io.dingodb.meta.entity.IndexTable;
|
51 | 51 | import io.dingodb.meta.entity.IndexType;
|
52 | 52 | import io.dingodb.meta.entity.Table;
|
| 53 | +import io.dingodb.sdk.service.CoordinatorService; |
| 54 | +import io.dingodb.sdk.service.Services; |
| 55 | +import io.dingodb.sdk.service.entity.common.IndexParameter; |
| 56 | +import io.dingodb.sdk.service.entity.common.RawEngine; |
| 57 | +import io.dingodb.sdk.service.entity.common.RegionType; |
| 58 | +import io.dingodb.sdk.service.entity.coordinator.CreateIdsRequest; |
| 59 | +import io.dingodb.sdk.service.entity.coordinator.CreateRegionRequest; |
| 60 | +import io.dingodb.sdk.service.entity.coordinator.IdEpochType; |
53 | 61 | import io.dingodb.sdk.service.entity.meta.ColumnDefinition;
|
54 | 62 | import io.dingodb.sdk.service.entity.meta.DingoCommonId;
|
| 63 | +import io.dingodb.sdk.service.entity.meta.EntityType; |
55 | 64 | import io.dingodb.sdk.service.entity.meta.Partition;
|
56 | 65 | import io.dingodb.sdk.service.entity.meta.TableDefinitionWithId;
|
| 66 | +import io.dingodb.sdk.service.entity.meta.TableIdWithPartIds; |
57 | 67 | import io.dingodb.server.executor.schedule.SafePointUpdateTask;
|
| 68 | +import io.dingodb.store.proxy.Configuration; |
58 | 69 | import io.dingodb.store.proxy.mapper.Mapper;
|
59 | 70 | import io.dingodb.store.proxy.mapper.MapperImpl;
|
60 | 71 | import io.dingodb.store.proxy.service.AutoIncrementService;
|
|
84 | 95 | import static io.dingodb.common.mysql.error.ErrorCode.ErrKeyDoesNotExist;
|
85 | 96 | import static io.dingodb.common.mysql.error.ErrorCode.ErrNoSuchTable;
|
86 | 97 | import static io.dingodb.common.mysql.error.ErrorCode.ErrPartitionMgmtOnNonpartitioned;
|
87 |
| -import static io.dingodb.common.util.NameCaseUtils.caseSensitive; |
88 | 98 | import static io.dingodb.common.util.NameCaseUtils.convertName;
|
89 | 99 | import static io.dingodb.common.util.NameCaseUtils.convertSql;
|
90 | 100 | import static io.dingodb.sdk.service.entity.common.SchemaState.SCHEMA_DELETE_ONLY;
|
|
93 | 103 | import static io.dingodb.sdk.service.entity.common.SchemaState.SCHEMA_PUBLIC;
|
94 | 104 | import static io.dingodb.sdk.service.entity.common.SchemaState.SCHEMA_WRITE_ONLY;
|
95 | 105 | import static io.dingodb.sdk.service.entity.common.SchemaState.SCHEMA_WRITE_REORG;
|
| 106 | +import static io.dingodb.store.proxy.mapper.Mapper.MAPPER; |
96 | 107 |
|
97 | 108 | @Slf4j
|
98 | 109 | public class DdlWorker {
|
@@ -305,6 +316,9 @@ public Pair<Long, String> runDdlJob(DdlContext dc, DdlJob job) {
|
305 | 316 | case ActionTruncateTablePartition:
|
306 | 317 | res = onTruncatePart(dc, job);
|
307 | 318 | break;
|
| 319 | + case ActionAlterIndex: |
| 320 | + res = onAlterIndex(dc, job); |
| 321 | + break; |
308 | 322 | default:
|
309 | 323 | job.setState(JobState.jobStateCancelled);
|
310 | 324 | break;
|
@@ -2182,4 +2196,115 @@ public Pair<Long, String> onTruncatePart(DdlContext dc, DdlJob job) {
|
2182 | 2196 | }
|
2183 | 2197 | return Pair.of(0L, error);
|
2184 | 2198 | }
|
| 2199 | + |
| 2200 | + public Pair<Long, String> onAlterIndex(DdlContext dc, DdlJob job) { |
| 2201 | + String error = job.decodeArgs(); |
| 2202 | + if (error != null) { |
| 2203 | + job.setState(JobState.jobStateCancelled); |
| 2204 | + return Pair.of(0L, error); |
| 2205 | + } |
| 2206 | + Pair<TableDefinitionWithId, String> tableRes = checkTableExistAndCancelNonExistJob(job, job.getSchemaId()); |
| 2207 | + if (tableRes.getValue() != null && tableRes.getKey() == null) { |
| 2208 | + return Pair.of(0L, tableRes.getValue()); |
| 2209 | + } |
| 2210 | + boolean empty = JobTableUtil.validateTableEmpty(job.getSchemaName(), job.getTableName()); |
| 2211 | + if (!empty) { |
| 2212 | + job.setState(JobState.jobStateCancelled); |
| 2213 | + return Pair.of(0L, "table is not empty"); |
| 2214 | + } |
| 2215 | + IndexDefinition indexDef = (IndexDefinition) job.getArgs().get(0); |
| 2216 | + String indexName = convertName(indexDef.getName()); |
| 2217 | + List<Object> indexList = InfoSchemaService.root().allIndex(job.getSchemaId(), job.getTableId()); |
| 2218 | + TableDefinitionWithId indexWithId = indexList.stream() |
| 2219 | + .map(idxTable -> (TableDefinitionWithId)idxTable) |
| 2220 | + .filter(idxTable -> |
| 2221 | + idxTable.getTableDefinition().getName().toUpperCase().endsWith(indexName.toUpperCase())) |
| 2222 | + .findFirst() |
| 2223 | + .orElse(null); |
| 2224 | + if (indexWithId == null) { |
| 2225 | + job.setState(JobState.jobStateCancelled); |
| 2226 | + job.setDingoErr(DingoErrUtil.newInternalErr(ErrKeyDoesNotExist, indexName, job.getTableName())); |
| 2227 | + return Pair.of(0L, job.getDingoErr().errorMsg); |
| 2228 | + } |
| 2229 | + indexDef.getProperties().forEach((k, v) -> { |
| 2230 | + indexWithId.getTableDefinition().getProperties().put(k.toString(), v.toString()); |
| 2231 | + }); |
| 2232 | + |
| 2233 | + MAPPER.resetIndexParameter(indexWithId.getTableDefinition(), indexDef); |
| 2234 | + DingoCommonId originIndexId = indexWithId.getTableId(); |
| 2235 | + |
| 2236 | + CoordinatorService coordinatorService = Services.coordinatorService(Configuration.coordinatorSet()); |
| 2237 | + List<DingoCommonId> indexIds = coordinatorService.createIds( |
| 2238 | + TsoService.getDefault().cacheTso(), |
| 2239 | + CreateIdsRequest.builder().idEpochType(IdEpochType.ID_NEXT_TABLE).count(1).build() |
| 2240 | + ).getIds() |
| 2241 | + .stream() |
| 2242 | + .map(id -> DingoCommonId.builder() |
| 2243 | + .entityId(id) |
| 2244 | + .entityType(EntityType.ENTITY_TYPE_INDEX) |
| 2245 | + .parentEntityId(job.getTableId()) |
| 2246 | + .build()) |
| 2247 | + .collect(Collectors.toList()); |
| 2248 | + |
| 2249 | + List<DingoCommonId> indexPartIds = coordinatorService.createIds( |
| 2250 | + TsoService.getDefault().cacheTso(), CreateIdsRequest.builder() |
| 2251 | + .idEpochType(IdEpochType.ID_NEXT_TABLE) |
| 2252 | + .count(indexWithId.getTableDefinition() |
| 2253 | + .getTablePartition().getPartitions().size()) |
| 2254 | + .build() |
| 2255 | + ).getIds().stream() |
| 2256 | + .map(id -> DingoCommonId.builder() |
| 2257 | + .entityType(EntityType.ENTITY_TYPE_PART) |
| 2258 | + .parentEntityId(indexIds.get(0).getEntityId()) |
| 2259 | + .entityId(id) |
| 2260 | + .build()) |
| 2261 | + .collect(Collectors.toList()); |
| 2262 | + TableIdWithPartIds indexIdWithPartIds = TableIdWithPartIds.builder() |
| 2263 | + .tableId(indexIds.get(0)) |
| 2264 | + .partIds(indexPartIds) |
| 2265 | + .build(); |
| 2266 | + io.dingodb.store.proxy.meta.MetaService.resetTableId(indexIdWithPartIds, indexWithId); |
| 2267 | + |
| 2268 | + io.dingodb.sdk.service.entity.meta.TableDefinition definition = indexWithId.getTableDefinition(); |
| 2269 | + for (Partition partition : definition.getTablePartition().getPartitions()) { |
| 2270 | + IndexParameter indexParameter = definition.getIndexParameter(); |
| 2271 | + if (indexParameter.getVectorIndexParameter() != null) { |
| 2272 | + indexParameter.setIndexType(io.dingodb.sdk.service.entity.common.IndexType.INDEX_TYPE_VECTOR); |
| 2273 | + } else if (indexParameter.getDocumentIndexParameter() != null) { |
| 2274 | + indexParameter.setIndexType(io.dingodb.sdk.service.entity.common.IndexType.INDEX_TYPE_DOCUMENT); |
| 2275 | + } |
| 2276 | + CreateRegionRequest request = CreateRegionRequest.builder() |
| 2277 | + .regionName("I_" + job.getSchemaId() + "_" + definition.getName() + "_part_" |
| 2278 | + + partition.getId().getEntityId()) |
| 2279 | + .regionType(definition.getIndexParameter().getIndexType() == io.dingodb.sdk.service.entity.common.IndexType.INDEX_TYPE_SCALAR |
| 2280 | + ? RegionType.STORE_REGION : RegionType.INDEX_REGION) |
| 2281 | + .replicaNum(indexWithId.getTableDefinition().getReplica()) |
| 2282 | + .range(partition.getRange()) |
| 2283 | + .rawEngine(RawEngine.RAW_ENG_ROCKSDB) |
| 2284 | + .storeEngine(definition.getStoreEngine()) |
| 2285 | + .schemaId(job.getSchemaId()) |
| 2286 | + .tableId(job.getTableId()) |
| 2287 | + .partId(partition.getId().getEntityId()) |
| 2288 | + .tenantId(indexWithId.getTenantId()) |
| 2289 | + .indexId(indexWithId.getTableId().getEntityId()) |
| 2290 | + .indexParameter(indexParameter) |
| 2291 | + .build(); |
| 2292 | + try { |
| 2293 | + LogUtils.info(log, "create region, range:{}", partition.getRange()); |
| 2294 | + coordinatorService.createRegion(TsoService.getDefault().cacheTso(), request); |
| 2295 | + } catch (Exception e) { |
| 2296 | + LogUtils.error(log, "create region error,schemaId:{},regionId:{}", |
| 2297 | + job.getSchemaId(), partition.getRange(), e); |
| 2298 | + job.setState(JobState.jobStateCancelled); |
| 2299 | + return Pair.of(0L, "create index region error"); |
| 2300 | + } |
| 2301 | + } |
| 2302 | + MetaService.root().dropIndex( |
| 2303 | + MAPPER.idFrom(tableRes.getKey().getTableId()), |
| 2304 | + MAPPER.idFrom(originIndexId), |
| 2305 | + job.getId(), job.getSnapshotVer() |
| 2306 | + ); |
| 2307 | + job.finishTableJob(JobState.jobStateDone, SchemaState.SCHEMA_PUBLIC); |
| 2308 | + return TableUtil.updateVersionAndIndexInfos(dc, job, indexWithId, true); |
| 2309 | + } |
2185 | 2310 | }
|
0 commit comments