Skip to content

Conversation

choeqq
Copy link
Contributor

@choeqq choeqq commented Aug 27, 2025

Closes #18193

WHY

HubSpot users need a way to trigger workflows specifically when contacts are added to a list, without being overwhelmed by general contact property updates. The existing "New or Updated Contact" trigger fires on any contact change, creating noise when users only care about list membership events.

WHAT

Added a new HubSpot source: "New Contact Added to List"

Key Features:

  • Monitors specific HubSpot lists for new contact additions
  • Uses the /crm/v3/lists/{listId}/memberships/join-order endpoint for efficient polling
  • Ignores contact property updates (only triggers on actual list membership changes)
  • Supports a single list in a single trigger configuration
  • First run will not process any current list members, subsequent runs only detect new additions

Summary by CodeRabbit

  • New Features
    • New HubSpot source that emits enriched events when a contact is added to a selected list (includes contact details, membership info, and timestamps). It pages through memberships, persists per-list progress, and improves reliability when fetching membership and contact data.
    • New public API to fetch list memberships in join-order to support the source.
  • Tests
    • Added a sample “new contact added to list” event payload for validation.
  • Chores
    • Bumped HubSpot package version to 1.7.0.

Copy link
Contributor

coderabbitai bot commented Aug 27, 2025

Caution

Review failed

An error occurred during the review process. Please try again later.

Walkthrough

Adds a HubSpot app method to fetch list memberships by join order (with retry logic), updates batch object fetching to retry on 429, introduces a new polling source that emits events when contacts are added to a configured HubSpot list, adds a static test-event fixture, and bumps the HubSpot package version from 1.6.2 to 1.7.0.

Changes

Cohort / File(s) Summary
HubSpot app methods
components/hubspot/hubspot.app.mjs
Adds methods.getListMembershipsByJoinOrder({ listId, ...opts }) calling /crm/v3/lists/${listId}/memberships/join-order and updates batchGetObjects to async with retry logic (MAX_RETRIES=5, BASE_RETRY_DELAY=500ms) on 429 responses using exponential backoff + jitter.
New source: contact added to list
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs
Adds hubspot-new-contact-added-to-list polling source: per-list state keys list_<listId>_last_timestamp, pages join-order memberships, filters by timestamp, batches contact detail fetches (chunks of 100), and emits events for newly added memberships.
Test fixture
components/hubspot/sources/new-contact-added-to-list/test-event.mjs
Adds static test event payload representing a “new contact added to list” membership.
Package version bump
components/hubspot/package.json
Updates version from 1.6.2 to 1.7.0.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User as User / Config
  participant Src as Source: New Contact Added to List
  participant DB as DB (per-list state)
  participant HSLists as HubSpot Lists API
  participant HSContacts as HubSpot Contacts API
  participant Evt as Event Emitter
  rect rgba(100,150,240,0.06)
  User->>Src: configure listId & properties
  end
  loop scheduled poll
    Src->>DB: read list_<id>_last_timestamp & paging token
    Src->>HSLists: GET /crm/v3/lists/{listId}/memberships/join-order?limit=...&after=...
    HSLists-->>Src: memberships page (+ paging.next.after)
    alt first run (no last timestamp)
      Src->>DB: set baseline timestamp (no emits)
    else new memberships returned
      Src->>HSContacts: batch POST /objects/contacts/batch/read (chunks of 100)
      HSContacts-->>Src: contact details map
      Src->>Evt: emit event per new membership (listId, contact, membership, addedAt)
      Src->>DB: update list_<id>_last_timestamp & paging token
    end
    opt more pages
      Src->>HSLists: GET next page (after=token)
    end
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Assessment against linked issues

