Commit 7fa7f56

Merge pull request #53 from aws-samples/38-cqlreplicator-on-glue-add-preflight-check

fixed minor issues, added AWS region as a parameter, added the pre-fl…

2 parents: ff30239 + f6e7512

File tree: 3 files changed (+115, -53 lines)

glue/README.MD

Lines changed: 10 additions & 10 deletions

````diff
@@ -68,7 +68,7 @@ if you choose to replicate TTLs, updates, or offloading objects exceeding 1MB to
 Let's run the following command to replicate the workload from the Cassandra cluster to Amazon Keyspaces.
 
 ```shell
-cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 \
+cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --region us-west-2 \
 --src-keyspace ks_test_cql_replicator --src-table test_cql_replicator \
 --trg-keyspace ks_test_cql_replicator --trg-table test_cql_replicator --inc-traffic
 ```
@@ -84,34 +84,34 @@ every 2 minutes. At peak traffic, it can reach up to 22,400 WCUs per second.
 
 ### Replicate near-real time updates and inserts
 ```shell
-cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --writetime-column col3 \
---src-keyspace ks_test_cql_replicator --src-table test_cql_replicator \
+cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --region us-west-2 \
+--writetime-column col3 --src-keyspace ks_test_cql_replicator --src-table test_cql_replicator \
 --trg-keyspace ks_test_cql_replicator --trg-table test_cql_replicator --inc-traffic
 ```
 
 ### Replicate with TTL
 the TTL feature should be [enabled](https://docs.aws.amazon.com/keyspaces/latest/devguide/TTL-how-it-works.html#ttl-howitworks_enabling)
 on the target table before running the following command
 ```shell
-cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --writetime-column col3 \
---src-keyspace ks_test_cql_replicator --src-table test_cql_replicator --ttl-column col3 \
+cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --region us-west-2 \
+--writetime-column col3 --src-keyspace ks_test_cql_replicator --src-table test_cql_replicator --ttl-column col3 \
 --trg-keyspace ks_test_cql_replicator --trg-table test_cql_replicator --inc-traffic
 ```
 
 ### Offload large objects
 Before running the migration process configure [lifecycle](https://docs.aws.amazon.com/AmazonS3/latest/userguide/intro-lifecycle-rules.html)
 for the S3 bucket to delete objects after expiration.
 ```shell
-./cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --src-keyspace ks_test_cql_replicator
---src-table test_cql_replicator --trg-keyspace ks_test_cql_replicator --trg-table test_cql_replicator \
---ttl-column col3 --offload-large-objects '{"column":"col1","bucket":"my-application-resource",
+./cqlreplicator --state run --tiles 8 --landing-zone s3://cql-replicator-1234567890-us-west-2 --src-keyspace ks_test_cql_replicator \
+--src-table test_cql_replicator --trg-keyspace ks_test_cql_replicator --trg-table test_cql_replicator \
+--region us-west-2 --ttl-column col3 --offload-large-objects '{"column":"col1","bucket":"my-application-resource",
 "prefix":"ks_test_cql_replicator/test_cql_replicator/col1","xref":"link"}'
 ```
 
 ## Stop migration process
 To stop migration process gracefully run the following command:
 ```shell
-cqlreplicator --state request-stop --landing-zone s3://cql-replicator-1234567890-us-west-2 \
+cqlreplicator --state request-stop --landing-zone s3://cql-replicator-1234567890-us-west-2 --region us-west-2 \
 --src-keyspace ks_test_cql_replicator --src-table test_cql_replicator
 ```
 
@@ -120,7 +120,7 @@ Customers can simply restart the migration process from the point where it was i
 In order, to restart failed CQLReplicator jobs, you need to re-run `--state run` with the same parameters.
 
 ## Get migration stats
-To get replicated row stats, run the following command:
+To obtain the number of rows replicated during the back filling phase, run the following command:
 ```shell
 cqlreplicator --state stats --landing-zone s3://cql-replicator-1234567890-us-west-2 \
 --src-keyspace ks_test_cql_replicator --src-table test_cql_replicator
````

glue/bin/cqlreplicator

Lines changed: 41 additions & 38 deletions

```diff
@@ -6,7 +6,7 @@
 # Migration parameters
 MIGRATOR_VERSION=0.2
 JOB_NAME=CQLReplicator
-TILES=1
+TILES=2
 PROCESS_TYPE_DISCOVERY=discovery
 PROCESS_TYPE_REPLICATION=replication
 SOURCE_KS=ks_test_cql_replicator
@@ -23,7 +23,7 @@ DISCOVERED_TOTAL=0
 REPLICATED_TOTAL=0
 OFFLOAD_LARGE_OBJECTS_B64=$(echo "None" | base64)
 BASE_FOLDER=$(pwd -L)
-AWS_REGION="us-east-1"
+AWS_REGION=""
 SUBNET=""
 SG=""
 AZ=""
@@ -81,13 +81,17 @@ function check_input() {
     return 0
 }
 
-
 function check_discovery_runs() {
     local rs
-    rs=$(aws glue get-job-runs --job-name CQLReplicator --query 'JobRuns[?JobRunState==`RUNNING`] | [].Arguments | [?"--PROCESS_TYPE"==`discovery`]' | jq '.[0]["--SOURCE_TBL"] == "'"$SOURCE_TBL"'" and .[0]["--SOURCE_KS"] == "'"$SOURCE_KS"'"')
-    if [[ $rs = "true" ]]; then
-        #log "ERROR: Discovery job is already running for" $SOURCE_KS.$SOURCE_TBL
-        return 1
+    local mode
+    # mode = true, if discovery job is not running return 0
+    # mode = false, if discovery job is not running return 1
+    mode=$1
+    rs=$(aws glue get-job-runs --job-name CQLReplicator --region "$AWS_REGION" --query 'JobRuns[?JobRunState==`RUNNING`] | [].Arguments | [?"--PROCESS_TYPE"==`discovery`]' | jq '.[0]["--SOURCE_TBL"] == "'"$SOURCE_TBL"'" and .[0]["--SOURCE_KS"] == "'"$SOURCE_KS"'"')
+
+    if [[ $rs == "$mode" ]]; then
+        log "ERROR: The discovery job has failed, check AWS Glue logs"
+        exit 1
     fi
     return 0
 }
```
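The rewritten `check_discovery_runs` folds two call sites into one comparison: the caller passes the state that should be treated as fatal. A standalone sketch of that contract, with the AWS Glue lookup replaced by an explicit `running` argument so it runs anywhere (the helper name is hypothetical, and it returns 1 where the real script calls `exit`):

```shell
# discovery_state_ok RUNNING MODE
#   RUNNING: "true" when a discovery job run is currently RUNNING
#   MODE:    "true"  -> fatal if a run already exists (checked before start)
#            "false" -> fatal if no run exists (checked inside the barrier)
# Mirrors the script's `[[ $rs == "$mode" ]]` test.
discovery_state_ok() {
  local running=$1 mode=$2
  if [[ $running == "$mode" ]]; then
    return 1    # the condition the script treats as fatal
  fi
  return 0
}

discovery_state_ok "false" "true" && echo "safe to start the discovery job"
```

So `discovery_state_ok "true" "true"` (a run already exists before start) and `discovery_state_ok "false" "false"` (the run died while the barrier waits) both fail, matching the two fatal paths in the script.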
```diff
@@ -96,15 +100,23 @@ function check_replication_runs() {
     local tile
     local rs
     tile=$1
-    rs=$(aws glue get-job-runs --job-name CQLReplicator --query 'JobRuns[?JobRunState==`RUNNING`] | [].Arguments | [?"--PROCESS_TYPE"==`replication`]' | jq '.[0]["--SOURCE_TBL"] == "'"$SOURCE_TBL"'" and .[0]["--SOURCE_KS"] == "'"$SOURCE_KS"'" and .[]["--TILE"] == "'"$tile"'"' | grep true)
+    rs=$(aws glue get-job-runs --job-name CQLReplicator --region "$AWS_REGION" --query 'JobRuns[?JobRunState==`RUNNING`] | [].Arguments | [?"--PROCESS_TYPE"==`replication`]' | jq '.[0]["--SOURCE_TBL"] == "'"$SOURCE_TBL"'" and .[0]["--SOURCE_KS"] == "'"$SOURCE_KS"'" and .[]["--TILE"] == "'"$tile"'"' | grep true)
 
-    if [[ $rs = "true" ]]; then
+    if [[ $rs == "true" ]]; then
         #log "ERROR: Replication job is already running per tile $tile for" $SOURCE_KS.$SOURCE_TBL
         return 1
     fi
     return 0
 }
 
+function check_num_tiles() {
+    if [[ $TILES -lt 2 ]]; then
+        log "Total number of tiles should be => 2"
+        exit 1
+    fi
+    return 0
+}
+
 function progress {
     local current="$1"
     local total="$2"
```
```diff
@@ -124,6 +136,7 @@ function progress {
 }
 
 function barrier() {
+    flag_check_discovery_run="$1"
     while true
     do
         cnt=0
@@ -137,6 +150,11 @@ function barrier() {
         if [[ $cnt == "$TILES" ]]; then
             break
         fi
+        if [[ $flag_check_discovery_run == "true" ]]; then
+            # if the discovery job is not running then fail (return 1)
+            sleep 2
+            check_discovery_runs "false"
+        fi
     done
 }
```
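The new `barrier` flag turns an unbounded poll into a fail-fast loop: while tiles are still incomplete, it can probe whether the discovery job is alive and abort instead of waiting forever. A simplified, AWS-free sketch of that loop (`tile_done` and `discovery_running` are stand-in stubs; the real script checks S3 objects and AWS Glue job runs):

```shell
# Stubs standing in for the real S3/Glue checks (test knobs, not real APIs).
tile_done() { [[ $1 -lt $TILES_DONE ]]; }            # tiles 0..TILES_DONE-1 finished
discovery_running() { [[ $DISCOVERY_ALIVE == "true" ]]; }

barrier_sketch() {
  local total_tiles=$1 flag_check_discovery_run=$2
  while true; do
    local cnt=0 tile=0
    while [ "$tile" -lt "$total_tiles" ]; do
      tile_done "$tile" && cnt=$((cnt + 1))
      tile=$((tile + 1))
    done
    # all tiles reported: the barrier opens
    if [[ $cnt == "$total_tiles" ]]; then
      break
    fi
    # fail fast if the discovery job died while we were waiting
    if [[ $flag_check_discovery_run == "true" ]]; then
      discovery_running || return 1
    fi
    sleep 1
  done
}

TILES_DONE=2 DISCOVERY_ALIVE=true barrier_sketch 2 true && echo "barrier released"
```

With the flag set to `"false"` (as the `stats` path does), the liveness probe is skipped and the loop simply waits for all tiles.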

```diff
@@ -152,6 +170,7 @@ function Usage_Exit {
     log "run - Start migration process"
     log "stats - Upload progress. Only for historical workload"
     log "request-stop - Stop migration process"
+    log "cleanup - Delete all CQLReplicator artifacts"
     exit 1
 }
 
@@ -161,9 +180,9 @@ function Clean_Up {
     aws s3 rb "$S3_LANDING_ZONE"
     local connection_name
     connection_name=$(aws glue get-job --job-name CQLReplicator --query 'Job.Connections.Connections[0]' --output text)
-    aws glue delete-connection --connection-name "$connection_name"
-    aws glue delete-job --job-name CQLReplicator
-    aws keyspaces delete-keyspace --keyspace-name migration
+    aws glue delete-connection --connection-name "$connection_name" --region "$AWS_REGION"
+    aws glue delete-job --job-name CQLReplicator --region "$AWS_REGION"
+    aws keyspaces delete-keyspace --keyspace-name migration --region "$AWS_REGION"
 }
 
 function Init {
```
```diff
@@ -262,22 +281,6 @@ function Init {
         "AvailabilityZone":"'$AZ'"}
     }' --region "$AWS_REGION" --endpoint https://glue."$AWS_REGION".amazonaws.com --output json
 
-    #glue_conn_name=$(echo cql-replicator-"$(uuidgen)" | tr ' [:upper:]' ' [:lower:]')
-    # aws glue create-connection --connection-input '{
-    # "Name":"'$glue_conn_name'",
-    # "Description":"CQLReplicator connection to the C* cluster",
-    # "ConnectionType":"CUSTOM",
-    # "ConnectionProperties":{
-    # "CONNECTOR_TYPE": "Spark",
-    # "CONNECTOR_URL": "'$S3_LANDING_ZONE'/artifacts/spark-cassandra-connector-assembly_2.12-3.4.0.jar",
-    # "CONNECTOR_CLASS_NAME": "org.apache.spark.sql.cassandra"
-    # },
-    # "PhysicalConnectionRequirements":{
-    # "SubnetId":"'$SUBNET'",
-    # "SecurityGroupIdList":["'$SG'"],
-    # "AvailabilityZone":"'$AZ'"}
-    # }' --region "$AWS_REGION" --endpoint https://glue."$AWS_REGION".amazonaws.com --output json
-
     # Create Glue Jobs
     aws glue create-job \
         --name "CQLReplicator" \
@@ -375,22 +378,21 @@ function Start_Discovery {
     check_input "$TARGET_KS" "ERROR: target keyspace name is empty, must be provided"
     check_input "$TARGET_TBL" "ERROR: target table name is empty, must be provided"
     check_input "$S3_LANDING_ZONE" "ERROR: landing zone must be provided"
+    check_num_tiles
 
     log "TILES:" "$TILES"
     log "SOURCE:" "$SOURCE_KS"."$SOURCE_TBL"
     log "TARGET:" "$TARGET_KS"."$TARGET_TBL"
     log "LANDING ZONE:" "$S3_LANDING_ZONE"
-    log "Writetime column:" $WRITETIME_COLUMN
-    log "TTL column:" $TTL_COLUMN
+    log "WRITE TIME COLUMN:" $WRITETIME_COLUMN
+    log "TTL COLUMN:" $TTL_COLUMN
     local workers=$((1 + TILES / 2))
     log "Checking if the discovery job is already running..."
-    check_discovery_runs
+    check_discovery_runs "true"
     if [ $? = 0 ]; then
         Delete_Stop_Event_D
         log "Starting the discovery job..."
-        #KEYS_PER_TILE=$(aws s3 cp "$S3_LANDING_ZONE"/"$SOURCE_KS"/"$SOURCE_TBL"/stats/discovery/0/count.json - | head | jq '.primaryKeys')
-        #log "Average primary keys per tile is $KEYS_PER_TILE"
-        rs=$(aws glue start-job-run --job-name "$JOB_NAME" --worker-type G.1X --number-of-workers "$workers" --arguments '{"--PROCESS_TYPE":"'$PROCESS_TYPE_DISCOVERY'",
+        rs=$(aws glue start-job-run --job-name "$JOB_NAME" --worker-type G.1X --number-of-workers "$workers" --region "$AWS_REGION" --arguments '{"--PROCESS_TYPE":"'$PROCESS_TYPE_DISCOVERY'",
             "--TILE":"0",
             "--TOTAL_TILES":"'$TILES'",
             "--S3_LANDING_ZONE":"'$S3_LANDING_ZONE'",
```
```diff
@@ -415,7 +417,7 @@ function Start_Replication {
         check_replication_runs $cnt
         if [ $? = 0 ]; then
             Delete_Stop_Event_R $cnt
-            rs=$(aws glue start-job-run --job-name "$JOB_NAME" --worker-type G.025X --number-of-workers "$workers" --arguments '{"--PROCESS_TYPE":"'$PROCESS_TYPE_REPLICATION'",
+            rs=$(aws glue start-job-run --job-name "$JOB_NAME" --worker-type G.025X --number-of-workers "$workers" --region "$AWS_REGION" --arguments '{"--PROCESS_TYPE":"'$PROCESS_TYPE_REPLICATION'",
                 "--TILE":"'$cnt'",
                 "--TOTAL_TILES":"'$TILES'",
                 "--S3_LANDING_ZONE":"'$S3_LANDING_ZONE'",
@@ -438,7 +440,7 @@ function validate_json() {
     local json_str=$1
 
     # Check if the JSON is valid
-    echo "$json_str" | jq empty #> /dev/null 2>&1
+    echo "$json_str" | jq empty
     if [[ $? -ne 0 ]]; then
         log "ERROR: Invalid JSON"
         log '{"column": "column_name", "bucket": "bucket-name", "prefix": "keyspace_name/table_name/payload", "xref": "reference-column"}'
```
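`validate_json` leans on `jq empty`, which parses its input, produces no output, and exits non-zero on invalid JSON; the change drops the commented-out redirect so jq's own parse error stays visible in the log. A standalone sketch of the same check for an `--offload-large-objects` payload (hypothetical helper name, plain `echo` instead of the script's `log`):

```shell
# Validate an --offload-large-objects payload the way the script does:
# `jq empty` makes the exit status the whole test.
validate_offload_config() {
  local json_str=$1
  if ! echo "$json_str" | jq empty; then
    echo "ERROR: Invalid JSON"
    echo '{"column": "column_name", "bucket": "bucket-name", "prefix": "keyspace_name/table_name/payload", "xref": "reference-column"}'
    return 1
  fi
  return 0
}

validate_offload_config '{"column":"col1","bucket":"my-application-resource","prefix":"ks_test_cql_replicator/test_cql_replicator/col1","xref":"link"}' \
  && echo "offload config OK"
```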
```diff
@@ -595,7 +597,7 @@ eval set -- "$PARAMS"
 
 if [[ $STATE == run ]]; then
     Start_Discovery
-    barrier
+    barrier "true"
     Start_Replication
     log "Started jobs:" "${JOBS[@]}"
 fi
@@ -632,7 +634,8 @@ if [[ $STATE == cleanup ]]; then
 fi
 
 if [[ $STATE == stats ]]; then
-    barrier
+    # the barrier without checking if the discovery job is running
+    barrier "false"
     tile=0
     while [ $tile -lt "$TILES" ]
     do
```

glue/sbin/CQLReplicator.scala

Lines changed: 64 additions & 5 deletions

```diff
@@ -21,7 +21,9 @@ import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.DataFrame
+
 import java.time.Duration
+import java.util.Optional
 
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -58,11 +60,16 @@ class CassandraTypeException(s: String) extends RuntimeException {}
 
 object GlueApp {
   def main(sysArgs: Array[String]) {
+
     def shuffleDf(df: DataFrame): DataFrame = {
       val encoder = RowEncoder(df.schema)
       df.mapPartitions(new scala.util.Random().shuffle(_))(encoder)
     }
 
+    def shuffleDfV2(df: DataFrame): DataFrame = {
+      df.orderBy(rand())
+    }
+
     def customConnectionFactory(sc: SparkContext): (CassandraConnector, CassandraConnector) = {
       val connectorToClusterSrc = CassandraConnector(sc.getConf.set("spark.cassandra.connection.config.profile.path", "KeyspacesConnector.conf"))
       val connectorToClusterTrg = CassandraConnector(sc.getConf.set("spark.cassandra.connection.config.profile.path", "CassandraConnector.conf"))
@@ -97,6 +104,50 @@ object GlueApp {
       }
     }
 
+    def preFlightCheck(connection: CassandraConnector, keyspace: String, table: String, dir: String): Unit = {
+      val logger = new GlueLogger
+      Try {
+        val c1 = Option(connection.openSession)
+        c1.isEmpty match {
+          case false => {
+            c1.get.getMetadata.getKeyspace(keyspace).isPresent match {
+              case true => {
+                c1.get.getMetadata.getKeyspace(keyspace).get.getTable(table).isPresent match {
+                  case true => {
+                    logger.info(s"the $dir table $table exists")
+                  }
+                  case false => {
+                    val err = s"ERROR: the $dir table $table does not exist"
+                    logger.error(err)
+                    sys.exit(-1)
+                  }
+                }
+              }
+              case false => {
+                val err = s"ERROR: the $dir keyspace $keyspace does not exist"
+                logger.error(err)
+                sys.exit(-1)
+              }
+            }
+          }
+          case _ => {
+            val err = s"ERROR: The job was not able to connecto to the $dir"
+            logger.error(err)
+            sys.exit(-1)
+          }
+        }
+      } match {
+        case Failure(_) => {
+          val err = s"ERROR: Detected connectivity issue. Check the reference conf file/Glue connection for the $dir, the job is aborted"
+          logger.error(err)
+          sys.exit(-1)
+        }
+        case Success(_) => {
+          logger.info(s"Connected to the $dir")
+        }
+      }
+    }
+
     val WAIT_TIME = 10000
     val MAX_RETRY_ATTEMPTS = 256
     // Unit ms
@@ -133,10 +184,21 @@ object GlueApp {
     val cassandraConn = customConnections._2
     val keyspacesConn = customConnections._1
     val landingZone = args("S3_LANDING_ZONE")
+    val bcktName = landingZone.replaceAll("s3://", "")
     val columnTs = args("WRITETIME_COLUMN")
     val source = s"sourceCluster.$srcKeyspaceName.$srcTableName"
     val ttlColumn = args("TTL_COLUMN")
     val olo = args("OFFLOAD_LARGE_OBJECTS")
+
+    //AmazonS3Client to check if a stop requested issued
+    val s3client = new AmazonS3Client()
+
+    // Let's do preflight checks
+    logger.info("Preflight check started")
+    preFlightCheck(cassandraConn, srcKeyspaceName, srcTableName, "source")
+    preFlightCheck(keyspacesConn, trgKeyspaceName, trgTableName, "target")
+    logger.info("Preflight check completed")
+
     val selectStmtWithTTL = ttlColumn match {
       case "None" => ""
       case _ => {
@@ -162,9 +224,6 @@ object GlueApp {
 
     val offloadLargeObjects = parseJSONConfig(offloadLageObjTmp)
 
-    //AmazonS3Client to check if a stop requested issued
-    val s3client = new AmazonS3Client()
-
     def stopRequested(bucket: String): Boolean = {
       val key = processType match {
         case "discovery" => s"$srcKeyspaceName/$srcTableName/$processType/stopRequested"
@@ -352,7 +411,7 @@ object GlueApp {
       val tile = location._2
       val ver = location._3
 
-      persistToTarget(shuffleDf(sourceDfV2), columns, columnsPos, tile, ver)
+      persistToTarget(shuffleDfV2(sourceDfV2), columns, columnsPos, tile, ver)
       keyspacesConn.withSessionDo {
         session => session.execute(s"INSERT INTO migration.ledger(ks,tbl,tile,ver,load_status,dt_load, offload_status) VALUES('$srcKeyspaceName','$srcTableName',$tile,'$ver','SUCCESS', toTimestamp(now()), '')")
       }
@@ -401,6 +460,7 @@ object GlueApp {
       }
     }
 
+    // partitonBy instead of repartition
     def keysDiscoveryProcess() {
       val primaryKeysDf = sparkSession.read.option("inferSchema", "true").table(source)
       val primaryKeysDfwithTS = primaryKeysDf.selectExpr(pkFinal.map(c => c): _*)
@@ -471,7 +531,6 @@ object GlueApp {
       groupedPkDF.unpersist()
     }
 
-    val bcktName = landingZone.replaceAll("s3://", "")
     Iterator.continually(stopRequested(bcktName)).takeWhile(_ == false).foreach {
       _ => {
         processType match {
```
