Skip to content

Commit 2f0bb01

Browse files
authored
Move stats to their own job (#1324)
1 parent d56b238 commit 2f0bb01

File tree

4 files changed

+263
-213
lines changed

4 files changed

+263
-213
lines changed

src/hooks.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import { sha256 } from "$lib/utils/sha256";
99
import { addWeeks } from "date-fns";
1010
import { checkAndRunMigrations } from "$lib/migrations/migrations";
1111
import { building } from "$app/environment";
12-
import { refreshAssistantsCounts } from "$lib/assistantStats/refresh-assistants-counts";
1312
import { logger } from "$lib/server/logger";
1413
import { AbortedGenerations } from "$lib/server/abortedGenerations";
1514
import { MetricsServer } from "$lib/server/metrics";
1615
import { initExitHandler } from "$lib/server/exitHandler";
1716
import { ObjectId } from "mongodb";
17+
import { refreshAssistantsCounts } from "$lib/jobs/refresh-assistants-counts";
18+
import { refreshConversationStats } from "$lib/jobs/refresh-conversation-stats";
1819

1920
// TODO: move this code on a started server hook, instead of using a "building" flag
2021
if (!building) {
@@ -24,6 +25,7 @@ if (!building) {
2425
if (env.ENABLE_ASSISTANTS) {
2526
refreshAssistantsCounts();
2627
}
28+
refreshConversationStats();
2729

2830
// Init metrics server
2931
MetricsServer.getInstance();
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
import type { ConversationStats } from "$lib/types/ConversationStats";
2+
import { CONVERSATION_STATS_COLLECTION, collections } from "$lib/server/database";
3+
import { logger } from "$lib/server/logger";
4+
import type { ObjectId } from "mongodb";
5+
import { acquireLock, refreshLock } from "$lib/migrations/lock";
6+
7+
export async function computeAllStats() {
8+
for (const span of ["day", "week", "month"] as const) {
9+
computeStats({ dateField: "updatedAt", type: "conversation", span }).catch((e) =>
10+
logger.error(e)
11+
);
12+
computeStats({ dateField: "createdAt", type: "conversation", span }).catch((e) =>
13+
logger.error(e)
14+
);
15+
computeStats({ dateField: "createdAt", type: "message", span }).catch((e) => logger.error(e));
16+
}
17+
}
18+
19+
async function computeStats(params: {
20+
dateField: ConversationStats["date"]["field"];
21+
span: ConversationStats["date"]["span"];
22+
type: ConversationStats["type"];
23+
}) {
24+
const lastComputed = await collections.conversationStats.findOne(
25+
{ "date.field": params.dateField, "date.span": params.span, type: params.type },
26+
{ sort: { "date.at": -1 } }
27+
);
28+
29+
// If the last computed week is at the beginning of the last computed month, we need to include some days from the previous month
30+
// In those cases we need to compute the stats from before the last month as everything is one aggregation
31+
const minDate = lastComputed ? lastComputed.date.at : new Date(0);
32+
33+
logger.info(
34+
{ minDate, dateField: params.dateField, span: params.span, type: params.type },
35+
"Computing conversation stats"
36+
);
37+
38+
const dateField = params.type === "message" ? "messages." + params.dateField : params.dateField;
39+
40+
const pipeline = [
41+
{
42+
$match: {
43+
[dateField]: { $gte: minDate },
44+
},
45+
},
46+
{
47+
$project: {
48+
[dateField]: 1,
49+
sessionId: 1,
50+
userId: 1,
51+
},
52+
},
53+
...(params.type === "message"
54+
? [
55+
{
56+
$unwind: "$messages",
57+
},
58+
{
59+
$match: {
60+
[dateField]: { $gte: minDate },
61+
},
62+
},
63+
]
64+
: []),
65+
{
66+
$sort: {
67+
[dateField]: 1,
68+
},
69+
},
70+
{
71+
$facet: {
72+
userId: [
73+
{
74+
$match: {
75+
userId: { $exists: true },
76+
},
77+
},
78+
{
79+
$group: {
80+
_id: {
81+
at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
82+
userId: "$userId",
83+
},
84+
},
85+
},
86+
{
87+
$group: {
88+
_id: "$_id.at",
89+
count: { $sum: 1 },
90+
},
91+
},
92+
{
93+
$project: {
94+
_id: 0,
95+
date: {
96+
at: "$_id",
97+
field: params.dateField,
98+
span: params.span,
99+
},
100+
distinct: "userId",
101+
count: 1,
102+
},
103+
},
104+
],
105+
sessionId: [
106+
{
107+
$match: {
108+
sessionId: { $exists: true },
109+
},
110+
},
111+
{
112+
$group: {
113+
_id: {
114+
at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
115+
sessionId: "$sessionId",
116+
},
117+
},
118+
},
119+
{
120+
$group: {
121+
_id: "$_id.at",
122+
count: { $sum: 1 },
123+
},
124+
},
125+
{
126+
$project: {
127+
_id: 0,
128+
date: {
129+
at: "$_id",
130+
field: params.dateField,
131+
span: params.span,
132+
},
133+
distinct: "sessionId",
134+
count: 1,
135+
},
136+
},
137+
],
138+
userOrSessionId: [
139+
{
140+
$group: {
141+
_id: {
142+
at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
143+
userOrSessionId: { $ifNull: ["$userId", "$sessionId"] },
144+
},
145+
},
146+
},
147+
{
148+
$group: {
149+
_id: "$_id.at",
150+
count: { $sum: 1 },
151+
},
152+
},
153+
{
154+
$project: {
155+
_id: 0,
156+
date: {
157+
at: "$_id",
158+
field: params.dateField,
159+
span: params.span,
160+
},
161+
distinct: "userOrSessionId",
162+
count: 1,
163+
},
164+
},
165+
],
166+
_id: [
167+
{
168+
$group: {
169+
_id: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
170+
count: { $sum: 1 },
171+
},
172+
},
173+
{
174+
$project: {
175+
_id: 0,
176+
date: {
177+
at: "$_id",
178+
field: params.dateField,
179+
span: params.span,
180+
},
181+
distinct: "_id",
182+
count: 1,
183+
},
184+
},
185+
],
186+
},
187+
},
188+
{
189+
$project: {
190+
stats: {
191+
$concatArrays: ["$userId", "$sessionId", "$userOrSessionId", "$_id"],
192+
},
193+
},
194+
},
195+
{
196+
$unwind: "$stats",
197+
},
198+
{
199+
$replaceRoot: {
200+
newRoot: "$stats",
201+
},
202+
},
203+
{
204+
$set: {
205+
type: params.type,
206+
},
207+
},
208+
{
209+
$merge: {
210+
into: CONVERSATION_STATS_COLLECTION,
211+
on: ["date.at", "type", "date.span", "date.field", "distinct"],
212+
whenMatched: "replace",
213+
whenNotMatched: "insert",
214+
},
215+
},
216+
];
217+
218+
await collections.conversations.aggregate(pipeline, { allowDiskUse: true }).next();
219+
220+
logger.info(
221+
{ minDate, dateField: params.dateField, span: params.span, type: params.type },
222+
"Computed conversation stats"
223+
);
224+
}
225+
226+
const LOCK_KEY = "conversation.stats";
227+
228+
let hasLock = false;
229+
let lockId: ObjectId | null = null;
230+
231+
async function maintainLock() {
232+
if (hasLock && lockId) {
233+
hasLock = await refreshLock(LOCK_KEY, lockId);
234+
235+
if (!hasLock) {
236+
lockId = null;
237+
}
238+
} else if (!hasLock) {
239+
lockId = (await acquireLock(LOCK_KEY)) || null;
240+
hasLock = !!lockId;
241+
}
242+
243+
setTimeout(maintainLock, 10_000);
244+
}
245+
246+
export function refreshConversationStats() {
247+
const ONE_HOUR_MS = 3_600_000;
248+
249+
maintainLock().then(() => {
250+
computeAllStats();
251+
252+
setInterval(computeAllStats, 12 * ONE_HOUR_MS);
253+
});
254+
}

0 commit comments

Comments
 (0)