diff --git a/components/influxdb_cloud/actions/invoke-script/invoke-script.mjs b/components/influxdb_cloud/actions/invoke-script/invoke-script.mjs new file mode 100644 index 0000000000000..5c8a88160aa75 --- /dev/null +++ b/components/influxdb_cloud/actions/invoke-script/invoke-script.mjs @@ -0,0 +1,38 @@ +import influxDbCloud from "../../influxdb_cloud.app.mjs"; +import { parseObjectEntries } from "../../common/utils.mjs"; + +export default { + key: "influxdb_cloud-invoke-script", + name: "Invoke Script", + description: "Runs a script and returns the result. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/PostScriptsIDInvoke)", + version: "0.0.1", + type: "action", + props: { + influxDbCloud, + scriptId: { + propDefinition: [ + influxDbCloud, + "scriptId", + ], + }, + params: { + type: "object", + label: "Params", + description: "The script parameters. params contains key-value pairs that map values to the params.keys in a script. When you invoke a script with params, InfluxDB passes the values as invocation parameters to the script.", + optional: true, + }, + }, + async run({ $ }) { + const response = await this.influxDbCloud.invokeScript({ + $, + scriptId: this.scriptId, + data: { + params: this.params + ? parseObjectEntries(this.params) + : {}, + }, + }); + $.export("$summary", `Successfully invoked script with ID: ${this.scriptId}`); + return response; + }, +}; diff --git a/components/influxdb_cloud/actions/update-bucket/update-bucket.mjs b/components/influxdb_cloud/actions/update-bucket/update-bucket.mjs new file mode 100644 index 0000000000000..eb0f8a868bfe5 --- /dev/null +++ b/components/influxdb_cloud/actions/update-bucket/update-bucket.mjs @@ -0,0 +1,62 @@ +import influxDbCloud from "../../influxdb_cloud.app.mjs"; + +export default { + key: "influxdb_cloud-update-bucket", + name: "Update Bucket", + description: "Updates an existing bucket in InfluxDB Cloud. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/PatchBucketsID)", + version: "0.0.1", + type: "action", + props: { + influxDbCloud, + bucketId: { + propDefinition: [ + influxDbCloud, + "bucketId", + ], + }, + name: { + type: "string", + label: "Name", + description: "Name of the bucket. Must contain two or more characters. Cannot start with an underscore (_). Cannot contain a double quote (\"). Note: System buckets cannot be renamed.", + optional: true, + }, + description: { + type: "string", + label: "Description", + description: "A description of the bucket", + optional: true, + }, + everySeconds: { + type: "integer", + label: "Every Seconds", + description: "The duration in seconds for how long data will be kept in the database. The default duration is 2592000 (30 days). 0 represents infinite retention.", + default: 2592000, + optional: true, + }, + shardGroupDurationSeconds: { + type: "integer", + label: "Shard Group Duration Seconds", + description: "The shard group duration. The duration or interval (in seconds) that each shard group covers.", + optional: true, + }, + }, + async run({ $ }) { + const response = await this.influxDbCloud.updateBucket({ + $, + bucketId: this.bucketId, + data: { + name: this.name, + description: this.description, + retentionRules: [ + { + everySeconds: this.everySeconds, + shardGroupDurationSeconds: this.shardGroupDurationSeconds, + type: "expire", + }, + ], + }, + }); + $.export("$summary", `Successfully updated bucket with ID: ${response.id}`); + return response; + }, +}; diff --git a/components/influxdb_cloud/actions/write-data/write-data.mjs b/components/influxdb_cloud/actions/write-data/write-data.mjs new file mode 100644 index 0000000000000..e8eab48436131 --- /dev/null +++ b/components/influxdb_cloud/actions/write-data/write-data.mjs @@ -0,0 +1,50 @@ +import influxDbCloud from "../../influxdb_cloud.app.mjs"; + +export default { + key: "influxdb_cloud-write-data", + name: "Write Data", + description: "Write data to a specific bucket in InfluxDB Cloud. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/PostWrite)", + version: "0.0.1", + type: "action", + props: { + influxDbCloud, + bucketId: { + propDefinition: [ + influxDbCloud, + "bucketId", + ], + }, + data: { + type: "string", + label: "Data", + description: "Provide data in [line protocol format](https://docs.influxdata.com/influxdb3/cloud-serverless/reference/syntax/line-protocol/). Example: `measurementName fieldKey=\"field string value\" 1795523542833000000`", + }, + precision: { + type: "string", + label: "Precision", + description: "The precision for unix timestamps in the line protocol batch", + options: [ + "ms", + "s", + "us", + "ns", + ], + optional: true, + }, + }, + async run({ $ }) { + const response = await this.influxDbCloud.writeData({ + $, + params: { + bucket: this.bucketId, + precision: this.precision, + }, + data: this.data, + headers: { + "Content-Type": "text/plain", + }, + }); + $.export("$summary", "Successfully wrote data to bucket"); + return response; + }, +}; diff --git a/components/influxdb_cloud/common/utils.mjs b/components/influxdb_cloud/common/utils.mjs new file mode 100644 index 0000000000000..9c3cdbf569744 --- /dev/null +++ b/components/influxdb_cloud/common/utils.mjs @@ -0,0 +1,22 @@ +function optionalParseAsJSON(value) { + try { + return JSON.parse(value); + } catch (e) { + return value; + } +} + +export function parseObjectEntries(value) { + const obj = typeof value === "string" + ? JSON.parse(value) + : value; + return Object.fromEntries( + Object.entries(obj).map(([ + key, + value, + ]) => [ + key, + optionalParseAsJSON(value), + ]), + ); +} diff --git a/components/influxdb_cloud/influxdb_cloud.app.mjs b/components/influxdb_cloud/influxdb_cloud.app.mjs index 4abef74d7b357..9967350dfe9b1 100644 --- a/components/influxdb_cloud/influxdb_cloud.app.mjs +++ b/components/influxdb_cloud/influxdb_cloud.app.mjs @@ -1,11 +1,151 @@ +import { axios } from "@pipedream/platform"; +const LIMIT = 50; + export default { type: "app", app: "influxdb_cloud", - propDefinitions: {}, + propDefinitions: { + bucketId: { + type: "string", + label: "Bucket ID", + description: "The identifier of a bucket", + async options({ page }) { + const { buckets } = await this.listBuckets({ + params: { + limit: LIMIT, + offset: page * LIMIT, + }, + }); + return buckets?.map(({ + id: value, name: label, + }) => ({ + label, + value, + })) || []; + }, + }, + scriptId: { + type: "string", + label: "Script ID", + description: "The identifier of a script", + async options({ page }) { + const { scripts } = await this.listScripts({ + params: { + limit: LIMIT, + offset: page * LIMIT, + }, + }); + return scripts?.map(({ + id: value, name: label, + }) => ({ + label, + value, + })) || []; + }, + }, + }, methods: { - // this.$auth contains connected account data - authKeys() { - console.log(Object.keys(this.$auth)); + _baseUrl(version) { + let { url } = this.$auth; + if (version === "v2") { + url += (url.endsWith("/") + ? "" + : "/") + "api/v2"; + } else { + url = url.endsWith("/") + ? url.slice(0, -1) + : url; + } + return url; + }, + _makeRequest({ + $ = this, + version = "v2", + path, + headers, + ...opts + }) { + return axios($, { + url: `${this._baseUrl(version)}${path}`, + headers: { + "Authorization": `Token ${this.$auth.token}`, + "Content-Type": "application/json", + ...headers, + }, + ...opts, + }); + }, + listBuckets(opts = {}) { + return this._makeRequest({ + path: "/buckets", + ...opts, + }); + }, + listScripts(opts = {}) { + return this._makeRequest({ + path: "/scripts", + ...opts, + }); + }, + listTasks(opts = {}) { + return this._makeRequest({ + path: "/tasks", + ...opts, + }); + }, + updateBucket({ + bucketId, ...opts + }) { + return this._makeRequest({ + method: "PATCH", + path: `/buckets/${bucketId}`, + ...opts, + }); + }, + writeData(opts = {}) { + return this._makeRequest({ + method: "POST", + path: "/write", + ...opts, + }); + }, + invokeScript({ + scriptId, ...opts + }) { + return this._makeRequest({ + method: "POST", + path: `/scripts/${scriptId}/invoke`, + ...opts, + }); + }, + async *paginate({ + fn, + resourceKey, + params, + max, + }) { + params = { + ...params, + limit: LIMIT, + offset: 0, + }; + let total, count = 0; + do { + const response = await fn({ + params, + }); + const results = resourceKey + ? response[resourceKey] + : response; + for (const item of results) { + yield item; + if (max && ++count >= max) { + return; + } + } + total = results?.length; + params.offset += params.limit; + } while (total); }, }, }; diff --git a/components/influxdb_cloud/package.json b/components/influxdb_cloud/package.json index 82eaad0fe91d6..7b217278de492 100644 --- a/components/influxdb_cloud/package.json +++ b/components/influxdb_cloud/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/influxdb_cloud", - "version": "0.6.0", + "version": "0.7.0", "description": "Pipedream influxdb_cloud Components", "main": "influxdb_cloud.app.mjs", "keywords": [ diff --git a/components/influxdb_cloud/sources/common/base.mjs b/components/influxdb_cloud/sources/common/base.mjs new file mode 100644 index 0000000000000..22fffc37aad34 --- /dev/null +++ b/components/influxdb_cloud/sources/common/base.mjs @@ -0,0 +1,89 @@ +import influxDbCloud from "../../influxdb_cloud.app.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + props: { + influxDbCloud, + db: "$.service.db", + timer: { + type: "$.interface.timer", + default: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + }, + methods: { + _getLastTs() { + return this.db.get("lastTs") || 0; + }, + _setLastTs(lastTs) { + this.db.set("lastTs", lastTs); + }, + getParams() { + return {}; + }, + getTsField() { + return "createdAt"; + }, + generateMeta(item) { + const ts = Date.parse(item[this.getTsField()]); + return { + id: `${item.id}${ts}`, + summary: this.getSummary(item), + ts, + }; + }, + async processEvent(max) { + const lastTs = this._getLastTs(); + let maxTs = lastTs; + + const fn = this.getResourceFn(); + const params = this.getParams(); + const resourceKey = this.getResourceKey(); + const tsField = this.getTsField(); + + const results = this.influxDbCloud.paginate({ + fn, + params, + resourceKey, + }); + + let items = []; + for await (const item of results) { + const ts = Date.parse(item[tsField]); + if (ts > lastTs) { + items.push(item); + maxTs = Math.max(ts, maxTs); + } + } + + if (max && items?.length > max) { + items = items.slice(-1 * max); + } + + items.forEach((item) => { + const meta = this.generateMeta(item); + this.$emit(item, meta); + }); + + this._setLastTs(maxTs); + }, + getResourceFn() { + throw new Error("getResourceFn is not implemented"); + }, + getResourceKey() { + throw new Error("getResourceKey is not implemented"); + }, + getSummary() { + throw new Error("getSummary is not implemented"); + }, + }, + hooks: { + async deploy() { + await this.processEvent(25); + }, + }, + async run() { + await this.processEvent(); + }, +}; diff --git a/components/influxdb_cloud/sources/new-bucket-created/new-bucket-created.mjs b/components/influxdb_cloud/sources/new-bucket-created/new-bucket-created.mjs new file mode 100644 index 0000000000000..d9836a7107e3e --- /dev/null +++ b/components/influxdb_cloud/sources/new-bucket-created/new-bucket-created.mjs @@ -0,0 +1,23 @@ +import common from "../common/base.mjs"; + +export default { + ...common, + key: "influxdb_cloud-new-bucket-created", + name: "New Bucket Created", + description: "Emit new event when a new bucket is created. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/GetBuckets)", + version: "0.0.1", + type: "source", + dedupe: "unique", + methods: { + ...common.methods, + getResourceFn() { + return this.influxDbCloud.listBuckets; + }, + getResourceKey() { + return "buckets"; + }, + getSummary(item) { + return `New Bucket Created with ID: ${item.id}`; + }, + }, +}; diff --git a/components/influxdb_cloud/sources/new-script-created/new-script-created.mjs b/components/influxdb_cloud/sources/new-script-created/new-script-created.mjs new file mode 100644 index 0000000000000..69aa58f72855c --- /dev/null +++ b/components/influxdb_cloud/sources/new-script-created/new-script-created.mjs @@ -0,0 +1,23 @@ +import common from "../common/base.mjs"; + +export default { + ...common, + key: "influxdb_cloud-new-script-created", + name: "New Script Created", + description: "Emit new event when a new script is created. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/GetScripts)", + version: "0.0.1", + type: "source", + dedupe: "unique", + methods: { + ...common.methods, + getResourceFn() { + return this.influxDbCloud.listScripts; + }, + getResourceKey() { + return "scripts"; + }, + getSummary(item) { + return `New Script Created with ID: ${item.id}`; + }, + }, +}; diff --git a/components/influxdb_cloud/sources/new-task-completed/new-task-completed.mjs b/components/influxdb_cloud/sources/new-task-completed/new-task-completed.mjs new file mode 100644 index 0000000000000..b2ff2e227c513 --- /dev/null +++ b/components/influxdb_cloud/sources/new-task-completed/new-task-completed.mjs @@ -0,0 +1,26 @@ +import common from "../common/base.mjs"; + +export default { + ...common, + key: "influxdb_cloud-new-task-completed", + name: "New Task Completed", + description: "Emit new event when a new task is completed. [See the documentation](https://docs.influxdata.com/influxdb3/cloud-serverless/api/v2/#operation/GetTasks)", + version: "0.0.1", + type: "source", + dedupe: "unique", + methods: { + ...common.methods, + getResourceFn() { + return this.influxDbCloud.listTasks; + }, + getResourceKey() { + return "tasks"; + }, + getTsField() { + return "latestCompleted"; + }, + getSummary(item) { + return `Task ${item.name} Completed`; + }, + }, +};