Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/google_calendar/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/google_calendar",
"version": "0.5.11",
"version": "0.5.12",
"description": "Pipedream Google_calendar Components",
"main": "google_calendar.app.mjs",
"keywords": [
Expand All @@ -11,7 +11,7 @@
"author": "Pipedream <support@pipedream.com> (https://pipedream.com/)",
"dependencies": {
"@googleapis/calendar": "^1.0.2",
"@pipedream/platform": "^3.0.0",
"@pipedream/platform": "^3.1.0",
"color-2-name": "^1.4.4",
"lodash.get": "^4.4.2",
"moment-timezone": "^0.5.33",
Expand Down
147 changes: 147 additions & 0 deletions components/google_calendar/sources/common/taskScheduler.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { axios } from "@pipedream/platform";
import { v4 as uuid } from "uuid";

export default {
methods: {
async _makeAPIRequest({
$ = this, apiKey, ...opts
}) {
if (!opts.headers) opts.headers = {};
opts.headers["Authorization"] = `Bearer ${apiKey}`;
opts.headers["Content-Type"] = "application/json";
opts.headers["user-agent"] = "@PipedreamHQ/pipedream v0.1";
const { path } = opts;
delete opts.path;
opts.url = `https://api.pipedream.com/v1${path[0] === "/"
? ""
: "/"
}${path}`;
return axios($, opts);
},
selfChannel() {
return "self";
},
queuedEventsChannel() {
return "$in";
},
async subscribe(emitter_id, listener_id, event_name = null, apiKey) {
let params = {
emitter_id,
listener_id,
};
if (event_name) {
params.event_name = event_name;
}
return await this._makeAPIRequest({
method: "POST",
path: "/subscriptions",
params,
apiKey,
});
},
async selfSubscribe(apiKey) {
const isSubscribedToSelf = this.db.get("isSubscribedToSelf");
if (!isSubscribedToSelf) {
const componentId = process.env.PD_COMPONENT;
const selfChannel = this.selfChannel();
console.log(`Subscribing to ${selfChannel} channel for event source`);
console.log(
await this.subscribe(componentId, componentId, selfChannel, apiKey),
);
this.db.set("isSubscribedToSelf", true);
}
},
emitScheduleEvent(event, timestamp) {
const selfChannel = this.selfChannel();
const epoch = Date.parse(timestamp);
const $id = uuid();

console.log(`Scheduled event to emit on: ${new Date(epoch)}`);

this.$emit(
{
...event,
$channel: selfChannel,
$id,
},
{
name: selfChannel,
id: $id,
delivery_ts: epoch,
},
);

return $id;
},
async deleteScheduledEvent(event, apiKey) {
const componentId = process.env.PD_COMPONENT;
const inChannel = this.queuedEventsChannel();

// The user must pass a scheduled event UUID they'd like to cancel
// We lookup the event by ID and delete it
const { id } = event.body;

// List events in the $in channel - the queue of scheduled events, to be emitted in the future
const events = await this.listEvents(
componentId,
inChannel,
apiKey,
);
console.log("Events: ", events);

// Find the event in the list by id
const eventToCancel = events.data.find((e) => {
const { metadata } = e;
return metadata.id === id;
});

console.log("Event to cancel: ", eventToCancel);

if (!eventToCancel) {
console.log(`No event with ${id} found`);
return false;
}

// Delete the event
await this.deleteEvent(
componentId,
eventToCancel.id,
inChannel,
apiKey,
);
return true;
},
async listEvents(dcID, event_name, apiKey) {
return await this._makeAPIRequest({
path: `/sources/${dcID}/event_summaries`,
params: {
event_name,
},
apiKey,
});
},
async deleteEvent(dcID, eventID, event_name, apiKey) {
return await this._makeAPIRequest({
method: "DELETE",
path: `/sources/${dcID}/events`,
params: {
start_id: eventID,
end_id: eventID,
event_name,
},
apiKey,
});
},
emitEvent(event, summary) {
const id = event.$id;
delete event.$channel;
delete event.$id;

this.$emit(event, {
summary: summary ?? JSON.stringify(event),
id,
ts: +new Date(),
});
},
},
};
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import taskScheduler from "../../../pipedream/sources/new-scheduled-tasks/new-scheduled-tasks.mjs";
import googleCalendar from "../../google_calendar.app.mjs";
import taskScheduler from "../common/taskScheduler.mjs";
import sampleEmit from "./test-event.mjs";

export default {
key: "google_calendar-upcoming-event-alert",
name: "New Upcoming Event Alert",
description: `Emit new event based on a time interval before an upcoming event in the calendar. This source uses Pipedream's Task Scheduler.
[See the documentation](https://pipedream.com/docs/examples/waiting-to-execute-next-step-of-workflow/#step-1-create-a-task-scheduler-event-source)
for more information and instructions for connecting your Pipedream account.`,
version: "0.0.10",
description: "Emit new event based on a time interval before an upcoming event in the calendar.",
version: "0.1.0",
type: "source",
props: {
pipedream: taskScheduler.props.pipedream,
googleCalendar,
db: "$.service.db",
http: "$.interface.http",
pipedreamApiKey: {
type: "string",
label: "Pipedream API Key",
description: "[Click here to find your Pipedream API key](https://pipedream.com/settings/user)",
secret: true,
},
calendarId: {
propDefinition: [
googleCalendar,
Expand Down Expand Up @@ -55,11 +58,11 @@ export default {
return;
}
for (const id of ids) {
if (await this.deleteEvent({
if (await this.deleteScheduledEvent({
body: {
id,
},
})) {
}, this.pipedreamApiKey)) {
console.log("Cancelled scheduled event");
}
}
Expand Down Expand Up @@ -112,7 +115,7 @@ export default {
async run(event) {
// self subscribe only on the first time
if (!this._hasDeployed()) {
await this.selfSubscribe();
await this.selfSubscribe(this.pipedreamApiKey);
}

const scheduledEventIds = this._getScheduledEventIds() || [];
Expand All @@ -121,7 +124,9 @@ export default {
if (event.$channel === this.selfChannel()) {
const remainingScheduledEventIds = scheduledEventIds.filter((id) => id !== event["$id"]);
this._setScheduledEventIds(remainingScheduledEventIds);
this.emitEvent(event, `Upcoming ${event.summary} event`);
this.emitEvent(event, `Upcoming ${event.summary
? event.summary + " "
: ""}event`);
return;
}

Expand Down
Loading
Loading