Objective Addressed Explanation
Implement trigger: HubSpot contact added to list (#18193)
Use memberships join-order API for detection (#18193)
Trigger should ignore contact property updates and only respond to membership changes (#18193)
Include API docs reference in trigger description (#18193)

Poem

I hop through lists and sniff each join-order,
I tally new faces—no property fodder.
I fetch in batches, jitter and pause,
I stamp each membership with careful cause.
A rabbit’s small job, gentle as air. 🥕🐇

Warning

Tools execution failed with the following error:

Failed to run tools: 8 RESOURCE_EXHAUSTED: Received message larger than max (6004661 vs 4194304)

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

vercel bot commented Aug 27, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Preview Comments Updated (UTC)
pipedream-docs-redirect-do-not-edit Ignored Ignored Sep 8, 2025 4:42pm

@adolfo-pd adolfo-pd added the User submitted Submitted by a user label Aug 27, 2025
@pipedream-component-development
Copy link
Collaborator

Thank you so much for submitting this! We've added it to our backlog to review, and our team has been notified.

@pipedream-component-development
Copy link
Collaborator

Thanks for submitting this PR! When we review PRs, we follow the Pipedream component guidelines. If you're not familiar, here's a quick checklist:

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
components/hubspot/sources/new-contact-added-to-list/test-event.mjs (1)

39-43: Align fixture with real API shape: include membershipTimestamp and use it for addedAt.

The join-order API returns a timestamp per membership (e.g., membershipTimestamp). Including it improves parity and lets the source use a real join time.

Apply this diff:

   membership: {
-    recordId: "31612976545",
+    recordId: "31612976545",
+    membershipTimestamp: "2024-08-26T15:35:20.789Z",
   },
-  addedAt: "2024-08-26T15:35:20.789Z",
+  addedAt: "2024-08-26T15:35:20.789Z",
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (4)

61-66: Use the API-provided timestamp for meta.ts.

Prefer membership.membershipTimestamp (join time) over Date.now() to reflect accurate event timing.

Apply this diff:

-    generateMeta(membership, listInfo) {
-      const { recordId } = membership;
-      const ts = this.getTs();
+    generateMeta(membership, listInfo) {
+      const { recordId, membershipTimestamp } = membership;
+      const ts = membershipTimestamp
+        ? new Date(membershipTimestamp).getTime()
+        : this.getTs();

160-175: Support labeled list selections and avoid hardcoded list names.

When prop uses withLabel, lists may be objects with value/label. Use label if present, fallback to “List {id}”.

Apply this diff:

-      for (const listId of lists) {
+      for (const list of lists) {
         try {
-          const listInfo = {
-            listId,
-            name: `List ${listId}`,
-          };
+          const listId = typeof list === "string" ? list : list?.value;
+          const listName = typeof list === "object" && list?.label
+            ? list.label
+            : `List ${listId}`;
+          const listInfo = { listId, name: listName };
 
           const newMemberships = await this.processListMemberships(
-            listId,
+            listId,
             listInfo,
           );
           allNewMemberships.push(...newMemberships);
         } catch (error) {
-          console.error(`Error processing list ${listId}:`, error);
+          const idForLog = typeof list === "string" ? list : list?.value;
+          console.error(`Error processing list ${idForLog}:`, error);
         }
       }

177-200: Emit accurate addedAt and use the same timestamp for meta.ts.

Use membership.membershipTimestamp when present so events reflect the true join time from HubSpot. The change to generateMeta (above) will pick this up for ts as well.

Apply this diff:

-        for (const {
-          membership, listInfo,
-        } of allNewMemberships) {
+        for (const { membership, listInfo } of allNewMemberships) {
           const contactDetail = contactDetails[membership.recordId] || {};
 
           const eventData = {
             listId: listInfo.listId,
             listName: listInfo.name,
             contactId: membership.recordId,
             contact: contactDetail,
             membership,
-            addedAt: new Date().toISOString(),
+            addedAt: membership.membershipTimestamp || new Date().toISOString(),
           };
 
           const meta = this.generateMeta(membership, listInfo);
           this.$emit(eventData, meta);
         }

70-75: Minor: de-duplicate properties to avoid duplicates in request payload.

Use a Set to guard against overlap between default properties and user-specified ones (addressing accidental dupes).

Apply this diff:

-      const allProperties = [
-        ...DEFAULT_CONTACT_PROPERTIES,
-        ...properties,
-      ];
+      const allProperties = Array.from(new Set([
+        ...DEFAULT_CONTACT_PROPERTIES,
+        ...properties,
+      ]));
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4019317 and ee9b575.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (4)
  • components/hubspot/hubspot.app.mjs (1 hunks)
  • components/hubspot/package.json (1 hunks)
  • components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1 hunks)
  • components/hubspot/sources/new-contact-added-to-list/test-event.mjs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/hubspot/hubspot.app.mjs (1)
components/hubspot/common/constants.mjs (1)
  • API_PATH (9-28)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)
components/hubspot/common/constants.mjs (1)
  • DEFAULT_CONTACT_PROPERTIES (41-68)
components/hubspot/hubspot.app.mjs (8)
  • properties (224-224)
  • properties (243-245)
  • properties (264-266)
  • properties (285-287)
  • properties (306-308)
  • properties (327-329)
  • params (840-842)
  • DEFAULT_LIMIT (35-41)
🔇 Additional comments (2)
components/hubspot/package.json (1)

3-3: Version bump looks good.

1.7.0 aligns with the new source and app method addition. Remember to update release notes/changelog accordingly.

components/hubspot/hubspot.app.mjs (1)

1071-1079: New join-order API wrapper is correct and minimal.

Endpoint path and option forwarding look right; default GET via makeRequest is appropriate. No further concerns.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1)

145-181: Fix duplicate emissions when re-reading the tail page.

Current logic only skips the single lastRecordId, but still re-emits older items on the same page when no new data exists. Resume only after the last processed record to avoid duplicates.

Apply:

       try {
         let hasMore = true;
-        let latestAfterToken = afterToken;
+        let latestAfterToken = afterToken;
         let latestRecordId = lastRecordId;
+        let lastSeenAfterToken = afterToken;
+        let started = !lastRecordId; // if no state, process all
 
         while (hasMore) {
           const {
             results, paging,
           } =
             await this.hubspot.getListMembershipsByJoinOrder({
               listId,
               params,
             });
 
           if (!results || results.length === 0) {
             break;
           }
 
-          for (const membership of results) {
-            if (lastRecordId && membership.recordId === lastRecordId) {
-              continue;
-            }
-
-            newMemberships.push({
-              membership,
-              listInfo,
-            });
-            latestRecordId = membership.recordId;
-          }
+          for (const membership of results) {
+            if (!started) {
+              if (membership.recordId === lastRecordId) {
+                started = true;
+              }
+              continue; // skip until we pass the last processed one
+            }
+            newMemberships.push({ membership, listInfo });
+            latestRecordId = membership.recordId;
+          }
 
           if (paging?.next?.after) {
-            latestAfterToken = paging.next.after;
-            params.after = paging.next.after;
+            latestAfterToken = paging.next.after;
+            lastSeenAfterToken = paging.next.after;
+            params.after = paging.next.after;
           } else {
             hasMore = false;
           }
         }
 
-        if (latestAfterToken !== afterToken) {
-          this._setAfterToken(listId, latestAfterToken);
-        }
+        // keep the last valid token even when we hit the end
+        if (lastSeenAfterToken) this._setAfterToken(listId, lastSeenAfterToken);
         if (latestRecordId) {
           this._setLastRecordId(listId, latestRecordId);
         }
🧹 Nitpick comments (3)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (3)

98-129: Avoid bursty batch reads; process chunks sequentially to respect rate limits.

Parallel Promise.all across many chunks can trigger 429s. Sequentially process chunks; retain per-chunk error handling.

Apply:

-      const chunkPromises = chunks.map(async (chunk) => {
-        try {
-          const { results } = await this.hubspot.batchGetObjects({
-            objectType: "contacts",
-            data: {
-              inputs: chunk.map((id) => ({
-                id,
-              })),
-              properties: allProperties,
-            },
-          });
-          return results;
-        } catch (error) {
-          console.warn("Error fetching contact details for chunk:", error);
-          return [];
-        }
-      });
-
-      try {
-        const chunkResults = await Promise.all(chunkPromises);
-
-        chunkResults.forEach((results) => {
-          results.forEach((contact) => {
-            contactMap[contact.id] = contact;
-          });
-        });
-
-        return contactMap;
-      } catch (error) {
-        console.warn("Error processing contact details:", error);
-        return {};
-      }
+      try {
+        for (const chunk of chunks) {
+          try {
+            const { results = [] } = await this.hubspot.batchGetObjects({
+              objectType: "contacts",
+              data: {
+                inputs: chunk.map((id) => ({ id })),
+                properties: allProperties,
+              },
+            });
+            for (const contact of results) {
+              contactMap[contact.id] = contact;
+            }
+          } catch (error) {
+            console.warn("Error fetching contact details for chunk:", error);
+          }
+        }
+        return contactMap;
+      } catch (error) {
+        console.warn("Error processing contact details:", error);
+        return {};
+      }

227-243: Emit the correct event time and keep meta/data in sync.

Use the same timestamp as meta.ts (from membership join time) in eventData.addedAt.

Apply:

-        for (const {
-          membership, listInfo,
-        } of allNewMemberships) {
-          const contactDetail = contactDetails[membership.recordId] || {};
-
-          const eventData = {
-            listId: listInfo.listId,
-            listName: listInfo.name,
-            contactId: membership.recordId,
-            contact: contactDetail,
-            membership,
-            addedAt: new Date().toISOString(),
-          };
-
-          const meta = this.generateMeta(membership, listInfo);
-          this.$emit(eventData, meta);
-        }
+        for (const { membership, listInfo } of allNewMemberships) {
+          const contactDetail = contactDetails[membership.recordId] || {};
+          const meta = this.generateMeta(membership, listInfo);
+          const eventData = {
+            listId: listInfo.listId,
+            listName: listInfo.name,
+            contactId: membership.recordId,
+            contact: contactDetail,
+            membership,
+            addedAt: new Date(meta.ts).toISOString(),
+          };
+          this.$emit(eventData, meta);
+        }

206-210: Consider fetching real list names for cleaner summaries.

name: "List <id>" is a placeholder. Optionally fetch list details once per run to populate listInfo.name from HubSpot and improve event summaries.

Would you like a follow-up patch adding a lightweight getListDetails(listId) app method and caching names?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ee9b575 and a3c7889.

📒 Files selected for processing (1)
  • components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)
components/hubspot/common/constants.mjs (1)
  • DEFAULT_CONTACT_PROPERTIES (41-68)
components/hubspot/hubspot.app.mjs (7)
  • properties (224-224)
  • properties (243-245)
  • properties (264-266)
  • properties (285-287)
  • properties (306-308)
  • properties (327-329)
  • params (840-842)
🔇 Additional comments (2)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)

45-61: Good: paging token and last record tracking are persisted per-list.

Storing list_${listId}_after_token and list_${listId}_last_record_id is aligned with the join-order API’s opaque cursor model.


65-74: Generated shell scripts to verify the membership fields and generateMeta implementation.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)

47-54: Persist HubSpot paging token and last processed recordId; timestamp-only state will miss/duplicate events and force full scans

The join-order endpoint requires carrying forward the opaque paging token and a last processed recordId. Relying only on a timestamp risks gaps when multiple records share the same timestamp and forces re-reading from the head of the list on every run.

Apply this diff to add cursor state helpers:

   methods: {
     ...common.methods,
     _getLastMembershipTimestamp(listId) {
       const key = `list_${listId}_last_timestamp`;
       return this.db.get(key);
     },
     _setLastMembershipTimestamp(listId, timestamp) {
       const key = `list_${listId}_last_timestamp`;
       this.db.set(key, timestamp);
     },
+    _getLastProcessedRecord(listId) {
+      const key = `list_${listId}_last_record_id`;
+      return this.db.get(key);
+    },
+    _setLastProcessedRecord(listId, recordId) {
+      const key = `list_${listId}_last_record_id`;
+      this.db.set(key, recordId);
+    },
+    _getLastPagingToken(listId) {
+      const key = `list_${listId}_after_token`;
+      return this.db.get(key);
+    },
+    _setLastPagingToken(listId, token) {
+      if (!token) return;
+      const key = `list_${listId}_after_token`;
+      this.db.set(key, token);
+    },

124-141: Fix first-run behavior (should emit all), use paging token + last recordId for efficient incremental reads, and avoid fragile timestamp filter

Currently, first run returns nothing (baseline to “now”), which conflicts with the PR objective to emit all existing list members on the first run. Also, filtering by membershipTimestamp “> last” can drop events with identical timestamps. Use the stored after-token and lastRecordId to resume safely and efficiently.

Apply this diff to rework processListMemberships:

-    async processListMemberships(listId, listInfo) {
-      const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
-      const newMemberships = [];
-
-      let params = {
-        limit: DEFAULT_LIMIT,
-      };
-
-      try {
-        let hasMore = true;
-        let latestMembershipTimestamp = lastMembershipTimestamp;
-
-        if (!lastMembershipTimestamp) {
-          const baselineTimestamp = new Date().toISOString();
-          this._setLastMembershipTimestamp(listId, baselineTimestamp);
-          return newMemberships;
-        }
-
-        while (hasMore) {
-          const {
-            results, paging,
-          } =
-            await this.hubspot.getListMembershipsByJoinOrder({
-              listId,
-              params,
-            });
-
-          if (!results || results.length === 0) {
-            break;
-          }
-
-          for (const membership of results) {
-            const { membershipTimestamp } = membership;
-
-            if (
-              new Date(membershipTimestamp) > new Date(lastMembershipTimestamp)
-            ) {
-              newMemberships.push({
-                membership,
-                listInfo,
-              });
-
-              if (
-                !latestMembershipTimestamp ||
-                new Date(membershipTimestamp) >
-                  new Date(latestMembershipTimestamp)
-              ) {
-                latestMembershipTimestamp = membershipTimestamp;
-              }
-            }
-          }
-
-          if (paging?.next?.after) {
-            params.after = paging.next.after;
-          } else {
-            hasMore = false;
-          }
-        }
-
-        if (latestMembershipTimestamp !== lastMembershipTimestamp) {
-          this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
-        }
-      } catch (error) {
-        console.error(`Error processing list ${listId}:`, error);
-      }
-
-      return newMemberships;
-    },
+    async processListMemberships(listId, listInfo) {
+      const lastRecordId = this._getLastProcessedRecord(listId);
+      const lastAfterToken = this._getLastPagingToken(listId);
+      const newMemberships = [];
+
+      const params = { limit: DEFAULT_LIMIT };
+      if (lastAfterToken) params.after = lastAfterToken;
+
+      try {
+        let hasMore = true;
+        let latestRecordId = lastRecordId;
+        let lastSeenAfterToken = lastAfterToken;
+        // If first run (no lastRecordId), process all results. Otherwise resume
+        // from the stored page and skip until we hit lastRecordId.
+        let started = !lastRecordId;
+
+        while (hasMore) {
+          const { results, paging } =
+            await this.hubspot.getListMembershipsByJoinOrder({ listId, params });
+
+          if (!results || results.length === 0) break;
+
+          for (const membership of results) {
+            if (!started) {
+              if (membership.recordId === lastRecordId) {
+                started = true;
+                continue; // resume after the last processed one
+              }
+              continue; // still skipping prior items on tail page
+            }
+            newMemberships.push({ membership, listInfo });
+            latestRecordId = membership.recordId;
+          }
+
+          if (paging?.next?.after) {
+            params.after = paging.next.after;
+            lastSeenAfterToken = paging.next.after;
+          } else {
+            hasMore = false;
+          }
+        }
+
+        if (latestRecordId) this._setLastProcessedRecord(listId, latestRecordId);
+        // Keep the last valid token to resume near the tail on the next run.
+        if (lastSeenAfterToken) this._setLastPagingToken(listId, lastSeenAfterToken);
+      } catch (error) {
+        console.error(`Error processing list ${listId}:`, error);
+      }
+      return newMemberships;
+    },

Also applies to: 155-185

🧹 Nitpick comments (4)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (4)

222-229: Emit the actual “added to list” time in the event payload

Use the membership’s timestamp instead of the processing time.

-              addedAt: new Date().toISOString(),
+              addedAt: membership.membershipTimestamp || new Date().toISOString(),

91-122: Guard against HubSpot 429s: avoid unbounded parallel batchGetObjects

Promise.all over many chunks can trip rate limits on large lists. Iterate sequentially or cap concurrency.

Simple sequential alternative:

-      const chunkPromises = chunks.map(async (chunk) => {
-        try {
-          const { results } = await this.hubspot.batchGetObjects({
-            objectType: "contacts",
-            data: {
-              inputs: chunk.map((id) => ({
-                id,
-              })),
-              properties: allProperties,
-            },
-          });
-          return results;
-        } catch (error) {
-          console.warn("Error fetching contact details for chunk:", error);
-          return [];
-        }
-      });
-
-      try {
-        const chunkResults = await Promise.all(chunkPromises);
-
-        chunkResults.forEach((results) => {
-          results.forEach((contact) => {
-            contactMap[contact.id] = contact;
-          });
-        });
+      try {
+        for (const chunk of chunks) {
+          try {
+            const { results = [] } = await this.hubspot.batchGetObjects({
+              objectType: "contacts",
+              data: {
+                inputs: chunk.map((id) => ({ id })),
+                properties: allProperties,
+              },
+            });
+            for (const contact of results) {
+              contactMap[contact.id] = contact;
+            }
+          } catch (error) {
+            console.warn("Error fetching contact details for chunk:", error);
+          }
+        }

200-203: Optional: Enrich list name instead of “List {listId}” placeholder

Consider fetching the list name once (and caching) to improve event summaries.

I can add a small helper calling Lists API and memoize per run.


239-241: Remove unused method

getParams is unused.

-    getParams() {
-      return {};
-    },
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a3c7889 and 94d42f7.

📒 Files selected for processing (1)
  • components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1)
components/hubspot/common/constants.mjs (1)
  • DEFAULT_CONTACT_PROPERTIES (41-68)
🔇 Additional comments (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1)

24-43: Add multi-list support with backward compatibility

  • Introduce listIds (type string[]) alongside listId, falling back to listId when listIds is unset.
  • In processResults, build a list of IDs (this.listIds || [this.listId]), then loop through each list to fetch and emit new memberships.
  • Confirm that propDefinition can render a multi-select string[]; otherwise switch listIds to an async-options array prop.

Also apply the same changes in lines 192–238.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (3)

72-101: Batch read: de-duplicate IDs/properties and chunk to ≤100 per request

Current code can exceed HubSpot batch limits and send duplicate props. Chunk to 100 and de-dupe before calling batchGetObjects. Docs and community confirm 100-object batch limit for CRM object endpoints. (community.hubspot.com)

-    async getContactDetails(contactIds) {
-      if (!contactIds.length) return {};
-
-      const { properties = [] } = this;
-      const allProperties = [
-        ...DEFAULT_CONTACT_PROPERTIES,
-        ...properties,
-      ];
-
-      try {
-        const { results } = await this.hubspot.batchGetObjects({
-          objectType: "contacts",
-          data: {
-            inputs: contactIds.map((id) => ({
-              id,
-            })),
-            properties: allProperties,
-          },
-        });
-
-        const contactMap = {};
-        results.forEach((contact) => {
-          contactMap[contact.id] = contact;
-        });
-        return contactMap;
-      } catch (error) {
-        console.warn("Error fetching contact details:", error);
-        return {};
-      }
-    },
+    async getContactDetails(contactIds) {
+      if (!contactIds.length) return {};
+      const ids = Array.from(new Set(contactIds));
+      const { properties = [] } = this;
+      const allProperties = Array.from(new Set([
+        ...DEFAULT_CONTACT_PROPERTIES,
+        ...properties,
+      ]));
+      const contactMap = {};
+      try {
+        const BATCH_LIMIT = 100; // per HubSpot CRM object batch limits
+        for (let i = 0; i < ids.length; i += BATCH_LIMIT) {
+          const batch = ids.slice(i, i + BATCH_LIMIT);
+          const { results = [] } = await this.hubspot.batchGetObjects({
+            objectType: "contacts",
+            data: {
+              inputs: batch.map((id) => ({ id })),
+              properties: allProperties,
+            },
+          });
+          for (const contact of results) {
+            contactMap[contact.id] = contact;
+          }
+        }
+        return contactMap;
+      } catch (error) {
+        console.warn("Error fetching contact details:", error);
+        return {};
+      }
+    },

47-54: Persist paging token and last recordId; timestamp-only cursor risks misses/dupes

Per the join-order endpoint, you should persist paging.next.after (opaque token) and also the last processed recordId to safely resume and skip re-reads. Relying only on timestamps can drop items with identical membershipTimestamp or cause duplicates across runs. Docs describe after/before paging; see also reports of ordering inconsistencies earlier this year. (postman.com, community.hubspot.com)

     _setLastMembershipTimestamp(listId, timestamp) {
       const key = `list_${listId}_last_timestamp`;
       this.db.set(key, timestamp);
     },
+    _getLastProcessedRecord(listId) {
+      const key = `list_${listId}_last_record_id`;
+      return this.db.get(key);
+    },
+    _setLastProcessedRecord(listId, recordId) {
+      if (!recordId) return;
+      const key = `list_${listId}_last_record_id`;
+      this.db.set(key, recordId);
+    },
+    _getLastPagingToken(listId) {
+      const key = `list_${listId}_after_token`;
+      return this.db.get(key);
+    },
+    _setLastPagingToken(listId, token) {
+      if (!token) return;
+      const key = `list_${listId}_after_token`;
+      this.db.set(key, token);
+    },

102-169: Use stored paging token and recordId to resume; update both after each run

Initialize from stored after token and last recordId, skip until the last processed record, and persist the newest after + recordId. This avoids rescanning the entire list and prevents gaps when multiple joins share the same timestamp. Docs: max limit 250; paging is via after/before. (postman.com)

-    async processListMemberships(listId, listInfo) {
-      const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
-      const newMemberships = [];
-
-      let params = {
-        limit: DEFAULT_LIMIT,
-      };
+    async processListMemberships(listId, listInfo) {
+      const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
+      const lastAfterToken = this._getLastPagingToken(listId);
+      const lastProcessedRecordId = this._getLastProcessedRecord(listId);
+      const newMemberships = [];
+
+      let params = {
+        limit: DEFAULT_LIMIT,
+      };
+      if (lastAfterToken) params.after = lastAfterToken;
@@
-        let hasMore = true;
-        let latestMembershipTimestamp = lastMembershipTimestamp;
+        let hasMore = true;
+        let latestMembershipTimestamp = lastMembershipTimestamp;
+        let latestRecordId = lastProcessedRecordId;
+        let lastSeenAfterToken = lastAfterToken;
+        let started = !lastProcessedRecordId; // skip until we pass the last processed record
@@
-        while (hasMore) {
+        while (hasMore) {
           const {
             results, paging,
           } =
             await this.hubspot.getListMembershipsByJoinOrder({
               listId,
               params,
             });
@@
-          for (const membership of results) {
-            const { membershipTimestamp } = membership;
+          for (const membership of results) {
+            const { membershipTimestamp, recordId } = membership;
+
+            if (!started) {
+              if (recordId === lastProcessedRecordId) {
+                started = true;
+                continue; // resume after this one
+              }
+              continue; // still skipping
+            }
 
             if (
               new Date(membershipTimestamp) > new Date(lastMembershipTimestamp)
             ) {
               newMemberships.push({
                 membership,
                 listInfo,
               });
 
               if (
                 !latestMembershipTimestamp ||
                 new Date(membershipTimestamp) >
                   new Date(latestMembershipTimestamp)
               ) {
                 latestMembershipTimestamp = membershipTimestamp;
               }
+              latestRecordId = recordId;
             }
           }
 
           if (paging?.next?.after) {
-            params.after = paging.next.after;
+            params.after = paging.next.after;
+            lastSeenAfterToken = paging.next.after;
           } else {
             hasMore = false;
           }
         }
 
-        if (latestMembershipTimestamp !== lastMembershipTimestamp) {
-          this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
-        }
+        if (latestMembershipTimestamp && latestMembershipTimestamp !== lastMembershipTimestamp) {
+          this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
+        }
+        if (latestRecordId) this._setLastProcessedRecord(listId, latestRecordId);
+        if (lastSeenAfterToken) this._setLastPagingToken(listId, lastSeenAfterToken);
🧹 Nitpick comments (5)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (5)

62-64: Harden timestamp parsing to avoid NaN in meta.ts

Use Date.parse and guard for invalid values.

-      const ts = membershipTimestamp
-        ? new Date(membershipTimestamp).getTime()
-        : this.getTs();
+      const parsed = Date.parse(membershipTimestamp || "");
+      const ts = Number.isFinite(parsed) ? parsed : this.getTs();

106-108: Use endpoint max page size (≤250) to reduce requests

The join-order endpoint allows up to 250 per page; cap to that. (postman.com)

-      let params = {
-        limit: DEFAULT_LIMIT,
-      };
+      let params = {
+        limit: Math.min(DEFAULT_LIMIT || 250, 250),
+      };

178-182: Optional: fetch and cache the list’s display name

Consider fetching the list details once (e.g., Lists API) and caching name so event summaries show the actual list name instead of “List ”.


217-219: Remove unused getParams

Method appears unused; safe to drop for clarity.


133-152: Optional: Guard or remove early-exit based on sort order
The /crm/v3/lists/{listId}/memberships/join-order endpoint isn’t reliably ascending in practice, so only break the loop on membershipTimestamp <= lastMembershipTimestamp if you’ve confirmed ascending behavior in your HubSpot account or obtained a formal sort guarantee; otherwise page through all tokens as-is.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3585f78 and fa48fdb.

📒 Files selected for processing (1)
  • components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)
components/hubspot/common/constants.mjs (1)
  • DEFAULT_CONTACT_PROPERTIES (42-69)
components/hubspot/hubspot.app.mjs (7)
  • properties (224-224)
  • properties (243-245)
  • properties (264-266)
  • properties (285-287)
  • properties (306-308)
  • properties (327-329)
  • params (1036-1038)
🔇 Additional comments (1)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1)

58-71: Good: meta uses membership timestamp and includes it in id

Including membershipTimestamp in both meta.ts and meta.id supports legitimate re-add events and improves deduping.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)

47-55: Persist paging token and last processed recordId (not just timestamp).

Relying only on membershipTimestamp risks missing events with identical timestamps and forces full scans. Store the opaque paging.next.after token and the last processed recordId to safely resume and dedupe.

     _getKey(listId) {
       return `list_${listId}_last_timestamp`;
     },
+    _getLastProcessedRecord(listId) {
+      return this.db.get(`list_${listId}_last_record`);
+    },
+    _setLastProcessedRecord(listId, recordId) {
+      if (!recordId) return;
+      this.db.set(`list_${listId}_last_record`, recordId);
+    },
+    _getLastPagingToken(listId) {
+      return this.db.get(`list_${listId}_after`);
+    },
+    _setLastPagingToken(listId, token) {
+      if (!token) return;
+      this.db.set(`list_${listId}_after`, token);
+    },
     _getLastMembershipTimestamp(listId) {
       return this.db.get(this._getKey(listId));
     },
     _setLastMembershipTimestamp(listId, timestamp) {
       this.db.set(this._getKey(listId), timestamp);
     },

120-191: Use the stored paging token + last recordId, and break early to avoid full rescans.

  • Seed params.after from stored token.
  • Skip until lastProcessedRecord when re-reading the tail page.
  • Track lastSeenAfterToken and latestRecordId; persist both at the end.
  • Break once items are older/equal to last timestamp to reduce API calls.
-    async processListMemberships(listId, listInfo) {
-      const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
+    async processListMemberships(listId, listInfo) {
+      const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
+      const lastProcessedRecord = this._getLastProcessedRecord(listId);
+      const lastAfterToken = this._getLastPagingToken(listId);
       const newMemberships = [];
 
-      let params = {
-        limit: DEFAULT_LIMIT,
-      };
+      let params = { limit: DEFAULT_LIMIT };
+      if (lastAfterToken) params.after = lastAfterToken;
 
       try {
         let hasMore = true;
-        let latestMembershipTimestamp = lastMembershipTimestamp;
+        let latestMembershipTimestamp = lastMembershipTimestamp;
+        let latestRecordId = lastProcessedRecord;
+        let lastSeenAfterToken = lastAfterToken;
+        let started = !lastProcessedRecord; // when re-reading tail page, skip until we hit lastProcessedRecord
 
         if (!lastMembershipTimestamp) {
           const baselineTimestamp = new Date().toISOString();
           this._setLastMembershipTimestamp(listId, baselineTimestamp);
           return newMemberships;
         }
 
         while (hasMore) {
           const {
             results, paging,
           } =
             await this.hubspot.getListMembershipsByJoinOrder({
               listId,
               params,
             });
 
           if (!results) {
             console.warn(
               `No results returned from API for list ${listId} - possible API issue`,
             );
             break;
           }
 
           if (results.length === 0) {
             break;
           }
 
-          for (const membership of results) {
-            const { membershipTimestamp } = membership;
-
-            if (membershipTimestamp > lastMembershipTimestamp) {
-              newMemberships.push({
-                membership,
-                listInfo,
-              });
-
-              if (
-                !latestMembershipTimestamp ||
-                membershipTimestamp > latestMembershipTimestamp
-              ) {
-                latestMembershipTimestamp = membershipTimestamp;
-              }
-            }
-          }
+          for (const membership of results) {
+            const { recordId, membershipTimestamp } = membership;
+            if (!started) {
+              if (recordId === lastProcessedRecord) {
+                started = true;
+                continue; // resume after the last processed one
+              }
+              continue; // still skipping
+            }
+            if (membershipTimestamp <= lastMembershipTimestamp) {
+              hasMore = false; // reached older/equal events, stop scanning
+              break;
+            }
+            newMemberships.push({ membership, listInfo });
+            latestRecordId = recordId;
+            if (!latestMembershipTimestamp || membershipTimestamp > latestMembershipTimestamp) {
+              latestMembershipTimestamp = membershipTimestamp;
+            }
+          }
 
           if (paging?.next?.after) {
             params.after = paging.next.after;
+            lastSeenAfterToken = paging.next.after;
           } else {
             hasMore = false;
           }
         }
 
-        if (latestMembershipTimestamp !== lastMembershipTimestamp) {
-          this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
-        }
+        if (latestMembershipTimestamp && latestMembershipTimestamp !== lastMembershipTimestamp) {
+          this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
+        }
+        if (latestRecordId) this._setLastProcessedRecord(listId, latestRecordId);
+        if (lastSeenAfterToken) this._setLastPagingToken(listId, lastSeenAfterToken);
       } catch (error) {
         console.error(`Error processing list ${listId}:`, error);
       }
 
       return newMemberships;
     },
🧹 Nitpick comments (4)
components/hubspot/hubspot.app.mjs (2)

1267-1292: Good addition; consider Retry-After and error shape fallback, and avoid duplicating retry logic.

  • Use error.response.status fallback to ensure 429 detection across axios variants.
  • Respect Retry-After header when present.
  • Optional: extract a small with429Retry helper to DRY this pattern (also used in searchCRM and below).

Apply within this block:

-        } catch (error) {
-          if (error.status === 429 && ++retries < MAX_RETRIES) {
-            const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
-            const delay = BASE_RETRY_DELAY * (2 ** retries) + randomDelay;
-            await new Promise((resolve) => setTimeout(resolve, delay));
-          } else {
-            throw error;
-          }
-        }
+        } catch (error) {
+          const status = error?.response?.status ?? error?.status;
+          if (status === 429 && ++retries < MAX_RETRIES) {
+            const retryAfter = Number(error?.response?.headers?.["retry-after"]);
+            const baseDelay = Number.isFinite(retryAfter)
+              ? retryAfter * 1000
+              : BASE_RETRY_DELAY * (2 ** retries);
+            const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
+            const delay = baseDelay + randomDelay;
+            await new Promise((resolve) => setTimeout(resolve, delay));
+          } else {
+            throw error;
+          }
+        }

1293-1319: Batch read retry looks solid; apply the same Retry-After + error shape and consider centralizing.

Mirror the same improvements here and factor out a shared retry helper to reduce copy/paste.

-        } catch (error) {
-          if (error.status === 429 && ++retries < MAX_RETRIES) {
-            const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
-            const delay = BASE_RETRY_DELAY * (2 ** retries) + randomDelay;
-            await new Promise((resolve) => setTimeout(resolve, delay));
-          } else {
-            throw error;
-          }
-        }
+        } catch (error) {
+          const status = error?.response?.status ?? error?.status;
+          if (status === 429 && ++retries < MAX_RETRIES) {
+            const retryAfter = Number(error?.response?.headers?.["retry-after"]);
+            const baseDelay = Number.isFinite(retryAfter)
+              ? retryAfter * 1000
+              : BASE_RETRY_DELAY * (2 ** retries);
+            const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
+            const delay = baseDelay + randomDelay;
+            await new Promise((resolve) => setTimeout(resolve, delay));
+          } else {
+            throw error;
+          }
+        }
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)

76-86: De-duplicate IDs and properties before batch read; keep chunks at 100.

Saves API calls and avoids redundant fields.

-      const { properties = [] } = this;
-      const allProperties = [
-        ...DEFAULT_CONTACT_PROPERTIES,
-        ...properties,
-      ];
+      const { properties = [] } = this;
+      const allProperties = Array.from(new Set([
+        ...DEFAULT_CONTACT_PROPERTIES,
+        ...properties,
+      ]));
 
-      const chunks = [];
-      const chunkSize = 100;
-      for (let i = 0; i < contactIds.length; i += chunkSize) {
-        chunks.push(contactIds.slice(i, i + chunkSize));
-      }
+      const ids = Array.from(new Set(contactIds));
+      const chunks = [];
+      const chunkSize = 100;
+      for (let i = 0; i < ids.length; i += chunkSize) {
+        chunks.push(ids.slice(i, i + chunkSize));
+      }
@@
-          const contactIds = newMemberships.map(
+          const contactIds = newMemberships.map(
             ({ membership }) => membership.recordId,
           );

Also applies to: 214-219


158-176: Optional early-exit optimization.

If the endpoint returns results ordered by join time, once you encounter membershipTimestamp <= lastMembershipTimestamp you can stop scanning further pages.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fa48fdb and d0923d8.

📒 Files selected for processing (2)
  • components/hubspot/hubspot.app.mjs (1 hunks)
  • components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/hubspot/hubspot.app.mjs (1)
components/hubspot/common/constants.mjs (1)
  • API_PATH (9-29)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (2)
components/hubspot/common/constants.mjs (1)
  • DEFAULT_CONTACT_PROPERTIES (42-69)
components/hubspot/hubspot.app.mjs (9)
  • properties (224-224)
  • properties (243-245)
  • properties (264-266)
  • properties (285-287)
  • properties (306-308)
  • properties (327-329)
  • params (1036-1038)
  • DEFAULT_LIMIT (35-41)
  • DEFAULT_LIMIT (587-592)
🔇 Additional comments (4)
components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs (4)

59-72: Meta generation looks good.

Using membershipTimestamp for ts and including it in meta.id enables distinct re-add events.


225-236: Event payload uses actual join time.

Emitting addedAt from membership.membershipTimestamp matches the source-of-truth event time.


128-136: First-run behavior aligns with requirements.

Initializing baseline and skipping existing members on first run is correct.


208-241: Log message includes listId context—nice.

Flow is straightforward and guarded; once the above fixes are applied, this should be reliable.

@michelle0927 michelle0927 self-requested a review September 5, 2025 15:31
Copy link
Collaborator

@michelle0927 michelle0927 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @choeqq. I was unable to push changes to your fork/branch, so I left comments with the changes needed. I added a new searchLists method due to an error I was getting with legacy list IDs.
Also, since the app file (hubspot.app.mjs) is updated, you'll need to update the versions of all Hubspot components.
Let me know if you have any questions or issues while testing the changes.

Copy link
Collaborator

@michelle0927 michelle0927 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@choeqq Thanks for making those changes! Looks like there was another Hubspot PR merged recently. Sorry to have to ask you to do this, but you'll need to bump the versions of the Hubspot sources once more before this is published.
Ready for QA!

Copy link
Contributor Author

@choeqq choeqq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @michelle0927 I just bumped all components versions as requested ✅
I hope now it's ready for QA!
Thank you 👐

@vunguyenhung vunguyenhung merged commit 1385475 into PipedreamHQ:master Sep 9, 2025
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

User submitted Submitted by a user

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[TRIGGER] Hubspot Contact added to list

5 participants