@@ -70,7 +70,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
7070  public  SalesforceWideRecordReader  initialize (
7171      InputSplit  inputSplit , AuthenticatorCredentials  credentials )
7272      throws  IOException , InterruptedException  {
73-     List <Map <String , ?>> fetchedIdList  = fetchBulkQueryIds (inputSplit , null );
73+     // Use default configurations of BulkRecordReader. 
74+     super .initialize (inputSplit , credentials );
75+ 
76+     List <Map <String , ?>> fetchedIdList  = fetchBulkQueryIds ();
7477    LOG .debug ("Number of records received from batch job for wide object: '{}'" , fetchedIdList .size ());
7578
7679    try  {
@@ -84,12 +87,18 @@ public SalesforceWideRecordReader initialize(
8487        Lists .partition (fetchedIdList , SalesforceSourceConstants .WIDE_QUERY_MAX_BATCH_COUNT );
8588      LOG .debug ("Number of partitions to be fetched for wide object: '{}'" , partitions .size ());
8689
90+       // Process partitions with batches sized to adhere to API limits and optimize memory usage. 
91+       // [CDAP] TODO: Address memory pressure for large datasets — all transformed records are 
8792      results  = partitions .parallelStream ()
88-         .map (this ::getSObjectIds )
89-         .map (sObjectIds  -> fetchPartition (partnerConnection , fields , sObjectName , sObjectIds ))
90-         .flatMap (Arrays ::stream )
91-         .map (sObject  -> transformer .transformToMap (sObject , sObjectDescriptor ))
92-         .collect (Collectors .toList ());
93+           .flatMap (partition  -> processPartition (partnerConnection , fields , sObjectName ,
94+               partition , sObjectDescriptor ).stream ())
95+           .collect (Collectors .toList ());
96+ 
97+       if  (results  == null ) {
98+         LOG .warn ("Result list is null after processing partitions." );
99+         results  = new  ArrayList <>();
100+       }
101+ 
93102      return  this ;
94103    } catch  (ConnectionException  e ) {
95104      String  errorMessage  = SalesforceConnectionUtil .getSalesforceErrorMessageFromException (e );
@@ -123,15 +132,10 @@ public float getProgress() {
123132  /** 
124133   * Fetches single entry map (Id -> SObjectId_value) values received from Bulk API. 
125134   * 
126-    * @param inputSplit         specifies batch details 
127-    * @param taskAttemptContext task context 
128-    * @return list of single entry Map 
129135   * @throws IOException          can be due error during reading query 
130-    * @throws InterruptedException interrupted sleep while waiting for batch results 
131136   */ 
132-   private  List <Map <String , ?>> fetchBulkQueryIds (InputSplit  inputSplit , TaskAttemptContext  taskAttemptContext )
133-     throws  IOException , InterruptedException  {
134-     super .initialize (inputSplit , taskAttemptContext );
137+   private  List <Map <String , ?>> fetchBulkQueryIds ()
138+     throws  IOException  {
135139    List <Map <String , ?>> fetchedIdList  = new  ArrayList <>();
136140    while  (super .nextKeyValue ()) {
137141      fetchedIdList .add (super .getCurrentValue ());
@@ -181,4 +185,37 @@ private SObject[] fetchPartition(PartnerConnection partnerConnection, String fie
181185        e );
182186    }
183187  }
188+ 
189+   /** 
190+    * Processes a partition of SObject records by dividing the IDs into smaller batches, 
191+    * retrieving the corresponding records from Salesforce, and transforming them into maps. 
192+    * 
193+    * @param partnerConnection the Salesforce partner connection used for retrieving data. 
194+    * @param fields the fields to be retrieved for each SObject. 
195+    * @param sObjectName the name of the Salesforce object (e.g., Account, Lead). 
196+    * @param partition the partition containing the ID records to be processed. 
197+    * @param sObjectDescriptor descriptor containing the structure of the SObject. 
198+    * @return result from partitions 
199+    */ 
200+   private  List <Map <String , ?>> processPartition (PartnerConnection  partnerConnection , String  fields , String  sObjectName ,
201+                                                 List <Map <String , ?>> partition , SObjectDescriptor  sObjectDescriptor ) {
202+     List <Map <String , ?>> partitionResults  = new  ArrayList <>();
203+      // Divide the list of SObject Ids into smaller batches to avoid exceeding retrieve id limits. 
204+ 
205+     /* see more - https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/ 
206+     salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_apicalls.htm */ 
207+     List <List <String >> idBatches  = Lists .partition (
208+         Arrays .asList (getSObjectIds (partition )), SalesforceSourceConstants .RETRIEVE_MAX_BATCH_COUNT );
209+ 
210+     // Iterate over each batch of Ids to fetch the records. 
211+     idBatches .forEach (idBatch  -> {
212+       SObject [] fetchedObjects  = fetchPartition (
213+           partnerConnection , fields , sObjectName , idBatch .toArray (new  String [0 ]));
214+       Arrays .stream (fetchedObjects )
215+           .map (sObject  -> transformer .transformToMap (sObject , sObjectDescriptor ))
216+           .forEach (partitionResults ::add );
217+     });
218+ 
219+     return  partitionResults ;
220+   }
184221}
0 commit comments