Skip to content

Commit 02e6ab7

Browse files
authored
[FSTORE-1580] OnlineFS Observability (#312)
Example of writing to online store and checking the status of online ingestions - including getting error messages.
1 parent 983eeb7 commit 02e6ab7

File tree

1 file changed

+226
-0
lines changed

1 file changed

+226
-0
lines changed
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "6e610b6e",
6+
"metadata": {},
7+
"source": [
8+
"## Online ingestion observability"
9+
]
10+
},
11+
{
12+
"cell_type": "markdown",
13+
"id": "374d92b4",
14+
"metadata": {},
15+
"source": [
16+
"This API provides the possibility to wait for online for online ingestion to finish."
17+
]
18+
},
19+
{
20+
"cell_type": "markdown",
21+
"id": "b79e1085",
22+
"metadata": {},
23+
"source": [
24+
"## Scope"
25+
]
26+
},
27+
{
28+
"cell_type": "markdown",
29+
"id": "fac87590",
30+
"metadata": {},
31+
"source": [
32+
"* Create and insert into online enabled feature group\n",
33+
"* Get online ingestion\n",
34+
"* Use online ingestion object"
35+
]
36+
},
37+
{
38+
"cell_type": "code",
39+
"execution_count": null,
40+
"id": "c5beb4d3",
41+
"metadata": {},
42+
"outputs": [],
43+
"source": [
44+
"import hopsworks"
45+
]
46+
},
47+
{
48+
"cell_type": "markdown",
49+
"id": "784247c7",
50+
"metadata": {},
51+
"source": [
52+
"## Connect to the cluster"
53+
]
54+
},
55+
{
56+
"cell_type": "code",
57+
"execution_count": null,
58+
"id": "23ac970c",
59+
"metadata": {},
60+
"outputs": [],
61+
"source": [
62+
"# Connect to your cluster, to be used running inside Jupyter or jobs inside the cluster.\n",
63+
"project = hopsworks.login()"
64+
]
65+
},
66+
{
67+
"cell_type": "code",
68+
"execution_count": null,
69+
"id": "0c48faaa",
70+
"metadata": {},
71+
"outputs": [],
72+
"source": [
73+
"# Uncomment when connecting to the cluster from an external environment.\n",
74+
"# project = hopsworks.login(project='my_project', host='my_instance', port=443, api_key_value='apikey')"
75+
]
76+
},
77+
{
78+
"cell_type": "markdown",
79+
"id": "2c26f619",
80+
"metadata": {},
81+
"source": [
82+
"## Create and insert into online enabled feature group."
83+
]
84+
},
85+
{
86+
"cell_type": "code",
87+
"execution_count": null,
88+
"id": "554aa4c9",
89+
"metadata": {},
90+
"outputs": [],
91+
"source": [
92+
"# Generate simple dataframe\n",
93+
"import pandas as pd\n",
94+
"size = 10\n",
95+
"fg_data = {'id': range(0, size), 'text': \"test\"}\n",
96+
"fg_df = pd.DataFrame.from_dict(fg_data)"
97+
]
98+
},
99+
{
100+
"cell_type": "code",
101+
"execution_count": null,
102+
"id": "356f8265",
103+
"metadata": {},
104+
"outputs": [],
105+
"source": [
106+
"# Get project feature store\n",
107+
"fs = project.get_feature_store()"
108+
]
109+
},
110+
{
111+
"cell_type": "code",
112+
"execution_count": null,
113+
"id": "a01a0400",
114+
"metadata": {},
115+
"outputs": [],
116+
"source": [
117+
"# Get/create feature group\n",
118+
"fg = fs.get_or_create_feature_group(name=\"fg\", version=1, primary_key=[\"id\"], online_enabled=True)"
119+
]
120+
},
121+
{
122+
"cell_type": "code",
123+
"execution_count": null,
124+
"id": "e2a56fd3",
125+
"metadata": {},
126+
"outputs": [],
127+
"source": [
128+
"# Create a feature group and insert it into it\n",
129+
"fg.insert(fg_df)"
130+
]
131+
},
132+
{
133+
"cell_type": "markdown",
134+
"id": "80e676a5",
135+
"metadata": {},
136+
"source": [
137+
"## Get online ingestion"
138+
]
139+
},
140+
{
141+
"cell_type": "code",
142+
"execution_count": null,
143+
"id": "19d3c236",
144+
"metadata": {},
145+
"outputs": [],
146+
"source": [
147+
"# Get the latest ingestion associated with the feature group\n",
148+
"online_ingestion_instance = fg.get_latest_online_ingestion()"
149+
]
150+
},
151+
{
152+
"cell_type": "code",
153+
"execution_count": null,
154+
"id": "b9711ee6",
155+
"metadata": {},
156+
"outputs": [],
157+
"source": [
158+
"# Get specific ingestion by its id\n",
159+
"#online_ingestion_instance = fg.get_online_ingestion(1)"
160+
]
161+
},
162+
{
163+
"cell_type": "markdown",
164+
"id": "50999fc9",
165+
"metadata": {},
166+
"source": [
167+
"## Use online ingestion object"
168+
]
169+
},
170+
{
171+
"cell_type": "code",
172+
"execution_count": null,
173+
"id": "e0226cab",
174+
"metadata": {},
175+
"outputs": [],
176+
"source": [
177+
"# Wait for online ingestion to finish (same as setting option `wait_for_online_ingestion` to true when inserting)\n",
178+
"online_ingestion_instance.wait_for_completion()"
179+
]
180+
},
181+
{
182+
"cell_type": "code",
183+
"execution_count": null,
184+
"id": "c2956386",
185+
"metadata": {},
186+
"outputs": [],
187+
"source": [
188+
"# Print results of ingestion (if there is only a result with `UPSERTED` status and rows match the expected data size then ingestion did not encounter any issues)\n",
189+
"# ex. [{'onlineIngestionId': 1, 'status': 'UPSERTED', 'rows': 10}]\n",
190+
"print([result.to_dict() for result in online_ingestion_instance.results])"
191+
]
192+
},
193+
{
194+
"cell_type": "code",
195+
"execution_count": null,
196+
"id": "731aa52b",
197+
"metadata": {},
198+
"outputs": [],
199+
"source": [
200+
"# Get logs from the online ingestion service (useful to check what issues were encountered while ingesting data)\n",
201+
"online_ingestion_instance.print_logs(priority=\"error\", size=5)"
202+
]
203+
}
204+
],
205+
"metadata": {
206+
"kernelspec": {
207+
"display_name": ".venv",
208+
"language": "python",
209+
"name": "python3"
210+
},
211+
"language_info": {
212+
"codemirror_mode": {
213+
"name": "ipython",
214+
"version": 3
215+
},
216+
"file_extension": ".py",
217+
"mimetype": "text/x-python",
218+
"name": "python",
219+
"nbconvert_exporter": "python",
220+
"pygments_lexer": "ipython3",
221+
"version": "3.11.11"
222+
}
223+
},
224+
"nbformat": 4,
225+
"nbformat_minor": 5
226+
}

0 commit comments

Comments
 (0)