Skip to content

Commit 71b5b8d

Browse files
authored
Merge pull request #105 from aws-samples/94-update-cqlreplicatorscala-for-memorydb-parquet-and-opensearch
94 update cqlreplicatorscala for memorydb parquet and opensearch
2 parents d3c02d9 + 5988502 commit 71b5b8d

File tree

6 files changed

+466
-243
lines changed

6 files changed

+466
-243
lines changed

glue/README.MD

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ This migration approach ensures zero downtime, no code compilation, predictable
55
## Architecture
66
This solution offers customers the flexibility to scale the migration workload up and out by deploying multiple Glue
77
jobs of CQLReplicator.
8-
Each glue job (tile) is tasked with handling a specific subset of primary keys (250K per tile) to distribute migration
9-
workload evenly across Glue data processing units.
8+
Each glue job (tile) is tasked with handling a specific subset of primary keys (250K per one data processing unit
9+
G.025X) to distribute migration
10+
workload evenly across data processing units.
1011
A single replication glue job (AWS Glue DPU - 0.25X) can push up to 1500 WCUs per second against the target table in
1112
Amazon Keyspaces. This allows for easy estimation of the final traffic against Amazon Keyspaces.
1213

@@ -188,10 +189,22 @@ cqlreplicator --state stats --landing-zone s3://cql-replicator-1234567890-us-wes
188189
+------------------------------------------------------------------------+
189190
| 1 | 2 | 2 | 1 | "2024-02-02T21:50:25.454" |
190191
+------------------------------------------------------------------------+
191-
[2024-02-02T16:50:39-05:00] Discovered rows in casereview.casedetails is 70270
192-
[2024-02-02T16:50:39-05:00] Replicated rows in casereview.casedetails_2 is 70270
192+
[2024-02-02T16:50:39-05:00] Discovered rows in ks_test_cql_replicator.test_cql_replicator is 70270
193+
[2024-02-02T16:50:39-05:00] Replicated rows in ks_test_cql_replicator.test_cql_replicator is 70270
193194
```
194195
196+
## AWS Glue Monitoring
197+
198+
You can profile and monitor CQLReplicator operations using AWS Glue job profiler. It collects and processes raw data
199+
from AWS Glue jobs into readable, near real-time metrics stored in Amazon CloudWatch.
200+
These statistics are retained and aggregated in CloudWatch so that you can access historical information for a better
201+
perspective on how your application is performing.
202+
203+
In order tp enable the CQLReplicator enhanced monitoring use the `--enhanced-monitoring-enabled` flag with your init
204+
command.
205+
206+
***You may incur additional charges when you enable job metrics and CloudWatch custom metrics are created***
207+
195208
## Cost optimization
196209
197210
In order to reduce AWS Glue costs after the historical workload moved to the target storage:
@@ -262,7 +275,7 @@ After the init phase is completed you can run the following command to start the
262275
263276
## Clean up
264277
265-
The following command will delete the Glue job, connector, the S3 bucket, and Keyspaces'table ledger:
278+
The following command will delete the Glue job, connector, the S3 bucket, and the ledger table:
266279
267280
```shell
268281
cqlreplicator --state cleanup --landing-zone s3://cql-replicator-1234567890-us-west-2 --region "us-west-2"

glue/bin/cqlreplicator

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ GLUE_IAM_ROLE=""
3535
AWS_ACCOUNT=""
3636
KEYS_PER_TILE=0
3737
ROWS_PER_WORKER=250000
38-
# Target type might be keyspace or parquet
3938
TARGET_TYPE=keyspaces
4039
SKIP_GLUE_CONNECTOR=false
4140
SKIP_KEYSPACES_LEDGER=false
4241
JSON_MAPPING=""
4342
REPLICATION_POINT_IN_TIME=0
4443
REPLICATION_STATS_ENABLED=false
44+
GLUE_MONITORING=false
45+
SAFE_MODE=true
4546
OS=$(uname -a | awk '{print $1}')
4647

