8
8
<img src="https://img.shields.io/badge/slack-join-community?logo=slack&logoColor=white&style=flat"
9
9
alt="Chat on Slack"></a>
10
10
<a href =" https://github.com/astral-sh/ruff " ><img src =" https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json " alt =" Ruff " style =" max-width :100% ;" ></a >
11
+ <!-- Pytest Coverage Comment:Begin -->
12
+ <!-- Pytest Coverage Comment:End -->
11
13
12
14
13
15
# GlassFlow Python SDK
@@ -22,11 +24,12 @@ You can install the GlassFlow Python SDK using pip:
22
24
pip install glassflow
23
25
```
24
26
25
- ## Available Operations
27
+ ## Data Operations
26
28
27
29
* [ publish] ( #publish ) - Publish a new event into the pipeline
28
30
* [ consume] ( #consume ) - Consume the transformed event from the pipeline
29
31
* [ consume failed] ( #consume-failed ) - Consume the events that failed from the pipeline
32
+ * [ validate credentials] ( #validate-credentials ) - Validate pipeline credentials
30
33
31
34
32
35
## publish
@@ -36,12 +39,11 @@ Publish a new event into the pipeline
36
39
### Example Usage
37
40
38
41
``` python
39
- import glassflow
42
+ from glassflow import PipelineDataSource
40
43
41
- client = glassflow.GlassFlowClient()
42
- pipeline_client = client.pipeline_client(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
44
+ source = PipelineDataSource(pipeline_id = " <str value" , pipeline_access_token = " <str token>" )
43
45
data = {} # your json event
44
- res = pipeline_client .publish(request_body = data)
46
+ res = source .publish(request_body = data)
45
47
46
48
if res.status_code == 200 :
47
49
print (" Published sucessfully" )
@@ -55,33 +57,163 @@ Consume the transformed event from the pipeline
55
57
### Example Usage
56
58
57
59
``` python
58
- import glassflow
60
+ from glassflow import PipelineDataSink
59
61
60
- client = glassflow.GlassFlowClient()
61
- pipeline_client = client.pipeline_client(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
62
- res = pipeline_client.consume()
62
+ sink = PipelineDataSink(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
63
+ res = sink.consume()
63
64
64
65
if res.status_code == 200 :
65
- print (res.body.event )
66
+ print (res.json() )
66
67
```
67
68
69
+
68
70
## consume failed
69
71
70
72
If the transformation failed for any event, they are available in a failed queue. You can consume those events from the pipeline
71
73
72
74
### Example Usage
73
75
74
76
``` python
75
- import glassflow
77
+ from glassflow import PipelineDataSink
76
78
77
- client = glassflow.GlassFlowClient()
78
- pipeline_client = client.pipeline_client(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
79
- res = pipeline_client.consume_failed()
79
+ sink = PipelineDataSink(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
80
+ res = sink.consume_failed()
80
81
81
82
if res.status_code == 200 :
82
- print (res.body.event)
83
+ print (res.json())
84
+ ```
85
+
86
+
87
+ ## validate credentials
88
+
89
+ Validate pipeline credentials (` pipeline_id ` and ` pipeline_access_token ` ) from source or sink
90
+
91
+ ### Example Usage
92
+
93
+ ``` python
94
+ from glassflow import PipelineDataSource, errors
95
+
96
+ try :
97
+ source = PipelineDataSource(pipeline_id = " <str value" , pipeline_access_token = " <str value>" )
98
+ source.validate_credentials()
99
+ except errors.PipelineNotFoundError as e:
100
+ print (" Pipeline ID does not exist!" )
101
+ raise e
102
+ except errors.PipelineAccessTokenInvalidError as e:
103
+ print (" Pipeline Access Token is invalid!" )
104
+ raise e
105
+ ```
106
+
107
+
108
+ ## Pipeline Management
109
+
110
+ In order to manage your pipelines with this SDK, one needs to provide the ` PERSONAL_ACCESS_TOKEN `
111
+ to the GlassFlow client.
112
+
113
+ ``` python
114
+ from glassflow import GlassFlowClient
115
+
116
+ client = GlassFlowClient(personal_access_token = " <your personal access token>" )
117
+ ```
118
+
119
+ Now you can perform CRUD operations on your pipelines:
120
+
121
+ * [ list_pipelines] ( #list_pipelines ) - Returns the list of pipelines available
122
+ * [ get_pipeline] ( #get_pipeline ) - Returns a pipeline object from a given pipeline ID
123
+ * [ create] ( #create ) - Create a new pipeline
124
+ * [ delete] ( #delete ) - Delete an existing pipeline
125
+
126
+ ## list_pipelines
127
+
128
+ Returns information about the available pipelines. It can be restricted to a
129
+ specific space by passing the ` space_id ` .
130
+
131
+ ### Example Usage
132
+
133
+ ``` python
134
+ from glassflow import GlassFlowClient
135
+
136
+ client = GlassFlowClient(personal_access_token = " <your access token>" )
137
+ res = client.list_pipelines()
138
+ ```
139
+
140
+ ## get_pipeline
141
+
142
+ Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object
143
+ which can be used manage the Pipeline.
144
+
145
+ ### Example Usage
146
+
147
+ ``` python
148
+ from glassflow import GlassFlowClient
149
+
150
+ client = GlassFlowClient(personal_access_token = " <your access token>" )
151
+ pipeline = client.get_pipeline(pipeline_id = " <your pipeline id>" )
152
+
153
+ print (" Name:" , pipeline.name)
83
154
```
84
155
156
+ ## create
157
+
158
+ The Pipeline object has a create method that creates a new GlassFlow pipeline.
159
+
160
+ ### Example Usage
161
+
162
+ ``` python
163
+ from glassflow import Pipeline
164
+
165
+ pipeline = Pipeline(
166
+ name = " <your pipeline name>" ,
167
+ transformation_file = " path/to/transformation.py" ,
168
+ space_id = " <your space id>" ,
169
+ personal_access_token = " <your personal access token>"
170
+ ).create()
171
+ ```
172
+
173
+ In the next example we create a pipeline with Google PubSub source
174
+ and a webhook sink:
175
+
176
+ ``` python
177
+ from glassflow import Pipeline
178
+
179
+ pipeline = Pipeline(
180
+ name = " <your pipeline name>" ,
181
+ transformation_file = " path/to/transformation.py" ,
182
+ space_id = " <your space id>" ,
183
+ personal_access_token = " <your personal access token>" ,
184
+ source_kind = " google_pubsub" ,
185
+ source_config = {
186
+ " project_id" : " <your gcp project id>" ,
187
+ " subscription_id" : " <your subscription id>" ,
188
+ " credentials_json" : " <your credentials json>"
189
+ },
190
+ sink_kind = " webhook" ,
191
+ sink_config = {
192
+ " url" : " <webhook url>" ,
193
+ " method" : " <GET | POST | PUT | PATCH | DELETE>" ,
194
+ " headers" : [{" header1" : " header1_value" }]
195
+ }
196
+ ).create()
197
+ ```
198
+
199
+ ## delete
200
+
201
+ The Pipeline object has a delete method to delete a pipeline
202
+
203
+ ### Example Usage
204
+
205
+ ``` python
206
+ from glassflow import Pipeline
207
+
208
+ pipeline = Pipeline(
209
+ name = " <your pipeline name>" ,
210
+ transformation_file = " path/to/transformation.py" ,
211
+ space_id = " <your space id>" ,
212
+ personal_access_token = " <your personal access token>"
213
+ ).create()
214
+
215
+ pipeline.delete()
216
+ ```
85
217
86
218
## Quickstart
87
219
0 commit comments