4848import  io .cdap .cdap .etl .api .batch .BatchSourceContext ;
4949import  io .cdap .cdap .etl .api .connector .Connector ;
5050import  io .cdap .cdap .etl .api .engine .sql .SQLEngineInput ;
51+ import  io .cdap .cdap .etl .api .exception .ErrorDetailsProviderSpec ;
5152import  io .cdap .cdap .etl .api .validation .ValidationFailure ;
5253import  io .cdap .plugin .common .Asset ;
5354import  io .cdap .plugin .common .LineageRecorder ;
5758import  io .cdap .plugin .gcp .bigquery .util .BigQueryConstants ;
5859import  io .cdap .plugin .gcp .bigquery .util .BigQueryUtil ;
5960import  io .cdap .plugin .gcp .common .CmekUtils ;
61+ import  io .cdap .plugin .gcp .common .GCPErrorDetailsProvider ;
6062import  io .cdap .plugin .gcp .common .GCPUtils ;
6163import  org .apache .avro .generic .GenericData ;
6264import  org .apache .hadoop .conf .Configuration ;
@@ -135,7 +137,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
135137
136138    // Create BigQuery client 
137139    String  serviceAccount  = config .getServiceAccount ();
138-     Credentials  credentials  = BigQuerySourceUtils .getCredentials (config .getConnection ());
140+     Credentials  credentials  = null ;
141+     try  {
142+       credentials  = BigQuerySourceUtils .getCredentials (config .getConnection ());
143+     } catch  (Exception  e ) {
144+       String  errorReason  = "Unable to load service account credentials." ;
145+       collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
146+         .withStacktrace (e .getStackTrace ());
147+       collector .getOrThrowException ();
148+     }
149+ 
150+ 
139151    BigQuery  bigQuery  = GCPUtils .getBigQuery (config .getProject (), credentials , null );
140152    Dataset  dataset  = bigQuery .getDataset (DatasetId .of (config .getDatasetProject (), config .getDataset ()));
141153    Storage  storage  = GCPUtils .getStorage (config .getProject (), credentials );
@@ -144,19 +156,30 @@ public void prepareRun(BatchSourceContext context) throws Exception {
144156    bucketPath  = UUID .randomUUID ().toString ();
145157    CryptoKeyName  cmekKeyName  = CmekUtils .getCmekKey (config .cmekKey , context .getArguments ().asMap (), collector );
146158    collector .getOrThrowException ();
147-     configuration  = BigQueryUtil .getBigQueryConfig (serviceAccount , config .getProject (), cmekKeyName ,
148-                                                    config .getServiceAccountType ());
159+     try  {
160+       configuration  = BigQueryUtil .getBigQueryConfig (serviceAccount , config .getProject (), cmekKeyName ,
161+         config .getServiceAccountType ());
162+     } catch  (Exception  e ) {
163+       String  errorReason  = "Failed to create BigQuery configuration." ;
164+       collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
165+         .withStacktrace (e .getStackTrace ());
166+       collector .getOrThrowException ();
167+     }
149168
150169    String  bucketName  = BigQueryUtil .getStagingBucketName (context .getArguments ().asMap (), null ,
151170                                                          dataset , config .getBucket ());
152171
153172    // Configure GCS Bucket to use 
154-     String  bucket  = BigQuerySourceUtils .getOrCreateBucket (configuration ,
155-                                                           storage ,
156-                                                           bucketName ,
157-                                                           dataset ,
158-                                                           bucketPath ,
159-                                                           cmekKeyName );
173+     String  bucket  = null ;
174+     try  {
175+       bucket  = BigQuerySourceUtils .getOrCreateBucket (configuration , storage , bucketName , dataset , bucketPath ,
176+         cmekKeyName );
177+     } catch  (Exception  e ) {
178+       String  errorReason  = "Failed to create bucket." ;
179+       collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
180+         .withStacktrace (e .getStackTrace ());
181+       collector .getOrThrowException ();
182+     }
160183
161184    // Configure Service account credentials 
162185    BigQuerySourceUtils .configureServiceAccount (configuration , config .getConnection ());
@@ -166,10 +189,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
166189
167190    // Configure BigQuery input format. 
168191    String  temporaryGcsPath  = BigQuerySourceUtils .getTemporaryGcsPath (bucket , bucketPath , bucketPath );
169-     BigQuerySourceUtils .configureBigQueryInput (configuration ,
170-                                                DatasetId .of (config .getDatasetProject (), config .getDataset ()),
171-                                                config .getTable (),
172-                                                temporaryGcsPath );
192+     try  {
193+       BigQuerySourceUtils .configureBigQueryInput (configuration ,
194+         DatasetId .of (config .getDatasetProject (), config .getDataset ()),
195+         config .getTable (),
196+         temporaryGcsPath );
197+     } catch  (Exception  e ) {
198+       String  errorReason  = "Failed to configure BigQuery input." ;
199+       collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
200+         .withStacktrace (e .getStackTrace ());
201+       collector .getOrThrowException ();
202+     }
173203
174204    // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists. 
175205    // We call emitLineage before since it creates the dataset with schema. 
@@ -178,6 +208,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
178208      .setFqn (BigQueryUtil .getFQN (config .getDatasetProject (), config .getDataset (), config .getTable ()))
179209      .setLocation (dataset .getLocation ())
180210      .build ();
211+ 
212+     // set error details provider 
213+     context .setErrorDetailsProvider (new  ErrorDetailsProviderSpec (GCPErrorDetailsProvider .class .getName ()));
214+ 
181215    emitLineage (context , configuredSchema , sourceTableType , config .getTable (), asset );
182216    setInputFormat (context , configuredSchema );
183217  }
0 commit comments