From 9ffc455233ce8dbe5667ac295dd45636be184fbc Mon Sep 17 00:00:00 2001 From: m0wer Date: Thu, 12 Jun 2025 22:41:53 +0200 Subject: [PATCH 01/10] feat: Add support for scheduled posts --- api/paidAction/itemCreate.js | 7 + api/paidAction/lib/item.js | 36 ++- api/resolvers/item.js | 150 +++++++++- api/resolvers/search.js | 8 +- api/typeDefs/item.js | 5 + fragments/paidAction.js | 14 + lib/item.js | 24 ++ .../migration.sql | 274 ++++++++++++++++++ prisma/schema.prisma | 5 + worker/index.js | 3 + worker/publishScheduledPosts.js | 77 +++++ 11 files changed, 595 insertions(+), 8 deletions(-) create mode 100644 prisma/migrations/20250611000000_scheduled_posts/migration.sql create mode 100644 worker/publishScheduledPosts.js diff --git a/api/paidAction/itemCreate.js b/api/paidAction/itemCreate.js index d6fb603f7..fb76fce8c 100644 --- a/api/paidAction/itemCreate.js +++ b/api/paidAction/itemCreate.js @@ -3,6 +3,7 @@ import { notifyItemMention, notifyItemParents, notifyMention, notifyTerritorySub import { getItemMentions, getMentions, performBotBehavior } from './lib/item' import { msatsToSats, satsToMsats } from '@/lib/format' import { GqlInputError } from '@/lib/error' +import { getScheduleAt } from '@/lib/item' export const anonable = true @@ -96,6 +97,10 @@ export async function perform (args, context) { const mentions = await getMentions(args, context) const itemMentions = await getItemMentions(args, context) + // Check if this is a scheduled post + const scheduleAt = getScheduleAt(data.text) + const isScheduled = !!scheduleAt + // start with median vote if (me) { const [row] = await tx.$queryRaw`SELECT @@ -112,6 +117,8 @@ export async function perform (args, context) { ...data, ...invoiceData, boost, + isScheduled, + scheduledAt: scheduleAt, threadSubscriptions: { createMany: { data: [ diff --git a/api/paidAction/lib/item.js b/api/paidAction/lib/item.js index 879b1cb53..908b15458 100644 --- a/api/paidAction/lib/item.js +++ b/api/paidAction/lib/item.js @@ -1,5 +1,5 @@ import { USER_ID } from '@/lib/constants' -import { deleteReminders, getDeleteAt, getRemindAt } from '@/lib/item' +import { deleteReminders, getDeleteAt, getRemindAt, getScheduleAt } from '@/lib/item' import { parseInternalLinks } from '@/lib/url' export async function getMentions ({ text }, { me, tx }) { @@ -46,14 +46,19 @@ export const getItemMentions = async ({ text }, { me, tx }) => { } export async function performBotBehavior ({ text, id }, { me, tx }) { - // delete any existing deleteItem or reminder jobs for this item + // delete any existing deleteItem, reminder, or publishScheduledPost jobs for this item const userId = me?.id || USER_ID.anon id = Number(id) await tx.$queryRaw` DELETE FROM pgboss.job - WHERE name = 'deleteItem' + WHERE (name = 'deleteItem' OR name = 'publishScheduledPost') AND data->>'id' = ${id}::TEXT AND state <> 'completed'` + await tx.$queryRaw` + DELETE FROM pgboss.job + WHERE name = 'publishScheduledPost' + AND data->>'itemId' = ${id}::TEXT + AND state <> 'completed'` await deleteReminders({ id, userId, models: tx }) if (text) { @@ -85,5 +90,30 @@ export async function performBotBehavior ({ text, id }, { me, tx }) { } }) } + + const scheduleAt = getScheduleAt(text) + if (scheduleAt) { + // For new items, scheduling info is set during creation + // For updates, we need to update the item + const existingItem = await tx.item.findUnique({ where: { id: Number(id) } }) + if (existingItem && !existingItem.isScheduled) { + await tx.item.update({ + where: { id: Number(id) }, + data: { + isScheduled: true, + scheduledAt: scheduleAt + } + }) + } + + // Schedule the job to publish the post + await tx.$queryRaw` + INSERT INTO pgboss.job (name, data, startafter, keepuntil) + VALUES ( + 'publishScheduledPost', + jsonb_build_object('itemId', ${id}::INTEGER), + ${scheduleAt}::TIMESTAMP WITH TIME ZONE, + ${scheduleAt}::TIMESTAMP WITH TIME ZONE + interval '1 minute')` + } } } diff --git a/api/resolvers/item.js b/api/resolvers/item.js index de46f2f26..eab17c5eb 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -108,7 +108,8 @@ export async function getItem (parent, { id }, { me, models }) { FROM "Item" ${whereClause( '"Item".id = $1', - activeOrMine(me) + activeOrMine(me), + scheduledOrMine(me) )}` }, Number(id)) return item @@ -130,6 +131,7 @@ export async function getAd (parent, { sub, subArr = [], showNsfw = false }, { m '"Item".bio = false', '"Item".boost > 0', activeOrMine(), + scheduledOrMine(me), subClause(sub, 1, 'Item', me, showNsfw), muteClause(me))} ORDER BY boost desc, "Item".created_at ASC @@ -256,6 +258,12 @@ export const activeOrMine = (me) => { export const muteClause = me => me ? `NOT EXISTS (SELECT 1 FROM "Mute" WHERE "Mute"."muterId" = ${me.id} AND "Mute"."mutedId" = "Item"."userId")` : '' +export const scheduledOrMine = (me) => { + return me + ? `("Item"."isScheduled" = false OR "Item"."userId" = ${me.id})` + : '"Item"."isScheduled" = false' +} + const HIDE_NSFW_CLAUSE = '("Sub"."nsfw" = FALSE OR "Sub"."nsfw" IS NULL)' export const nsfwClause = showNsfw => showNsfw ? '' : HIDE_NSFW_CLAUSE @@ -411,6 +419,7 @@ export default { ${whereClause( `"${table}"."userId" = $3`, activeOrMine(me), + scheduledOrMine(me), nsfwClause(showNsfw), typeClause(type), by === 'boost' && '"Item".boost > 0', @@ -433,6 +442,7 @@ export default { '"Item"."deletedAt" IS NULL', subClause(sub, 4, subClauseTable(type), me, showNsfw), activeOrMine(me), + scheduledOrMine(me), await filterClause(me, models, type), typeClause(type), muteClause(me) @@ -457,6 +467,7 @@ export default { typeClause(type), whenClause(when, 'Item'), activeOrMine(me), + scheduledOrMine(me), await filterClause(me, models, type), by === 'boost' && '"Item".boost > 0', muteClause(me))} @@ -484,6 +495,7 @@ export default { typeClause(type), await filterClause(me, models, type), activeOrMine(me), + scheduledOrMine(me), muteClause(me))} ${orderByClause('random', me, models, type)} OFFSET $1 @@ -513,6 +525,7 @@ export default { '"parentId" IS NULL', '"Item"."deletedAt" IS NULL', activeOrMine(me), + scheduledOrMine(me), 'created_at <= $1', '"pinId" IS NULL', subClause(sub, 4) @@ -543,6 +556,7 @@ export default { '"pinId" IS NOT NULL', '"parentId" IS NULL', sub ? '"subName" = $1' : '"subName" IS NULL', + scheduledOrMine(me), muteClause(me))} ) rank_filter WHERE RANK = 1 ORDER BY position ASC`, @@ -569,6 +583,7 @@ export default { '"Item".bio = false', ad ? `"Item".id <> ${ad.id}` : '', activeOrMine(me), + scheduledOrMine(me), await filterClause(me, models, type), subClause(sub, 3, 'Item', me, showNsfw), muteClause(me))} @@ -653,6 +668,7 @@ export default { ${SELECT} FROM "Item" WHERE url ~* $1 AND ("Item"."invoiceActionState" IS NULL OR "Item"."invoiceActionState" = 'PAID') + AND (${scheduledOrMine(me)}) ORDER BY created_at DESC LIMIT 3` }, similar) @@ -733,6 +749,36 @@ export default { homeMaxBoost: homeAgg._max.boost || 0, subMaxBoost: subAgg?._max.boost || 0 } + }, + scheduledItems: async (parent, { cursor, limit = LIMIT }, { me, models }) => { + if (!me) { + throw new GqlAuthenticationError() + } + + const decodedCursor = decodeCursor(cursor) + + const items = await itemQueryWithMeta({ + me, + models, + query: ` + ${SELECT}, trim(both ' ' from + coalesce(ltree2text(subpath("path", 0, -1)), '')) AS "ancestorTitles" + FROM "Item" + WHERE "userId" = $1 AND "isScheduled" = true AND "deletedAt" IS NULL + AND ("invoiceActionState" IS NULL OR "invoiceActionState" = 'PAID') + AND created_at <= $2::timestamp + ORDER BY "scheduledAt" ASC + OFFSET $3 + LIMIT $4`, + orderBy: '' + }, me.id, decodedCursor.time, decodedCursor.offset, limit) + + return { + cursor: items.length === limit ? nextCursorEncoded(decodedCursor, limit) : null, + items, + pins: [], + ad: null + } } }, @@ -1048,6 +1094,94 @@ export default { ]) return result + }, + cancelScheduledPost: async (parent, { id }, { me, models }) => { + if (!me) { + throw new GqlAuthenticationError() + } + + const item = await models.item.findUnique({ + where: { id: Number(id) } + }) + + if (!item) { + throw new GqlInputError('item not found') + } + + if (Number(item.userId) !== Number(me.id)) { + throw new GqlInputError('item does not belong to you') + } + + if (!item.isScheduled) { + throw new GqlInputError('item is not scheduled') + } + + // Cancel the scheduled job + await models.$queryRaw` + DELETE FROM pgboss.job + WHERE name = 'publishScheduledPost' + AND data->>'itemId' = ${item.id}::TEXT + AND state <> 'completed'` + + // Update the item to remove scheduling + return await models.item.update({ + where: { id: Number(id) }, + data: { + isScheduled: false, + scheduledAt: null + } + }) + }, + publishScheduledPostNow: async (parent, { id }, { me, models }) => { + if (!me) { + throw new GqlAuthenticationError() + } + + const item = await models.item.findUnique({ + where: { id: Number(id) } + }) + + if (!item) { + throw new GqlInputError('item not found') + } + + if (Number(item.userId) !== Number(me.id)) { + throw new GqlInputError('item does not belong to you') + } + + if (!item.isScheduled) { + throw new GqlInputError('item is not scheduled') + } + + // Cancel the scheduled job + await models.$queryRaw` + DELETE FROM pgboss.job + WHERE name = 'publishScheduledPost' + AND data->>'itemId' = ${item.id}::TEXT + AND state <> 'completed'` + + const publishTime = new Date() + + // Publish immediately with current timestamp + const updatedItem = await models.item.update({ + where: { id: Number(id) }, + data: { + isScheduled: false, + scheduledAt: null, + createdAt: publishTime, + updatedAt: publishTime + } + }) + + // Refresh cached views + await models.$executeRaw`REFRESH MATERIALIZED VIEW CONCURRENTLY hot_score_view` + + // Queue side effects + await models.$executeRaw` + INSERT INTO pgboss.job (name, data, startafter) + VALUES ('schedulePostSideEffects', jsonb_build_object('itemId', ${item.id}::INTEGER), now())` + + return updatedItem } }, ItemAct: { @@ -1357,7 +1491,8 @@ export default { FROM "Item" ${whereClause( '"Item".id = $1', - `("Item"."invoiceActionState" IS NULL OR "Item"."invoiceActionState" = 'PAID'${me ? ` OR "Item"."userId" = ${me.id}` : ''})` + `("Item"."invoiceActionState" IS NULL OR "Item"."invoiceActionState" = 'PAID'${me ? ` OR "Item"."userId" = ${me.id}` : ''})`, + scheduledOrMine(me) )}` }, Number(item.rootId)) @@ -1416,6 +1551,17 @@ export default { AND data->>'userId' = ${meId}::TEXT AND state = 'created'` return reminderJobs[0]?.startafter ?? null + }, + scheduledAt: async (item, args, { me, models }) => { + const meId = me?.id ?? USER_ID.anon + if (meId !== item.userId) { + // Only show scheduledAt for your own items to keep DB queries minimized + return null + } + return item.scheduledAt + }, + isScheduled: async (item, args, { me, models }) => { + return !!item.isScheduled } } } diff --git a/api/resolvers/search.js b/api/resolvers/search.js index 15741f4f2..0cd2ef6d1 100644 --- a/api/resolvers/search.js +++ b/api/resolvers/search.js @@ -1,6 +1,6 @@ import { decodeCursor, LIMIT, nextCursorEncoded } from '@/lib/cursor' import { whenToFrom } from '@/lib/time' -import { getItem, itemQueryWithMeta, SELECT } from './item' +import { getItem, itemQueryWithMeta, SELECT, scheduledOrMine } from './item' import { parse } from 'tldts' function queryParts (q) { @@ -163,7 +163,8 @@ export default { WITH r(id, rank) AS (VALUES ${values}) ${SELECT}, rank FROM "Item" - JOIN r ON "Item".id = r.id`, + JOIN r ON "Item".id = r.id + WHERE ${scheduledOrMine(me)}`, orderBy: 'ORDER BY rank ASC' }) @@ -501,7 +502,8 @@ export default { WITH r(id, rank) AS (VALUES ${values}) ${SELECT}, rank FROM "Item" - JOIN r ON "Item".id = r.id`, + JOIN r ON "Item".id = r.id + WHERE ${scheduledOrMine(me)}`, orderBy: 'ORDER BY rank ASC, msats DESC' })).map((item, i) => { const e = sitems.body.hits.hits[i] diff --git a/api/typeDefs/item.js b/api/typeDefs/item.js index a40a99ae1..e58a800c6 100644 --- a/api/typeDefs/item.js +++ b/api/typeDefs/item.js @@ -11,6 +11,7 @@ export default gql` auctionPosition(sub: String, id: ID, boost: Int): Int! boostPosition(sub: String, id: ID, boost: Int): BoostPositions! itemRepetition(parentId: ID): Int! + scheduledItems(cursor: String, limit: Limit): Items } type BoostPositions { @@ -63,6 +64,8 @@ export default gql` act(id: ID!, sats: Int, act: String, hasSendWallet: Boolean): ItemActPaidAction! pollVote(id: ID!): PollVotePaidAction! toggleOutlaw(id: ID!): Item! + cancelScheduledPost(id: ID!): Item! + publishScheduledPostNow(id: ID!): Item! } type PollVoteResult { @@ -112,6 +115,8 @@ export default gql` deletedAt: Date deleteScheduledAt: Date reminderScheduledAt: Date + scheduledAt: Date + isScheduled: Boolean! title: String searchTitle: String url: String diff --git a/fragments/paidAction.js b/fragments/paidAction.js index 60ed16e25..4b5f3ae23 100644 --- a/fragments/paidAction.js +++ b/fragments/paidAction.js @@ -23,6 +23,8 @@ const ITEM_PAID_ACTION_FIELDS = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled ...CommentFields comments { comments { @@ -39,6 +41,8 @@ const ITEM_PAID_ACTION_FIELDS_NO_CHILD_COMMENTS = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled ...CommentFields } } @@ -151,6 +155,8 @@ export const UPSERT_DISCUSSION = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled } ...PaidActionFields } @@ -168,6 +174,8 @@ export const UPSERT_JOB = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled } ...PaidActionFields } @@ -183,6 +191,8 @@ export const UPSERT_LINK = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled } ...PaidActionFields } @@ -200,6 +210,8 @@ export const UPSERT_POLL = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled } ...PaidActionFields } @@ -215,6 +227,8 @@ export const UPSERT_BOUNTY = gql` id deleteScheduledAt reminderScheduledAt + scheduledAt + isScheduled } ...PaidActionFields } diff --git a/lib/item.js b/lib/item.js index 7bde10af1..d8a4346df 100644 --- a/lib/item.js +++ b/lib/item.js @@ -21,8 +21,14 @@ const reminderPattern = /\B@remindme\s+in\s+(\d+)\s+(second|minute|hour|day|week const reminderMentionPattern = /\B@remindme/i +const schedulePattern = /\B@schedule\s+in\s+(\d+)\s+(second|minute|hour|day|week|month|year)s?/gi + +const scheduleMentionPattern = /\B@schedule/i + export const hasDeleteMention = (text) => deleteMentionPattern.test(text ?? '') +export const hasScheduleMention = (text) => scheduleMentionPattern.test(text ?? '') + export const getDeleteCommand = (text) => { if (!text) return false const matches = [...text.matchAll(deletePattern)] @@ -61,6 +67,24 @@ export const getReminderCommand = (text) => { export const hasReminderCommand = (text) => !!getReminderCommand(text) +export const getScheduleCommand = (text) => { + if (!text) return false + const matches = [...text.matchAll(schedulePattern)] + const commands = matches?.map(match => ({ number: parseInt(match[1]), unit: match[2] })) + return commands.length ? commands[commands.length - 1] : undefined +} + +export const getScheduleAt = (text) => { + const command = getScheduleCommand(text) + if (command) { + const { number, unit } = command + return datePivot(new Date(), { [`${unit}s`]: number }) + } + return null +} + +export const hasScheduleCommand = (text) => !!getScheduleCommand(text) + export const deleteItemByAuthor = async ({ models, id, item }) => { if (!item) { item = await models.item.findUnique({ where: { id: Number(id) } }) diff --git a/prisma/migrations/20250611000000_scheduled_posts/migration.sql b/prisma/migrations/20250611000000_scheduled_posts/migration.sql new file mode 100644 index 000000000..de2122237 --- /dev/null +++ b/prisma/migrations/20250611000000_scheduled_posts/migration.sql @@ -0,0 +1,274 @@ +-- Add fields for scheduled posts to the Item model +ALTER TABLE "Item" ADD COLUMN "scheduledAt" TIMESTAMP(3); +ALTER TABLE "Item" ADD COLUMN "isScheduled" BOOLEAN NOT NULL DEFAULT FALSE; + +-- Create indexes for efficient querying of scheduled posts +CREATE INDEX "Item_scheduledAt_idx" ON "Item"("scheduledAt"); +CREATE INDEX "Item_isScheduled_idx" ON "Item"("isScheduled"); +CREATE INDEX "Item_isScheduled_scheduledAt_idx" ON "Item"("isScheduled", "scheduledAt"); + +-- Update stored functions to filter out scheduled posts from comments for non-owners + +CREATE OR REPLACE FUNCTION item_comments_zaprank_with_me_limited( + _item_id int, _global_seed int, _me_id int, _limit int, _offset int, _grandchild_limit int, + _level int, _where text, _order_by text) + RETURNS jsonb + LANGUAGE plpgsql VOLATILE PARALLEL SAFE AS +$$ +DECLARE + result jsonb; +BEGIN + IF _level < 1 THEN + RETURN '[]'::jsonb; + END IF; + + EXECUTE 'CREATE TEMP TABLE IF NOT EXISTS t_item ON COMMIT DROP AS ' + || 'WITH RECURSIVE base AS ( ' + || ' (SELECT "Item".*, 1 as level, ROW_NUMBER() OVER () as rn ' + || ' FROM "Item" ' + || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' + || ' WHERE "Item"."parentId" = $1 ' + || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3) ' + || _order_by || ' ' + || ' LIMIT $4 ' + || ' OFFSET $5) ' + || ' UNION ALL ' + || ' (SELECT "Item".*, b.level + 1, ROW_NUMBER() OVER (PARTITION BY "Item"."parentId" ' || _order_by || ') as rn ' + || ' FROM "Item" ' + || ' JOIN base b ON "Item"."parentId" = b.id ' + || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' + || ' WHERE b.level < $7 AND (b.level = 1 OR b.rn <= $6) ' + || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3)) ' + || ') ' + || 'SELECT "Item".*, ' + || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' + || ' "Item".updated_at at time zone ''UTC'' AS "updatedAt", ' + || ' "Item"."invoicePaidAt" at time zone ''UTC'' AS "invoicePaidAtUTC", ' + || ' to_jsonb(users.*) || jsonb_build_object(''meMute'', "Mute"."mutedId" IS NOT NULL) AS user, ' + || ' COALESCE("ItemAct"."meMsats", 0) AS "meMsats", ' + || ' COALESCE("ItemAct"."mePendingMsats", 0) as "mePendingMsats", ' + || ' COALESCE("ItemAct"."meDontLikeMsats", 0) AS "meDontLikeMsats", ' + || ' COALESCE("ItemAct"."meMcredits", 0) AS "meMcredits", ' + || ' COALESCE("ItemAct"."mePendingMcredits", 0) as "mePendingMcredits", ' + || ' "Bookmark"."itemId" IS NOT NULL AS "meBookmark", ' + || ' "ThreadSubscription"."itemId" IS NOT NULL AS "meSubscription", ' + || ' g.hot_score AS "hotScore", g.sub_hot_score AS "subHotScore" ' + || 'FROM base "Item" ' + || 'JOIN users ON users.id = "Item"."userId" ' + || ' LEFT JOIN "Mute" ON "Mute"."muterId" = $3 AND "Mute"."mutedId" = "Item"."userId" ' + || ' LEFT JOIN "Bookmark" ON "Bookmark"."userId" = $3 AND "Bookmark"."itemId" = "Item".id ' + || ' LEFT JOIN "ThreadSubscription" ON "ThreadSubscription"."userId" = $3 AND "ThreadSubscription"."itemId" = "Item".id ' + || ' LEFT JOIN hot_score_view g ON g.id = "Item".id ' + || 'LEFT JOIN LATERAL ( ' + || ' SELECT "itemId", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND "InvoiceForward".id IS NOT NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "meMsats", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND "InvoiceForward".id IS NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "meMcredits", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS NOT DISTINCT FROM ''PENDING'' AND "InvoiceForward".id IS NOT NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "mePendingMsats", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS NOT DISTINCT FROM ''PENDING'' AND "InvoiceForward".id IS NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "mePendingMcredits", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND act = ''DONT_LIKE_THIS'') AS "meDontLikeMsats" ' + || ' FROM "ItemAct" ' + || ' LEFT JOIN "Invoice" ON "Invoice".id = "ItemAct"."invoiceId" ' + || ' LEFT JOIN "InvoiceForward" ON "InvoiceForward"."invoiceId" = "Invoice"."id" ' + || ' WHERE "ItemAct"."userId" = $3 ' + || ' AND "ItemAct"."itemId" = "Item".id ' + || ' GROUP BY "ItemAct"."itemId" ' + || ') "ItemAct" ON true ' + || 'WHERE ("Item".level = 1 OR "Item".rn <= $6 - "Item".level + 2) ' || _where || ' ' + USING _item_id, _global_seed, _me_id, _limit, _offset, _grandchild_limit, _level, _where, _order_by; + + EXECUTE '' + || 'SELECT COALESCE(jsonb_agg(sub), ''[]''::jsonb) AS comments ' + || 'FROM ( ' + || ' SELECT "Item".*, item_comments_zaprank_with_me_limited("Item".id, $2, $3, $4, $5, $6, $7 - 1, $8, $9) AS comments ' + || ' FROM t_item "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || _order_by + || ' ) sub ' + INTO result + USING _item_id, _global_seed, _me_id, _limit, _offset, _grandchild_limit, _level, _where, _order_by; + + RETURN result; +END +$$; + +CREATE OR REPLACE FUNCTION item_comments_zaprank_with_me(_item_id int, _global_seed int, _me_id int, _level int, _where text, _order_by text) + RETURNS jsonb + LANGUAGE plpgsql VOLATILE PARALLEL SAFE AS +$$ +DECLARE + result jsonb; +BEGIN + IF _level < 1 THEN + RETURN '[]'::jsonb; + END IF; + + EXECUTE 'CREATE TEMP TABLE IF NOT EXISTS t_item ON COMMIT DROP AS ' + || 'WITH RECURSIVE base AS ( ' + || ' (SELECT "Item".*, 1 as level ' + || ' FROM "Item" ' + || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' + || ' WHERE "Item"."parentId" = $1 ' + || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3) ' + || _order_by || ') ' + || ' UNION ALL ' + || ' (SELECT "Item".*, b.level + 1 ' + || ' FROM "Item" ' + || ' JOIN base b ON "Item"."parentId" = b.id ' + || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' + || ' WHERE b.level < $2 ' + || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3)) ' + || ') ' + || 'SELECT "Item".*, ' + || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' + || ' "Item".updated_at at time zone ''UTC'' AS "updatedAt", ' + || ' "Item"."invoicePaidAt" at time zone ''UTC'' AS "invoicePaidAtUTC", ' + || ' to_jsonb(users.*) || jsonb_build_object(''meMute'', "Mute"."mutedId" IS NOT NULL) AS user, ' + || ' COALESCE("ItemAct"."meMsats", 0) AS "meMsats", ' + || ' COALESCE("ItemAct"."mePendingMsats", 0) as "mePendingMsats", ' + || ' COALESCE("ItemAct"."meDontLikeMsats", 0) AS "meDontLikeMsats", ' + || ' COALESCE("ItemAct"."meMcredits", 0) AS "meMcredits", ' + || ' COALESCE("ItemAct"."mePendingMcredits", 0) as "mePendingMcredits", ' + || ' "Bookmark"."itemId" IS NOT NULL AS "meBookmark", ' + || ' "ThreadSubscription"."itemId" IS NOT NULL AS "meSubscription", ' + || ' g.hot_score AS "hotScore", g.sub_hot_score AS "subHotScore" ' + || 'FROM base "Item" ' + || 'JOIN users ON users.id = "Item"."userId" ' + || ' LEFT JOIN "Mute" ON "Mute"."muterId" = $3 AND "Mute"."mutedId" = "Item"."userId" ' + || ' LEFT JOIN "Bookmark" ON "Bookmark"."userId" = $3 AND "Bookmark"."itemId" = "Item".id ' + || ' LEFT JOIN "ThreadSubscription" ON "ThreadSubscription"."userId" = $3 AND "ThreadSubscription"."itemId" = "Item".id ' + || ' LEFT JOIN hot_score_view g ON g.id = "Item".id ' + || 'LEFT JOIN LATERAL ( ' + || ' SELECT "itemId", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND "InvoiceForward".id IS NOT NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "meMsats", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND "InvoiceForward".id IS NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "meMcredits", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS NOT DISTINCT FROM ''PENDING'' AND "InvoiceForward".id IS NOT NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "mePendingMsats", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS NOT DISTINCT FROM ''PENDING'' AND "InvoiceForward".id IS NULL AND (act = ''FEE'' OR act = ''TIP'')) AS "mePendingMcredits", ' + || ' sum("ItemAct".msats) FILTER (WHERE "invoiceActionState" IS DISTINCT FROM ''FAILED'' AND act = ''DONT_LIKE_THIS'') AS "meDontLikeMsats" ' + || ' FROM "ItemAct" ' + || ' LEFT JOIN "Invoice" ON "Invoice".id = "ItemAct"."invoiceId" ' + || ' LEFT JOIN "InvoiceForward" ON "InvoiceForward"."invoiceId" = "Invoice"."id" ' + || ' WHERE "ItemAct"."userId" = $3 ' + || ' AND "ItemAct"."itemId" = "Item".id ' + || ' GROUP BY "ItemAct"."itemId" ' + || ') "ItemAct" ON true ' + || _where || ' ' + USING _item_id, _level, _me_id, _global_seed, _where, _order_by; + + EXECUTE '' + || 'SELECT COALESCE(jsonb_agg(sub), ''[]''::jsonb) AS comments ' + || 'FROM ( ' + || ' SELECT "Item".*, item_comments_zaprank_with_me("Item".id, $6, $5, $2 - 1, $3, $4) AS comments ' + || ' FROM t_item "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || _order_by + || ' ) sub ' + INTO result + USING _item_id, _level, _where, _order_by, _me_id, _global_seed; + + RETURN result; +END +$$; + +CREATE OR REPLACE FUNCTION item_comments(_item_id int, _level int, _where text, _order_by text) + RETURNS jsonb + LANGUAGE plpgsql VOLATILE PARALLEL SAFE AS +$$ +DECLARE + result jsonb; +BEGIN + IF _level < 1 THEN + RETURN '[]'::jsonb; + END IF; + + EXECUTE 'CREATE TEMP TABLE IF NOT EXISTS t_item ON COMMIT DROP AS ' + || 'WITH RECURSIVE base AS ( ' + || ' (SELECT "Item".*, 1 as level ' + || ' FROM "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || ' AND "Item"."isScheduled" = false ' + || _order_by || ') ' + || ' UNION ALL ' + || ' (SELECT "Item".*, b.level + 1 ' + || ' FROM "Item" ' + || ' JOIN base b ON "Item"."parentId" = b.id ' + || ' WHERE b.level < $2 ' + || ' AND "Item"."isScheduled" = false) ' + || ') ' + || 'SELECT "Item".*, ' + || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' + || ' "Item".updated_at at time zone ''UTC'' AS "updatedAt", ' + || ' "Item"."invoicePaidAt" at time zone ''UTC'' AS "invoicePaidAtUTC", ' + || ' to_jsonb(users.*) AS user ' + || 'FROM base "Item" ' + || 'JOIN users ON users.id = "Item"."userId" ' + || _where || ' ' + USING _item_id, _level, _where, _order_by; + + EXECUTE '' + || 'SELECT COALESCE(jsonb_agg(sub), ''[]''::jsonb) AS comments ' + || 'FROM ( ' + || ' SELECT "Item".*, item_comments("Item".id, $2 - 1, $3, $4) AS comments ' + || ' FROM t_item "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || _order_by + || ' ) sub ' + INTO result + USING _item_id, _level, _where, _order_by; + + RETURN result; +END +$$; + +CREATE OR REPLACE FUNCTION item_comments_limited( + _item_id int, _limit int, _offset int, _grandchild_limit int, + _level int, _where text, _order_by text) + RETURNS jsonb + LANGUAGE plpgsql VOLATILE PARALLEL SAFE AS +$$ +DECLARE + result jsonb; +BEGIN + IF _level < 1 THEN + RETURN '[]'::jsonb; + END IF; + + EXECUTE 'CREATE TEMP TABLE IF NOT EXISTS t_item ON COMMIT DROP AS ' + || 'WITH RECURSIVE base AS ( ' + || ' (SELECT "Item".*, 1 as level, ROW_NUMBER() OVER () as rn ' + || ' FROM "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || ' AND "Item"."isScheduled" = false ' + || _order_by || ' ' + || ' LIMIT $2 ' + || ' OFFSET $3) ' + || ' UNION ALL ' + || ' (SELECT "Item".*, b.level + 1, ROW_NUMBER() OVER (PARTITION BY "Item"."parentId" ' || _order_by || ') as rn ' + || ' FROM "Item" ' + || ' JOIN base b ON "Item"."parentId" = b.id ' + || ' WHERE b.level < $5 AND (b.level = 1 OR b.rn <= $4) ' + || ' AND "Item"."isScheduled" = false) ' + || ') ' + || 'SELECT "Item".*, ' + || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' + || ' "Item".updated_at at time zone ''UTC'' AS "updatedAt", ' + || ' "Item"."invoicePaidAt" at time zone ''UTC'' AS "invoicePaidAtUTC", ' + || ' to_jsonb(users.*) AS user ' + || 'FROM base "Item" ' + || 'JOIN users ON users.id = "Item"."userId" ' + || 'WHERE ("Item".level = 1 OR "Item".rn <= $4 - "Item".level + 2) ' || _where || ' ' + USING _item_id, _limit, _offset, _grandchild_limit, _level, _where, _order_by; + + EXECUTE '' + || 'SELECT COALESCE(jsonb_agg(sub), ''[]''::jsonb) AS comments ' + || 'FROM ( ' + || ' SELECT "Item".*, item_comments_limited("Item".id, $2, $3, $4, $5 - 1, $6, $7) AS comments ' + || ' FROM t_item "Item" ' + || ' WHERE "Item"."parentId" = $1 ' + || _order_by + || ' ) sub ' + INTO result + USING _item_id, _limit, _offset, _grandchild_limit, _level, _where, _order_by; + + RETURN result; +END +$$; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index cb92eab87..9f355ff20 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -563,6 +563,8 @@ model Item { bio Boolean @default(false) freebie Boolean @default(false) deletedAt DateTime? + scheduledAt DateTime? + isScheduled Boolean @default(false) otsFile Bytes? otsHash String? imgproxyUrls Json? @@ -629,6 +631,9 @@ model Item { @@index([url]) @@index([boost]) @@index([invoicePaidAt]) + @@index([scheduledAt]) + @@index([isScheduled]) + @@index([isScheduled, scheduledAt]) } // we use this to denormalize a user's aggregated interactions (zaps) with an item diff --git a/worker/index.js b/worker/index.js index 03b1a3f4c..d5e674956 100644 --- a/worker/index.js +++ b/worker/index.js @@ -38,6 +38,7 @@ import { expireBoost } from './expireBoost' import { payingActionConfirmed, payingActionFailed } from './payingAction' import { autoDropBolt11s } from './autoDropBolt11' import { postToSocial } from './socialPoster' +import { publishScheduledPost, schedulePostSideEffects } from './publishScheduledPosts' // WebSocket polyfill import ws from 'isomorphic-ws' @@ -144,6 +145,8 @@ async function work () { await boss.work('reminder', jobWrapper(remindUser)) await boss.work('thisDay', jobWrapper(thisDay)) await boss.work('socialPoster', jobWrapper(postToSocial)) + await boss.work('publishScheduledPost', jobWrapper(publishScheduledPost)) + await boss.work('schedulePostSideEffects', jobWrapper(schedulePostSideEffects)) await boss.work('checkWallet', jobWrapper(checkWallet)) console.log('working jobs') diff --git a/worker/publishScheduledPosts.js b/worker/publishScheduledPosts.js new file mode 100644 index 000000000..101cf2560 --- /dev/null +++ b/worker/publishScheduledPosts.js @@ -0,0 +1,77 @@ +import { notifyItemMention, notifyItemParents, notifyMention, notifyTerritorySubscribers, notifyUserSubscribers, notifyThreadSubscribers } from '@/lib/webPush' + +export async function publishScheduledPost ({ data: { itemId }, models, lnd }) { + console.log('publishing scheduled post', itemId) + + const item = await models.item.findUnique({ + where: { id: itemId }, + include: { + user: true, + mentions: true, + itemReferrers: { include: { refereeItem: true } } + } + }) + + if (!item || !item.isScheduled || item.deletedAt) { + console.log('item not found, not scheduled, or deleted', itemId) + return + } + + const publishTime = new Date() + + // Update the item to be published with new timestamp + await models.item.update({ + where: { id: itemId }, + data: { + isScheduled: false, + scheduledAt: null, + createdAt: publishTime, + updatedAt: publishTime + } + }) + + // Refresh any cached views or materialized data that might reference this item + await models.$executeRaw`REFRESH MATERIALIZED VIEW CONCURRENTLY hot_score_view` + + // Queue side effects + await models.$executeRaw` + INSERT INTO pgboss.job (name, data, startafter) + VALUES ('schedulePostSideEffects', jsonb_build_object('itemId', ${itemId}::INTEGER), now())` + + console.log('published scheduled post with new timestamp', itemId, publishTime) +} + +export async function schedulePostSideEffects ({ data: { itemId }, models }) { + const item = await models.item.findFirst({ + where: { id: itemId }, + include: { + mentions: true, + itemReferrers: { include: { refereeItem: true } }, + user: true + } + }) + + if (!item) { + console.log('item not found for side effects', itemId) + return + } + + // Send notifications for scheduled posts that are now published + if (item.parentId) { + notifyItemParents({ item, models }).catch(console.error) + notifyThreadSubscribers({ models, item }).catch(console.error) + } + + for (const { userId } of item.mentions) { + notifyMention({ models, item, userId }).catch(console.error) + } + + for (const { refereeItem } of item.itemReferrers) { + notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error) + } + + notifyUserSubscribers({ models, item }).catch(console.error) + notifyTerritorySubscribers({ models, item }).catch(console.error) + + console.log('completed side effects for scheduled post', itemId) +} From b97ca2e863092a8681e661000a7e920c0418c4c8 Mon Sep 17 00:00:00 2001 From: m0wer Date: Fri, 13 Jun 2025 22:12:57 +0200 Subject: [PATCH 02/10] refactor: remove isScheduled field, use only scheduledAt Database Schema Changes - Removed isScheduled boolean field from Item model - Kept only scheduledAt timestamp field - Removed unnecessary indexes for isScheduled - Updated stored functions to use scheduledAt IS NULL instead of isScheduled = false API Changes - Updated scheduledOrMine() function to check scheduledAt IS NULL - Updated scheduled items query to use scheduledAt IS NOT NULL - Updated mutations to only set/unset scheduledAt field - Kept isScheduled as computed GraphQL field that returns !!item.scheduledAt Item Creation Logic - Removed isScheduled variable and logic - Only set scheduledAt field during item creation and updates Worker Functions - Updated to check !item.scheduledAt instead of !item.isScheduled - Only update scheduledAt field when publishing/canceling The refactoring maintains the same functionality while eliminating the redundant boolean field. The isScheduled GraphQL field remains available for frontend convenience as a computed property. --- api/paidAction/itemCreate.js | 2 -- api/paidAction/lib/item.js | 3 +-- api/resolvers/item.js | 14 ++++++-------- .../migration.sql | 19 ++++++++----------- prisma/schema.prisma | 3 --- worker/publishScheduledPosts.js | 3 +-- 6 files changed, 16 insertions(+), 28 deletions(-) diff --git a/api/paidAction/itemCreate.js b/api/paidAction/itemCreate.js index fb76fce8c..4f8f554f2 100644 --- a/api/paidAction/itemCreate.js +++ b/api/paidAction/itemCreate.js @@ -99,7 +99,6 @@ export async function perform (args, context) { // Check if this is a scheduled post const scheduleAt = getScheduleAt(data.text) - const isScheduled = !!scheduleAt // start with median vote if (me) { @@ -117,7 +116,6 @@ export async function perform (args, context) { ...data, ...invoiceData, boost, - isScheduled, scheduledAt: scheduleAt, threadSubscriptions: { createMany: { diff --git a/api/paidAction/lib/item.js b/api/paidAction/lib/item.js index 908b15458..6b1b76a46 100644 --- a/api/paidAction/lib/item.js +++ b/api/paidAction/lib/item.js @@ -96,11 +96,10 @@ export async function performBotBehavior ({ text, id }, { me, tx }) { // For new items, scheduling info is set during creation // For updates, we need to update the item const existingItem = await tx.item.findUnique({ where: { id: Number(id) } }) - if (existingItem && !existingItem.isScheduled) { + if (existingItem && !existingItem.scheduledAt) { await tx.item.update({ where: { id: Number(id) }, data: { - isScheduled: true, scheduledAt: scheduleAt } }) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index eab17c5eb..bd46fe327 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -260,8 +260,8 @@ export const muteClause = me => export const scheduledOrMine = (me) => { return me - ? `("Item"."isScheduled" = false OR "Item"."userId" = ${me.id})` - : '"Item"."isScheduled" = false' + ? `("Item"."scheduledAt" IS NULL OR "Item"."userId" = ${me.id})` + : '"Item"."scheduledAt" IS NULL' } const HIDE_NSFW_CLAUSE = '("Sub"."nsfw" = FALSE OR "Sub"."nsfw" IS NULL)' @@ -764,7 +764,7 @@ export default { ${SELECT}, trim(both ' ' from coalesce(ltree2text(subpath("path", 0, -1)), '')) AS "ancestorTitles" FROM "Item" - WHERE "userId" = $1 AND "isScheduled" = true AND "deletedAt" IS NULL + WHERE "userId" = $1 AND "scheduledAt" IS NOT NULL AND "deletedAt" IS NULL AND ("invoiceActionState" IS NULL OR "invoiceActionState" = 'PAID') AND created_at <= $2::timestamp ORDER BY "scheduledAt" ASC @@ -1112,7 +1112,7 @@ export default { throw new GqlInputError('item does not belong to you') } - if (!item.isScheduled) { + if (!item.scheduledAt) { throw new GqlInputError('item is not scheduled') } @@ -1127,7 +1127,6 @@ export default { return await models.item.update({ where: { id: Number(id) }, data: { - isScheduled: false, scheduledAt: null } }) @@ -1149,7 +1148,7 @@ export default { throw new GqlInputError('item does not belong to you') } - if (!item.isScheduled) { + if (!item.scheduledAt) { throw new GqlInputError('item is not scheduled') } @@ -1166,7 +1165,6 @@ export default { const updatedItem = await models.item.update({ where: { id: Number(id) }, data: { - isScheduled: false, scheduledAt: null, createdAt: publishTime, updatedAt: publishTime @@ -1561,7 +1559,7 @@ export default { return item.scheduledAt }, isScheduled: async (item, args, { me, models }) => { - return !!item.isScheduled + return !!item.scheduledAt } } } diff --git a/prisma/migrations/20250611000000_scheduled_posts/migration.sql b/prisma/migrations/20250611000000_scheduled_posts/migration.sql index de2122237..40a7d53f2 100644 --- a/prisma/migrations/20250611000000_scheduled_posts/migration.sql +++ b/prisma/migrations/20250611000000_scheduled_posts/migration.sql @@ -1,11 +1,8 @@ --- Add fields for scheduled posts to the Item model +-- Add field for scheduled posts to the Item model ALTER TABLE "Item" ADD COLUMN "scheduledAt" TIMESTAMP(3); -ALTER TABLE "Item" ADD COLUMN "isScheduled" BOOLEAN NOT NULL DEFAULT FALSE; --- Create indexes for efficient querying of scheduled posts +-- Create index for efficient querying of scheduled posts CREATE INDEX "Item_scheduledAt_idx" ON "Item"("scheduledAt"); -CREATE INDEX "Item_isScheduled_idx" ON "Item"("isScheduled"); -CREATE INDEX "Item_isScheduled_scheduledAt_idx" ON "Item"("isScheduled", "scheduledAt"); -- Update stored functions to filter out scheduled posts from comments for non-owners @@ -28,7 +25,7 @@ BEGIN || ' FROM "Item" ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."userId" = $3) ' || _order_by || ' ' || ' LIMIT $4 ' || ' OFFSET $5) ' @@ -108,7 +105,7 @@ BEGIN || ' FROM "Item" ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."userId" = $3) ' || _order_by || ') ' || ' UNION ALL ' || ' (SELECT "Item".*, b.level + 1 ' @@ -185,14 +182,14 @@ BEGIN || ' (SELECT "Item".*, 1 as level ' || ' FROM "Item" ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND "Item"."isScheduled" = false ' + || ' AND "Item"."scheduledAt" IS NULL ' || _order_by || ') ' || ' UNION ALL ' || ' (SELECT "Item".*, b.level + 1 ' || ' FROM "Item" ' || ' JOIN base b ON "Item"."parentId" = b.id ' || ' WHERE b.level < $2 ' - || ' AND "Item"."isScheduled" = false) ' + || ' AND "Item"."scheduledAt" IS NULL) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' @@ -237,7 +234,7 @@ BEGIN || ' (SELECT "Item".*, 1 as level, ROW_NUMBER() OVER () as rn ' || ' FROM "Item" ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND "Item"."isScheduled" = false ' + || ' AND "Item"."scheduledAt" IS NULL ' || _order_by || ' ' || ' LIMIT $2 ' || ' OFFSET $3) ' @@ -246,7 +243,7 @@ BEGIN || ' FROM "Item" ' || ' JOIN base b ON "Item"."parentId" = b.id ' || ' WHERE b.level < $5 AND (b.level = 1 OR b.rn <= $4) ' - || ' AND "Item"."isScheduled" = false) ' + || ' AND "Item"."scheduledAt" IS NULL) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9f355ff20..32dbd8088 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -564,7 +564,6 @@ model Item { freebie Boolean @default(false) deletedAt DateTime? scheduledAt DateTime? - isScheduled Boolean @default(false) otsFile Bytes? otsHash String? imgproxyUrls Json? @@ -632,8 +631,6 @@ model Item { @@index([boost]) @@index([invoicePaidAt]) @@index([scheduledAt]) - @@index([isScheduled]) - @@index([isScheduled, scheduledAt]) } // we use this to denormalize a user's aggregated interactions (zaps) with an item diff --git a/worker/publishScheduledPosts.js b/worker/publishScheduledPosts.js index 101cf2560..01a2f3989 100644 --- a/worker/publishScheduledPosts.js +++ b/worker/publishScheduledPosts.js @@ -12,7 +12,7 @@ export async function publishScheduledPost ({ data: { itemId }, models, lnd }) { } }) - if (!item || !item.isScheduled || item.deletedAt) { + if (!item || !item.scheduledAt || item.deletedAt) { console.log('item not found, not scheduled, or deleted', itemId) return } @@ -23,7 +23,6 @@ export async function publishScheduledPost ({ data: { itemId }, models, lnd }) { await models.item.update({ where: { id: itemId }, data: { - isScheduled: false, scheduledAt: null, createdAt: publishTime, updatedAt: publishTime From 7207cbf59ca3304bc9b7116e303ac19fef4f73e7 Mon Sep 17 00:00:00 2001 From: m0wer Date: Tue, 17 Jun 2025 22:15:20 +0200 Subject: [PATCH 03/10] feat: coalesce scheduledAt for item timestamps --- api/resolvers/item.js | 6 +++--- components/item-info.js | 4 ++-- components/item-job.js | 4 ++-- fragments/items.js | 1 + lib/rss.js | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index bd46fe327..aa5dd5486 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -39,7 +39,7 @@ function commentsOrderByClause (me, models, sort) { if (sort === 'recent') { return `ORDER BY ${sharedSorts}, ("Item".cost > 0 OR "Item"."weightedVotes" - "Item"."weightedDownVotes" > 0) DESC, - COALESCE("Item"."invoicePaidAt", "Item".created_at) DESC, "Item".id DESC` + COALESCE("Item"."scheduledAt", "Item"."invoicePaidAt", "Item".created_at) DESC, "Item".id DESC` } if (sort === 'hot') { @@ -447,10 +447,10 @@ export default { typeClause(type), muteClause(me) )} - ORDER BY COALESCE("Item"."invoicePaidAt", "Item".created_at) DESC + ORDER BY COALESCE("Item"."scheduledAt", "Item"."invoicePaidAt", "Item".created_at) DESC OFFSET $2 LIMIT $3`, - orderBy: 'ORDER BY COALESCE("Item"."invoicePaidAt", "Item".created_at) DESC' + orderBy: 'ORDER BY COALESCE("Item"."scheduledAt", "Item"."invoicePaidAt", "Item".created_at) DESC' }, decodedCursor.time, decodedCursor.offset, limit, ...subArr) break case 'top': diff --git a/components/item-info.js b/components/item-info.js index ec0a09582..a29d8e2ec 100644 --- a/components/item-info.js +++ b/components/item-info.js @@ -138,8 +138,8 @@ export default function ItemInfo ({ {embellishUser} } - - {timeSince(new Date(item.invoicePaidAt || item.createdAt))} + + {timeSince(new Date(item.scheduledAt || item.invoicePaidAt || item.createdAt))} {item.prior && <> diff --git a/components/item-job.js b/components/item-job.js index 17d2c7278..7035f5254 100644 --- a/components/item-job.js +++ b/components/item-job.js @@ -59,8 +59,8 @@ export default function ItemJob ({ item, toc, rank, children, disableRetry, setD @{item.user.name} - - {timeSince(new Date(item.createdAt))} + + {timeSince(new Date(item.scheduledAt || item.invoicePaidAt || item.createdAt))} {item.subName && diff --git a/fragments/items.js b/fragments/items.js index 151587a20..644eb8893 100644 --- a/fragments/items.js +++ b/fragments/items.js @@ -19,6 +19,7 @@ export const ITEM_FIELDS = gql` parentId createdAt invoicePaidAt + scheduledAt deletedAt title url diff --git a/lib/rss.js b/lib/rss.js index 747d1eafb..ff69e4a7c 100644 --- a/lib/rss.js +++ b/lib/rss.js @@ -31,7 +31,7 @@ const generateRssItem = (item) => { ${escapeXml(link)} ${guid} Comments]]> - ${new Date(item.createdAt).toUTCString()} + ${new Date(item.scheduledAt || item.invoicePaidAt || item.createdAt).toUTCString()} ${category} ${item.user.name} From c9a1283d1876654e8498c16d522de0467db3b813 Mon Sep 17 00:00:00 2001 From: m0wer Date: Tue, 17 Jun 2025 23:06:07 +0200 Subject: [PATCH 04/10] chore: cleanup: remove cancelScheduledPost mutation --- api/resolvers/item.js | 36 ------------------------------------ api/typeDefs/item.js | 1 - 2 files changed, 37 deletions(-) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index aa5dd5486..6d4dd6f48 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -1095,42 +1095,6 @@ export default { return result }, - cancelScheduledPost: async (parent, { id }, { me, models }) => { - if (!me) { - throw new GqlAuthenticationError() - } - - const item = await models.item.findUnique({ - where: { id: Number(id) } - }) - - if (!item) { - throw new GqlInputError('item not found') - } - - if (Number(item.userId) !== Number(me.id)) { - throw new GqlInputError('item does not belong to you') - } - - if (!item.scheduledAt) { - throw new GqlInputError('item is not scheduled') - } - - // Cancel the scheduled job - await models.$queryRaw` - DELETE FROM pgboss.job - WHERE name = 'publishScheduledPost' - AND data->>'itemId' = ${item.id}::TEXT - AND state <> 'completed'` - - // Update the item to remove scheduling - return await models.item.update({ - where: { id: Number(id) }, - data: { - scheduledAt: null - } - }) - }, publishScheduledPostNow: async (parent, { id }, { me, models }) => { if (!me) { throw new GqlAuthenticationError() diff --git a/api/typeDefs/item.js b/api/typeDefs/item.js index e58a800c6..85523d47e 100644 --- a/api/typeDefs/item.js +++ b/api/typeDefs/item.js @@ -64,7 +64,6 @@ export default gql` act(id: ID!, sats: Int, act: String, hasSendWallet: Boolean): ItemActPaidAction! pollVote(id: ID!): PollVotePaidAction! toggleOutlaw(id: ID!): Item! - cancelScheduledPost(id: ID!): Item! publishScheduledPostNow(id: ID!): Item! } From 51f9ab4c95e7e885e5d914cc4953c7d83c50a071 Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 21:07:12 +0200 Subject: [PATCH 05/10] fix: rethink logic for scheduled posts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes things cleaner and shows the right “time since” for scheduled posts. --- api/resolvers/item.js | 8 +++----- lib/apollo.js | 6 ++++++ worker/publishScheduledPosts.js | 4 +--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index 6d4dd6f48..79492e2da 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -260,8 +260,8 @@ export const muteClause = me => export const scheduledOrMine = (me) => { return me - ? `("Item"."scheduledAt" IS NULL OR "Item"."userId" = ${me.id})` - : '"Item"."scheduledAt" IS NULL' + ? `("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= now() OR "Item"."userId" = ${me.id})` + : '("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= now())' } const HIDE_NSFW_CLAUSE = '("Sub"."nsfw" = FALSE OR "Sub"."nsfw" IS NULL)' @@ -1125,12 +1125,10 @@ export default { const publishTime = new Date() - // Publish immediately with current timestamp + // Publish immediately - keep original createdAt and scheduledAt const updatedItem = await models.item.update({ where: { id: Number(id) }, data: { - scheduledAt: null, - createdAt: publishTime, updatedAt: publishTime } }) diff --git a/lib/apollo.js b/lib/apollo.js index 3739ba3fd..115679c67 100644 --- a/lib/apollo.js +++ b/lib/apollo.js @@ -332,6 +332,12 @@ function getClient (uri) { return reactiveVar() } + }, + // Ensure scheduled post state changes are properly reflected in cache + scheduledAt: { + merge (existing, incoming) { + return incoming + } } } } diff --git a/worker/publishScheduledPosts.js b/worker/publishScheduledPosts.js index 01a2f3989..923aa756f 100644 --- a/worker/publishScheduledPosts.js +++ b/worker/publishScheduledPosts.js @@ -19,12 +19,10 @@ export async function publishScheduledPost ({ data: { itemId }, models, lnd }) { const publishTime = new Date() - // Update the item to be published with new timestamp + // Update the item to be published - keep original createdAt and scheduledAt await models.item.update({ where: { id: itemId }, data: { - scheduledAt: null, - createdAt: publishTime, updatedAt: publishTime } }) From 318a8c242e361559ab1b3ac4ccb266f1064341f5 Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 21:51:25 +0200 Subject: [PATCH 06/10] chore: remove publishScheduledPost job and related logic --- api/paidAction/lib/item.js | 36 ++-------------- api/resolvers/item.js | 48 --------------------- api/typeDefs/item.js | 1 - worker/index.js | 3 -- worker/publishScheduledPosts.js | 74 --------------------------------- 5 files changed, 4 insertions(+), 158 deletions(-) delete mode 100644 worker/publishScheduledPosts.js diff --git a/api/paidAction/lib/item.js b/api/paidAction/lib/item.js index 6b1b76a46..fc8a5bc02 100644 --- a/api/paidAction/lib/item.js +++ b/api/paidAction/lib/item.js @@ -1,5 +1,5 @@ import { USER_ID } from '@/lib/constants' -import { deleteReminders, getDeleteAt, getRemindAt, getScheduleAt } from '@/lib/item' +import { deleteReminders, getDeleteAt, getRemindAt } from '@/lib/item' import { parseInternalLinks } from '@/lib/url' export async function getMentions ({ text }, { me, tx }) { @@ -46,19 +46,15 @@ export const getItemMentions = async ({ text }, { me, tx }) => { } export async function performBotBehavior ({ text, id }, { me, tx }) { - // delete any existing deleteItem, reminder, or publishScheduledPost jobs for this item const userId = me?.id || USER_ID.anon id = Number(id) + await tx.$queryRaw` DELETE FROM pgboss.job - WHERE (name = 'deleteItem' OR name = 'publishScheduledPost') + WHERE name = 'deleteItem' AND data->>'id' = ${id}::TEXT AND state <> 'completed'` - await tx.$queryRaw` - DELETE FROM pgboss.job - WHERE name = 'publishScheduledPost' - AND data->>'itemId' = ${id}::TEXT - AND state <> 'completed'` + await deleteReminders({ id, userId, models: tx }) if (text) { @@ -90,29 +86,5 @@ export async function performBotBehavior ({ text, id }, { me, tx }) { } }) } - - const scheduleAt = getScheduleAt(text) - if (scheduleAt) { - // For new items, scheduling info is set during creation - // For updates, we need to update the item - const existingItem = await tx.item.findUnique({ where: { id: Number(id) } }) - if (existingItem && !existingItem.scheduledAt) { - await tx.item.update({ - where: { id: Number(id) }, - data: { - scheduledAt: scheduleAt - } - }) - } - - // Schedule the job to publish the post - await tx.$queryRaw` - INSERT INTO pgboss.job (name, data, startafter, keepuntil) - VALUES ( - 'publishScheduledPost', - jsonb_build_object('itemId', ${id}::INTEGER), - ${scheduleAt}::TIMESTAMP WITH TIME ZONE, - ${scheduleAt}::TIMESTAMP WITH TIME ZONE + interval '1 minute')` - } } } diff --git a/api/resolvers/item.js b/api/resolvers/item.js index 79492e2da..0204da418 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -1095,54 +1095,6 @@ export default { return result }, - publishScheduledPostNow: async (parent, { id }, { me, models }) => { - if (!me) { - throw new GqlAuthenticationError() - } - - const item = await models.item.findUnique({ - where: { id: Number(id) } - }) - - if (!item) { - throw new GqlInputError('item not found') - } - - if (Number(item.userId) !== Number(me.id)) { - throw new GqlInputError('item does not belong to you') - } - - if (!item.scheduledAt) { - throw new GqlInputError('item is not scheduled') - } - - // Cancel the scheduled job - await models.$queryRaw` - DELETE FROM pgboss.job - WHERE name = 'publishScheduledPost' - AND data->>'itemId' = ${item.id}::TEXT - AND state <> 'completed'` - - const publishTime = new Date() - - // Publish immediately - keep original createdAt and scheduledAt - const updatedItem = await models.item.update({ - where: { id: Number(id) }, - data: { - updatedAt: publishTime - } - }) - - // Refresh cached views - await models.$executeRaw`REFRESH MATERIALIZED VIEW CONCURRENTLY hot_score_view` - - // Queue side effects - await models.$executeRaw` - INSERT INTO pgboss.job (name, data, startafter) - VALUES ('schedulePostSideEffects', jsonb_build_object('itemId', ${item.id}::INTEGER), now())` - - return updatedItem - } }, ItemAct: { invoice: async (itemAct, args, { models }) => { diff --git a/api/typeDefs/item.js b/api/typeDefs/item.js index 85523d47e..bbc770302 100644 --- a/api/typeDefs/item.js +++ b/api/typeDefs/item.js @@ -64,7 +64,6 @@ export default gql` act(id: ID!, sats: Int, act: String, hasSendWallet: Boolean): ItemActPaidAction! pollVote(id: ID!): PollVotePaidAction! toggleOutlaw(id: ID!): Item! - publishScheduledPostNow(id: ID!): Item! } type PollVoteResult { diff --git a/worker/index.js b/worker/index.js index d5e674956..03b1a3f4c 100644 --- a/worker/index.js +++ b/worker/index.js @@ -38,7 +38,6 @@ import { expireBoost } from './expireBoost' import { payingActionConfirmed, payingActionFailed } from './payingAction' import { autoDropBolt11s } from './autoDropBolt11' import { postToSocial } from './socialPoster' -import { publishScheduledPost, schedulePostSideEffects } from './publishScheduledPosts' // WebSocket polyfill import ws from 'isomorphic-ws' @@ -145,8 +144,6 @@ async function work () { await boss.work('reminder', jobWrapper(remindUser)) await boss.work('thisDay', jobWrapper(thisDay)) await boss.work('socialPoster', jobWrapper(postToSocial)) - await boss.work('publishScheduledPost', jobWrapper(publishScheduledPost)) - await boss.work('schedulePostSideEffects', jobWrapper(schedulePostSideEffects)) await boss.work('checkWallet', jobWrapper(checkWallet)) console.log('working jobs') diff --git a/worker/publishScheduledPosts.js b/worker/publishScheduledPosts.js deleted file mode 100644 index 923aa756f..000000000 --- a/worker/publishScheduledPosts.js +++ /dev/null @@ -1,74 +0,0 @@ -import { notifyItemMention, notifyItemParents, notifyMention, notifyTerritorySubscribers, notifyUserSubscribers, notifyThreadSubscribers } from '@/lib/webPush' - -export async function publishScheduledPost ({ data: { itemId }, models, lnd }) { - console.log('publishing scheduled post', itemId) - - const item = await models.item.findUnique({ - where: { id: itemId }, - include: { - user: true, - mentions: true, - itemReferrers: { include: { refereeItem: true } } - } - }) - - if (!item || !item.scheduledAt || item.deletedAt) { - console.log('item not found, not scheduled, or deleted', itemId) - return - } - - const publishTime = new Date() - - // Update the item to be published - keep original createdAt and scheduledAt - await models.item.update({ - where: { id: itemId }, - data: { - updatedAt: publishTime - } - }) - - // Refresh any cached views or materialized data that might reference this item - await models.$executeRaw`REFRESH MATERIALIZED VIEW CONCURRENTLY hot_score_view` - - // Queue side effects - await models.$executeRaw` - INSERT INTO pgboss.job (name, data, startafter) - VALUES ('schedulePostSideEffects', jsonb_build_object('itemId', ${itemId}::INTEGER), now())` - - console.log('published scheduled post with new timestamp', itemId, publishTime) -} - -export async function schedulePostSideEffects ({ data: { itemId }, models }) { - const item = await models.item.findFirst({ - where: { id: itemId }, - include: { - mentions: true, - itemReferrers: { include: { refereeItem: true } }, - user: true - } - }) - - if (!item) { - console.log('item not found for side effects', itemId) - return - } - - // Send notifications for scheduled posts that are now published - if (item.parentId) { - notifyItemParents({ item, models }).catch(console.error) - notifyThreadSubscribers({ models, item }).catch(console.error) - } - - for (const { userId } of item.mentions) { - notifyMention({ models, item, userId }).catch(console.error) - } - - for (const { refereeItem } of item.itemReferrers) { - notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error) - } - - notifyUserSubscribers({ models, item }).catch(console.error) - notifyTerritorySubscribers({ models, item }).catch(console.error) - - console.log('completed side effects for scheduled post', itemId) -} From 2aaa94948b3fadd1369623b0986aa2a064d6a741 Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 21:58:21 +0200 Subject: [PATCH 07/10] chore: undo formatting changes --- api/paidAction/lib/item.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/paidAction/lib/item.js b/api/paidAction/lib/item.js index fc8a5bc02..879b1cb53 100644 --- a/api/paidAction/lib/item.js +++ b/api/paidAction/lib/item.js @@ -46,15 +46,14 @@ export const getItemMentions = async ({ text }, { me, tx }) => { } export async function performBotBehavior ({ text, id }, { me, tx }) { + // delete any existing deleteItem or reminder jobs for this item const userId = me?.id || USER_ID.anon id = Number(id) - await tx.$queryRaw` DELETE FROM pgboss.job WHERE name = 'deleteItem' AND data->>'id' = ${id}::TEXT AND state <> 'completed'` - await deleteReminders({ id, userId, models: tx }) if (text) { From 6d95cba11c611dab453be8761553211bd6123bdc Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 22:01:31 +0200 Subject: [PATCH 08/10] style: fix lint --- api/resolvers/item.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index 0204da418..1e9545323 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -1094,7 +1094,7 @@ export default { ]) return result - }, + } }, ItemAct: { invoice: async (itemAct, args, { models }) => { From 9dc9311ba461418574136e1f280c6c4dd5f02228 Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 22:09:39 +0200 Subject: [PATCH 09/10] fix: migration issues --- .../20250611000000_scheduled_posts/migration.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/prisma/migrations/20250611000000_scheduled_posts/migration.sql b/prisma/migrations/20250611000000_scheduled_posts/migration.sql index 40a7d53f2..2b5b08806 100644 --- a/prisma/migrations/20250611000000_scheduled_posts/migration.sql +++ b/prisma/migrations/20250611000000_scheduled_posts/migration.sql @@ -25,7 +25,7 @@ BEGIN || ' FROM "Item" ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."userId" = $3) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW() OR "Item"."userId" = $3) ' || _order_by || ' ' || ' LIMIT $4 ' || ' OFFSET $5) ' @@ -35,7 +35,7 @@ BEGIN || ' JOIN base b ON "Item"."parentId" = b.id ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE b.level < $7 AND (b.level = 1 OR b.rn <= $6) ' - || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3)) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW() OR "Item"."userId" = $3)) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' @@ -105,7 +105,7 @@ BEGIN || ' FROM "Item" ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."userId" = $3) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW() OR "Item"."userId" = $3) ' || _order_by || ') ' || ' UNION ALL ' || ' (SELECT "Item".*, b.level + 1 ' @@ -113,7 +113,7 @@ BEGIN || ' JOIN base b ON "Item"."parentId" = b.id ' || ' LEFT JOIN hot_score_view g(id, "hotScore", "subHotScore") ON g.id = "Item".id ' || ' WHERE b.level < $2 ' - || ' AND ("Item"."isScheduled" = false OR "Item"."userId" = $3)) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW() OR "Item"."userId" = $3)) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' @@ -182,14 +182,14 @@ BEGIN || ' (SELECT "Item".*, 1 as level ' || ' FROM "Item" ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND "Item"."scheduledAt" IS NULL ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW()) ' || _order_by || ') ' || ' UNION ALL ' || ' (SELECT "Item".*, b.level + 1 ' || ' FROM "Item" ' || ' JOIN base b ON "Item"."parentId" = b.id ' || ' WHERE b.level < $2 ' - || ' AND "Item"."scheduledAt" IS NULL) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW())) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' @@ -234,7 +234,7 @@ BEGIN || ' (SELECT "Item".*, 1 as level, ROW_NUMBER() OVER () as rn ' || ' FROM "Item" ' || ' WHERE "Item"."parentId" = $1 ' - || ' AND "Item"."scheduledAt" IS NULL ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW()) ' || _order_by || ' ' || ' LIMIT $2 ' || ' OFFSET $3) ' @@ -243,7 +243,7 @@ BEGIN || ' FROM "Item" ' || ' JOIN base b ON "Item"."parentId" = b.id ' || ' WHERE b.level < $5 AND (b.level = 1 OR b.rn <= $4) ' - || ' AND "Item"."scheduledAt" IS NULL) ' + || ' AND ("Item"."scheduledAt" IS NULL OR "Item"."scheduledAt" <= NOW())) ' || ') ' || 'SELECT "Item".*, ' || ' "Item".created_at at time zone ''UTC'' AS "createdAt", ' From a23ad434ffcafddc71a6744247e552ba8896cd2c Mon Sep 17 00:00:00 2001 From: m0wer Date: Mon, 23 Jun 2025 22:18:06 +0200 Subject: [PATCH 10/10] fix: show scheduledAt for published scheduled posts --- api/resolvers/item.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/resolvers/item.js b/api/resolvers/item.js index 1e9545323..15b49afce 100644 --- a/api/resolvers/item.js +++ b/api/resolvers/item.js @@ -1467,8 +1467,11 @@ export default { scheduledAt: async (item, args, { me, models }) => { const meId = me?.id ?? USER_ID.anon if (meId !== item.userId) { - // Only show scheduledAt for your own items to keep DB queries minimized - return null + // Only show scheduledAt for published scheduled posts (scheduledAt <= now) + // For unpublished scheduled posts, only show to owner + if (!item.scheduledAt || item.scheduledAt > new Date()) { + return null + } } return item.scheduledAt },