4748
# Progress bar configuration
@@ -90,6 +91,7 @@ log() {
9091

9192
if [[ "$OS" == Linux || "$OS" == Darwin ]]; then
9293
log "OS: $OS"
94+
log "AWS CLI: $(aws --version)"
9395
else
9496
log "ERROR: Please run this script in AWS CloudShell or Linux/Darwin"
9597
exit 1
@@ -193,10 +195,10 @@ function uploader_helper() {
193195
local next_pos=$3
194196
local final_pos=$4
195197
check_file_exists "$path_to_conf/$artifact_name"
196-
progress $curr_pos $final_pos "Uploading $artifact_name "
198+
progress $curr_pos $final_pos "Uploading $artifact_name "
197199
if ls "$path_to_conf/$artifact_name" > /dev/null
198200
then
199-
progress $next_pos $final_pos "Uploading $artifact_name "
201+
progress $next_pos $final_pos "Uploading $artifact_name "
200202
aws s3 cp "$path_to_conf"/"$artifact_name" "$S3_LANDING_ZONE"/artifacts/"$artifact_name" > /dev/null
201203
else
202204
log "ERROR: $path_to_conf/$artifact_name not found"
@@ -228,14 +230,14 @@ function barrier() {
228230
}
229231

230232
function Usage_Exit {
231-
log "$0 [--state init/run/request-stop|--tiles number of tiles|--landing-zone s3Uri|--writetime-column col3|\
233+
echo "$0 [--state init/run/request-stop|--tiles number of tiles|--landing-zone s3Uri|--writetime-column col3|\
232234
--src-keyspace keyspace_name|--src-table table_name|--trg-keyspace keyspace_name|--trg-table table_name]"
233-
log "Script version:" ${MIGRATOR_VERSION}
234-
log "init - Deploy CQLReplicator Glue job, and download jars"
235-
log "run - Start migration process"
236-
log "stats - Upload progress. Only for historical workload"
237-
log "request-stop - Stop migration process"
238-
log "cleanup - Delete all CQLReplicator artifacts"
235+
echo "Script version:" ${MIGRATOR_VERSION}
236+
echo "init - Deploy CQLReplicator Glue job, and download jars"
237+
echo "run - Start migration process"
238+
echo "stats - Upload progress. Only for historical workload"
239+
echo "request-stop - Stop migration process"
240+
echo "cleanup - Delete all the CQLReplicator's artifacts"
239241
exit 1
240242
}
241243

@@ -246,9 +248,9 @@ function Clean_Up {
246248
aws s3 rb "$S3_LANDING_ZONE"
247249
local connection_name
248250
connection_name=$(aws glue get-job --job-name CQLReplicator --query 'Job.Connections.Connections[0]' --output text)
249-
aws glue delete-connection --connection-name "$connection_name" --region "$AWS_REGION"
250-
aws glue delete-connection --connection-name cql-replicator-memorydb-integration --region "$AWS_REGION" > /dev/null
251-
aws glue delete-connection --connection-name cql-replicator-opensearch-integration --region "$AWS_REGION" > /dev/null
251+
aws glue delete-connection --connection-name "$connection_name" --region "$AWS_REGION" > /dev/null 2>&1
252+
aws glue delete-connection --connection-name cql-replicator-memorydb-integration --region "$AWS_REGION" > /dev/null 2>&1
253+
aws glue delete-connection --connection-name cql-replicator-opensearch-integration --region "$AWS_REGION" > /dev/null 2>&1
252254
aws glue delete-job --job-name CQLReplicator --region "$AWS_REGION"
253255
if [[ $SKIP_KEYSPACES_LEDGER == false ]]; then
254256
aws keyspaces delete-keyspace --keyspace-name migration --region "$AWS_REGION"
@@ -335,8 +337,13 @@ function Init {
335337

336338
# Create Glue Connector
337339
local glue_conn_name
340+
local enhanced_monitoring=""
341+
if [[ "$GLUE_MONITORING" == true ]]; then
342+
enhanced_monitoring=',"--enable-continuous-cloudwatch-log":"true","--enable-continuous-log-filter":"true","--enable-metrics":"true","--enable-observability-metrics":"true"'
343+
fi
344+
338345
if [[ $SKIP_GLUE_CONNECTOR == false ]]; then
339-
progress 3 5 "Creating Glue connector and CQLReplicator job "
346+
progress 3 5 "Creating Glue artifacts "
340347
glue_conn_name=$(echo cql-replicator-"$(uuidgen)" | tr ' [:upper:]' ' [:lower:]')
341348
aws glue create-connection --connection-input '{
342349
"Name":"'$glue_conn_name'",
@@ -390,11 +397,12 @@ function Init {
390397
"--extra-jars":"'$S3_LANDING_ZONE'/artifacts/jedis-4.4.6.jar,'$S3_LANDING_ZONE'/artifacts/spark-cassandra-connector-assembly_2.12-3.4.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-retry-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-core-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/vavr-0.10.4.jar,'$S3_LANDING_ZONE'/artifacts/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9.jar,'$S3_LANDING_ZONE'/artifacts/opensearch-spark-30_2.12-1.0.1.jar",
391398
"--conf":"spark.files='$S3_LANDING_ZONE'/artifacts/KeyspacesConnector.conf,'$S3_LANDING_ZONE'/artifacts/CassandraConnector.conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.kryoserializer.buffer.max=128m --conf spark.rdd.compress=true --conf spark.cleaner.periodicGC.interval=1min --conf spark.kryo.referenceTracking=false --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.task.maxFailures=64",
392399
"--class":"GlueApp"
400+
'$enhanced_monitoring'
393401
}' > /dev/null
394402
fi
395403

396404
if [[ $SKIP_GLUE_CONNECTOR == true ]]; then
397-
progress 3 5 "Creating CQLReplicator job "
405+
progress 3 5 "Creating Glue artifacts "
398406
aws glue create-job \
399407
--name "CQLReplicator" \
400408
--role "$GLUE_IAM_ROLE" \
@@ -411,16 +419,17 @@ function Init {
411419
"--extra-jars":"'$S3_LANDING_ZONE'/artifacts/jedis-4.4.6.jar,'$S3_LANDING_ZONE'/artifacts/spark-cassandra-connector-assembly_2.12-3.4.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-retry-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-core-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/vavr-0.10.4.jar,'$S3_LANDING_ZONE'/artifacts/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9.jar,'$S3_LANDING_ZONE'/artifacts/opensearch-spark-30_2.12-1.0.1.jar",
412420
"--conf":"spark.files='$S3_LANDING_ZONE'/artifacts/KeyspacesConnector.conf,'$S3_LANDING_ZONE'/artifacts/CassandraConnector.conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.kryoserializer.buffer.max=128m --conf spark.rdd.compress=true --conf spark.cleaner.periodicGC.interval=1min --conf spark.kryo.referenceTracking=false --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.task.maxFailures=64",
413421
"--class":"GlueApp"
422+
'$enhanced_monitoring'
414423
}' > /dev/null
415424
fi
416425

417426
if [[ $SKIP_KEYSPACES_LEDGER == true ]]; then
418-
progress 4 5 "Skipping CQLReplicator's internal keyspace "
419-
progress 5 5 "Skipping CQLReplicator's internal table "
427+
progress 4 5 "Skipping CQLReplicator's internal keyspace "
428+
progress 5 5 "Skipping CQLReplicator's internal table "
420429
fi
421430

422431
if [[ $SKIP_KEYSPACES_LEDGER == false ]]; then
423-
progress 4 5 "Creating CQLReplicator's internal resources "
432+
progress 4 5 "Creating CQLReplicator's internal resources "
424433
# Create a keyspace - migration
425434
aws keyspaces create-keyspace --keyspace-name migration --region "$AWS_REGION" > /dev/null
426435
sleep 20
@@ -438,7 +447,7 @@ function Init {
438447
{ "name": "offload_status", "type": "text" } ],
439448
"partitionKeys": [ { "name": "ks" }, { "name": "tbl" } ],
440449
"clusteringKeys": [ { "name": "tile", "orderBy": "ASC" }, { "name": "ver", "orderBy": "ASC" } ] }' --region "$AWS_REGION" > /dev/null
441-
progress 5 5 "Creating CQLReplicator's internal resources "
450+
progress 5 5 "Created the CQLReplicator internal resources "
442451
fi
443452

444453
log "Deploy is completed"
@@ -463,6 +472,7 @@ function Start_Discovery {
463472
log "TTL COLUMN:" $TTL_COLUMN
464473
log "ROWS PER DPU:" $ROWS_PER_WORKER
465474
log "START REPLICATING FROM: $REPLICATION_POINT_IN_TIME (0 is disabled)"
475+
log "SAFE MODE: $SAFE_MODE"
466476
local workers=$((1 + TILES / 2))
467477
log "Checking if the discovery job is already running..."
468478
check_discovery_runs "true"
@@ -478,6 +488,7 @@ function Start_Discovery {
478488
"--TARGET_KS":"'$TARGET_KS'",
479489
"--TARGET_TBL":"'$TARGET_TBL'",
480490
"--WRITETIME_COLUMN":"'$WRITETIME_COLUMN'",
491+
"--SAFE_MODE":"'$SAFE_MODE'",
481492
"--OFFLOAD_LARGE_OBJECTS":"'$OFFLOAD_LARGE_OBJECTS_B64'",
482493
"--REPLICATION_POINT_IN_TIME":"'$REPLICATION_POINT_IN_TIME'",
483494
"--TTL_COLUMN":"'$TTL_COLUMN'"}' --output text)
@@ -504,6 +515,7 @@ function Start_Replication {
504515
"--TARGET_KS":"'$TARGET_KS'",
505516
"--TARGET_TBL":"'$TARGET_TBL'",
506517
"--WRITETIME_COLUMN":"'$WRITETIME_COLUMN'",
518+
"--SAFE_MODE":"'$SAFE_MODE'",
507519
"--OFFLOAD_LARGE_OBJECTS":"'$OFFLOAD_LARGE_OBJECTS_B64'",
508520
"--REPLICATION_POINT_IN_TIME":"'$REPLICATION_POINT_IN_TIME'",
509521
"--TTL_COLUMN":"'$TTL_COLUMN'"}' --output text)
@@ -705,6 +717,14 @@ while (( "$#" )); do
705717
REPLICATION_STATS_ENABLED=true
706718
shift 1
707719
;;
720+
--enhanced-monitoring-enabled)
721+
GLUE_MONITORING=true
722+
shift 1
723+
;;
724+
--safe-mode-disabled)
725+
SAFE_MODE=false
726+
shift 1
727+
;;
708728
--)
709729
shift
710730
break

0 commit comments

Comments
 (0)