@@ -364,16 +364,16 @@ def create_dataset(dataset = nil, reference: nil)
           end
           body = {
             dataset_reference: {
-              project_id: @project,
+              project_id: @destination_project,
               dataset_id: dataset,
             },
           }.merge(hint)
           if @location
             body[:location] = @location
           end
           opts = {}
-          Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
-          with_network_retry { client.insert_dataset(@project, body, **opts) }
+          Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
+          with_network_retry { client.insert_dataset(@destination_project, body, **opts) }
         rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
           if e.status_code == 409 && /Already Exists:/ =~ e.message
             # ignore 'Already Exists' error
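For context, every hunk here swaps @project for a new @destination_project attribute. A minimal sketch of how that attribute is presumably initialized, assuming a `destination_project` task option that falls back to the auth/billing project (the option name and the fallback are assumptions, not shown in this diff):

    # Assumed wiring (not part of this diff): @project stays the project
    # used for authentication and billing, while @destination_project is
    # where datasets and tables are actually created, defaulting to @project.
    @project = @task['project']
    @destination_project = @task['destination_project'] || @project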
@@ -382,7 +382,7 @@ def create_dataset(dataset = nil, reference: nil)
 
           response = {status_code: e.status_code, message: e.message, error_class: e.class}
           Embulk.logger.error {
-            "embulk-output-bigquery: insert_dataset(#{@project}, #{body}, #{opts}), response:#{response}"
+            "embulk-output-bigquery: insert_dataset(#{@destination_project}, #{body}, #{opts}), response:#{response}"
           }
           raise Error, "failed to create dataset #{@destination_project}:#{dataset} in #{@location_for_log}, response:#{response}"
         end
@@ -554,15 +554,15 @@ def patch_description(fields, column_options)
             fields = patch_description(table.schema.fields, @task['column_options'])
             table.schema.update!(fields: fields)
             table_id = Helper.chomp_partition_decorator(@task['table'])
-            with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
+            with_network_retry { client.patch_table(@destination_project, @dataset, table_id, table) }
           end
         end
 
         def merge(source_table, target_table, merge_keys, merge_rule)
           columns = @schema.map { |column| column[:name] }
           query = <<~EOD
-            MERGE `#{@dataset}`.`#{target_table}` T
-            USING `#{@dataset}`.`#{source_table}` S
+            MERGE `#{@destination_project}`.`#{@dataset}`.`#{target_table}` T
+            USING `#{@destination_project}`.`#{@dataset}`.`#{source_table}` S
             ON #{join_merge_keys(merge_keys.empty? ? merge_keys(target_table) : merge_keys)}
             WHEN MATCHED THEN
               UPDATE SET #{join_merge_rule_or_columns(merge_rule, columns)}
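With the project qualifier added, the heredoc renders a fully-qualified three-part table reference, so the MERGE runs against the destination project rather than whatever project the query job defaults to. An illustrative rendering (all names made up):

    MERGE `dest-project`.`my_dataset`.`target_table` T
    USING `dest-project`.`my_dataset`.`source_table` S
    ON T.id = S.id
    WHEN MATCHED THEN
      UPDATE SET T.value = S.value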
@@ -579,9 +579,9 @@ def merge_keys(table)
             SELECT
               KCU.COLUMN_NAME
             FROM
-              `#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
+              `#{@destination_project}`.`#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
             JOIN
-              `#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
+              `#{@destination_project}`.`#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
             ON
               KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
               KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
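Same rationale as the MERGE change: an unqualified `#{@dataset}`.INFORMATION_SCHEMA reference resolves against the query job's default project, so this primary-key lookup could read metadata from the wrong project when writing cross-project. Qualified with illustrative values, the lookup now reads:

    FROM `dest-project`.`my_dataset`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
    JOIN `dest-project`.`my_dataset`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC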