Commit 7d2b3c7

IBM watsonx.data destination connector: improve performance with partitioning, metadata cleanup, and increased retries (#606)
1 parent 770eb54 commit 7d2b3c7

4 files changed: 104 additions and 11 deletions

snippets/general-shared-text/ibm-watsonxdata-api-placeholders.mdx

Lines changed: 2 additions & 2 deletions
@@ -8,6 +8,6 @@
88
- `<catalog>` (_required_): The name of the target Apache Iceberg-based catalog within the IBM watsonx.data data store instance.
99
- `<namespace>` (_required_): The name of the target namespace (also known as a schema) within the catalog.
1010
- `<table>` (_required_): The name of the target table within the namespace (schema).
11-
- `<max-retries>`: The maximum number of retries for the upload process. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
12-
- `<max-retries-connection>`: The maximum number of retries when connecting to the catalog. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
11+
- `<max-retries>`: The maximum number of retries for the upload process. Typically, an optimal setting is `150`. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
12+
- `<max-retries-connection>`: The maximum number of retries when connecting to the catalog. Typically, an optimal setting is `15`. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
1313
- `<record-id-key>`: The name of the column that uniquely identifies each record in the target table. The default is `record_id`.

snippets/general-shared-text/ibm-watsonxdata-cli-api.mdx

Lines changed: 2 additions & 2 deletions
@@ -23,5 +23,5 @@ The following environment variables:
2323

2424
Additionally:
2525

26-
- `--max-retries-connection` (CLI) or `max_retries_connection` (Python) is an optional parameter that specifies the maximum number of retries when connecting to the catalog. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
27-
- `--max-retries` (CLI) or `max_retries` (Python) is an optional parameter that specifies the number of times to retry uploading data. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
26+
- `--max-retries-connection` (CLI) or `max_retries_connection` (Python) is an optional parameter that specifies the maximum number of retries when connecting to the catalog. Typically, an optimal setting is `15`. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
27+
- `--max-retries` (CLI) or `max_retries` (Python) is an optional parameter that specifies the number of times to retry uploading data. Typically, an optimal setting is `150`. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
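
The documented ranges and defaults for the two retry parameters can be checked before a run. The following is a minimal sketch only: the function and constant names are hypothetical and are not part of the connector, which may validate these values differently.

```python
# Hypothetical helper: checks retry settings against the documented ranges.
# Defaults mirror the documented values (50 for uploads, 10 for connections).
DEFAULT_MAX_RETRIES = 50
DEFAULT_MAX_RETRIES_CONNECTION = 10


def retry_setting_problems(
    max_retries=DEFAULT_MAX_RETRIES,
    max_retries_connection=DEFAULT_MAX_RETRIES_CONNECTION,
):
    """Return a list of problems; an empty list means both values are in range."""
    problems = []
    if not 2 <= max_retries <= 500:
        problems.append("max_retries must be between 2 and 500, inclusive")
    if not 2 <= max_retries_connection <= 100:
        problems.append("max_retries_connection must be between 2 and 100, inclusive")
    return problems
```

For example, `retry_setting_problems(150, 15)` returns an empty list for the settings described above as typically optimal.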

snippets/general-shared-text/ibm-watsonxdata-platform.mdx

Lines changed: 2 additions & 2 deletions
@@ -10,6 +10,6 @@ Fill in the following fields:
1010
- **Catalog** (_required_): The name of the target Apache Iceberg-based catalog within the IBM watsonx.data data store instance.
1111
- **Namespace** (_required_): The name of the target namespace (also known as a schema) within the catalog.
1212
- **Table** (_required_): The name of the target table within the namespace (schema).
13-
- **Max Connection Retries**: The maximum number of retries when connecting to the catalog. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
14-
- **Max Retries**: The maximum number of retries when uploading data. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
13+
- **Max Connection Retries**: The maximum number of retries when connecting to the catalog. Typically, an optimal setting is `15`. The default is `10`. If specified, it must be a number between `2` and `100`, inclusive.
14+
- **Max Retries**: The maximum number of retries when uploading data. Typically, an optimal setting is `150`. The default is `50`. If specified, it must be a number between `2` and `500`, inclusive.
1515
- **Record ID Key**: The name of the column that uniquely identifies each record in the target table. The default is `record_id`.

snippets/general-shared-text/ibm-watsonxdata.mdx

Lines changed: 98 additions & 5 deletions
@@ -160,7 +160,8 @@
160160
8. Click the ellipses, and then click **Create schema**.
161161
9. Enter some **Name** for the schema, and then click **Create**.
162162
10. On the sidebar, click **Query workspace**.
163-
11. In the SQL editor, enter and run a table creation statement such as the following, replacing `<catalog-name>` with the name of the target
163+
11. In the SQL editor, enter and run a table creation statement such as the following one that uses
164+
[Presto SQL](https://prestodb.io/docs/current/connector/iceberg.html) syntax, replacing `<catalog-name>` with the name of the target
164165
catalog and `<schema-name>` with the name of the target schema:
165166

166167
```sql
@@ -183,8 +184,8 @@
183184
"filesize_bytes" bigint,
184185
"points" varchar,
185186
"system" varchar,
186-
"layout_width" bigint,
187-
"layout_height" bigint,
187+
"layout_width" double,
188+
"layout_height" double,
188189
"id" varchar,
189190
"record_id" varchar,
190191
"parent_id" varchar
@@ -196,12 +197,17 @@
196197
)
197198
```
198199

199-
Note that incoming elements that do not have matching column
200+
Incoming elements that do not have matching column
200201
names will be dropped upon record insertion. For example, if the incoming data has an element named `sent_from` and there is no
201202
column named `sent_from` in the table, the `sent_from` element will be dropped upon record insertion. You should modify the preceding
202203
sample table creation statement to add columns for any additional elements that you want to be included upon record
203204
insertion.
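
The column-matching behavior described above can be illustrated with a short sketch. This is not the connector's actual code; the function name and the sample data are hypothetical and only show which elements would survive insertion.

```python
def split_by_table_columns(element, table_columns):
    """Separate an incoming element's fields into kept and dropped names.

    Fields whose names match a table column are kept; all others are
    reported as dropped, mirroring the behavior described in the text.
    """
    kept = {name: value for name, value in element.items() if name in table_columns}
    dropped = sorted(name for name in element if name not in table_columns)
    return kept, dropped


# Hypothetical example: the table has no "sent_from" column.
columns = {"id", "record_id", "parent_id"}
incoming = {"id": "abc", "record_id": "r1", "sent_from": "someone@example.com"}
kept, dropped = split_by_table_columns(incoming, columns)
```

Here `dropped` contains `sent_from`, so a `sent_from` column would need to be added to the table creation statement to retain that element.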
204205

206+
To increase query performance, Iceberg uses [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/) to
207+
group similar rows together when writing. You can also
208+
[explicitly define partitions](https://prestodb.io/docs/current/connector/iceberg.html#create-table) as part of the
209+
preceding `CREATE TABLE` statement.
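
As a hedged sketch of explicit partitioning, the helper below builds a `partitioning` table property of the form shown in the linked Presto Iceberg connector documentation, for use inside the `WITH (...)` clause of a `CREATE TABLE` statement. The helper name is hypothetical, the column choice is only illustrative, and the exact property syntax should be confirmed against the Presto version in use.

```python
def partitioning_property(columns):
    """Build a Presto-style partitioning table property from column names.

    Assumes the `partitioning = ARRAY[...]` form described in the Presto
    Iceberg connector documentation; single quotes in names are escaped.
    """
    if not columns:
        raise ValueError("At least one partition column is required")
    quoted = ", ".join("'" + column.replace("'", "''") + "'" for column in columns)
    return f"partitioning = ARRAY[{quoted}]"


# Illustrative only: partition on the "system" column from the sample table.
example_property = partitioning_property(["system"])
```

Low-cardinality columns are usually better partition candidates than unique identifiers such as `record_id`, because partitioning on a unique value tends to produce many tiny files.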
210+
205211
- The name of the target namespace (also known as a schema) within the target catalog, and name of the target table within that schema. To get these:
206212

207213
1. [Log in to your IBM Cloud account](https://cloud.ibm.com/login).
@@ -214,4 +220,91 @@
214220
top navigation bar.
215221
7. On the **Browse data** tab, expand the name of the target catalog, and note the names of the target schema and target table.
216222

217-
- The name of the column in the target table that uniquely identifies each of the records in the table.
223+
- The name of the column in the target table that uniquely identifies each of the records in the table.
224+
225+
- To improve performance, the target table should be set to regularly remove old metadata files. To do this, run the following Python script.
226+
(You cannot use the preceding `CREATE TABLE` statement, or other SQL statements such as `ALTER TABLE`, to set this behavior.) To get the
227+
values for the specified environment variables, see the preceding instructions.
228+
229+
```python
# Improves performance by setting the target table to regularly remove
# old metadata files.
#
# First, install the following dependencies into your Python virtual
# environment:
#
#   pip install requests pyiceberg pyarrow
#
# Then, set the following environment variables:
#
#   IBM_IAM_API_KEY - An API key value for the target IBM Cloud account.
#   IBM_ICEBERG_CATALOG_METASTORE_REST_ENDPOINT - The metastore REST endpoint
#       value for the target Apache Iceberg catalog in the target IBM watsonx.data
#       data store instance.
#   IBM_COS_BUCKET_PUBLIC_ENDPOINT - The target IBM Cloud Object Storage (COS)
#       instance’s endpoint value.
#   IBM_COS_ACCESS_KEY_ID - An HMAC access key ID for the target COS instance.
#   IBM_COS_SECRET_ACCESS_KEY - The HMAC secret access key that is associated
#       with the target HMAC access key ID.
#   IBM_COS_REGION - The target COS instance’s region short ID.
#   IBM_ICEBERG_CATALOG - The name of the target Iceberg catalog.
#   IBM_ICEBERG_SCHEMA - The name of the target namespace (also known as a schema)
#       in the target catalog.
#   IBM_ICEBERG_TABLE - The name of the target table in the target schema.
#
# To get these values, see the Unstructured documentation for the
# IBM watsonx.data connector.

import os
import requests
from pyiceberg.catalog import load_catalog

def main():
    # Get a bearer token for the target IBM Cloud account.
    bearer_token = requests.post(
        url="https://iam.cloud.ibm.com/identity/token",
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        },
        data={
            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
            "apikey": os.getenv("IBM_IAM_API_KEY")
        }
    ).json().get("access_token")

    # Connect to the target Iceberg catalog.
    catalog = load_catalog(
        os.getenv("IBM_ICEBERG_CATALOG"),
        **{
            "type": "rest",
            "uri": f"https://{os.getenv('IBM_ICEBERG_CATALOG_METASTORE_REST_ENDPOINT')}/mds/iceberg",
            "token": bearer_token,
            "warehouse": os.getenv("IBM_ICEBERG_CATALOG"),
            "s3.endpoint": os.getenv("IBM_COS_BUCKET_PUBLIC_ENDPOINT"),
            "s3.access-key-id": os.getenv("IBM_COS_ACCESS_KEY_ID"),
            "s3.secret-access-key": os.getenv("IBM_COS_SECRET_ACCESS_KEY"),
            "s3.region": os.getenv("IBM_COS_REGION")
        },
    )

    # Load the target table.
    table = catalog.load_table(f"{os.getenv('IBM_ICEBERG_SCHEMA')}.{os.getenv('IBM_ICEBERG_TABLE')}")

    # Set the target table's properties to remove old metadata files.
    with table.transaction() as transaction:
        transaction.set_properties(
            {
                "commit.manifest.min-count-to-merge": 10,
                "commit.manifest-merge.enabled": True,
                "write.metadata.previous-versions-max": 10,
                "write.metadata.delete-after-commit.enabled": True,
            }
        )

    # Confirm that the target table's properties were set as expected.
    print(table.metadata.properties)

if __name__ == "__main__":
    main()
```
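
Because `os.getenv` returns `None` for an unset variable instead of raising an error, a missing value in the preceding script may only surface later as an authentication or connection failure. The following minimal sketch checks for missing variables up front. The helper name is hypothetical, and the variable names are taken from the script's header comments.

```python
# Variable names as listed in the preceding script's header comments.
REQUIRED_ENV_VARS = [
    "IBM_IAM_API_KEY",
    "IBM_ICEBERG_CATALOG_METASTORE_REST_ENDPOINT",
    "IBM_COS_BUCKET_PUBLIC_ENDPOINT",
    "IBM_COS_ACCESS_KEY_ID",
    "IBM_COS_SECRET_ACCESS_KEY",
    "IBM_COS_REGION",
    "IBM_ICEBERG_CATALOG",
    "IBM_ICEBERG_SCHEMA",
    "IBM_ICEBERG_TABLE",
]


def missing_env_vars(environ, required=REQUIRED_ENV_VARS):
    """Return the required variable names that are unset or empty in environ."""
    return [name for name in required if not environ.get(name)]
```

One way to use this is to call `missing_env_vars(os.environ)` at the start of `main()` and stop with a clear message if the returned list is not empty.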
