1
+ import { randomBytes } from "crypto" ;
1
2
import { EventEmitter } from "events" ;
3
+ import {
4
+ setupFakeTimers as realSetupFakeTimers ,
5
+ sleep ,
6
+ sleepUntil as baseSleepUntil ,
7
+ } from "jest-time-helpers" ;
2
8
import * as pg from "pg" ;
3
- import { parse } from "pg-connection-string" ;
4
9
5
10
import defer from "../src/deferred" ;
6
11
import {
@@ -16,6 +21,8 @@ import { processSharedOptions } from "../src/lib";
16
21
import { _allWorkerPools } from "../src/main" ;
17
22
import { migrate } from "../src/migrate" ;
18
23
24
+ export { DAY , HOUR , MINUTE , SECOND , sleep , WEEK } from "jest-time-helpers" ;
25
+
19
26
declare global {
20
27
namespace GraphileWorker {
21
28
interface Tasks {
@@ -25,39 +32,65 @@ declare global {
25
32
}
26
33
}
27
34
28
- export {
29
- DAY ,
30
- HOUR ,
31
- MINUTE ,
32
- SECOND ,
33
- sleep ,
34
- sleepUntil ,
35
- WEEK ,
36
- } from "jest-time-helpers" ;
37
- import {
38
- setupFakeTimers as realSetupFakeTimers ,
39
- sleepUntil ,
40
- } from "jest-time-helpers" ;
41
-
42
35
let fakeTimers : ReturnType < typeof realSetupFakeTimers > | null = null ;
43
36
export function setupFakeTimers ( ) {
44
37
fakeTimers = realSetupFakeTimers ( ) ;
45
38
return fakeTimers ;
46
39
}
47
40
41
+ export function sleepUntil ( condition : ( ) => boolean , ms ?: number ) {
42
+ // Bump the default timeout from 2000ms for CI
43
+ return baseSleepUntil ( condition , ms ?? 5000 ) ;
44
+ }
45
+
48
46
// Sometimes CI's clock can get interrupted (it is shared infra!) so this
49
47
// extends the default timeout just in case.
50
- jest . setTimeout ( 15000 ) ;
48
+ jest . setTimeout ( 20000 ) ;
51
49
52
50
// process.env.GRAPHILE_LOGGER_DEBUG = "1";
53
51
54
- export const TEST_CONNECTION_STRING =
55
- process . env . TEST_CONNECTION_STRING || "postgres:///graphile_worker_test" ;
52
+ async function createTestDatabase ( ) {
53
+ const id = randomBytes ( 8 ) . toString ( "hex" ) ;
54
+ const PGDATABASE = `graphile_worker_test_${ id } ` ;
55
+ {
56
+ const client = new pg . Client ( { connectionString : `postgres:///template1` } ) ;
57
+ await client . connect ( ) ;
58
+ await client . query (
59
+ `create database ${ pg . Client . prototype . escapeIdentifier (
60
+ PGDATABASE ,
61
+ ) } with template = graphile_worker_testtemplate;`,
62
+ ) ;
63
+ await client . end ( ) ;
64
+ }
65
+ const TEST_CONNECTION_STRING = `postgres:///${ PGDATABASE } ` ;
66
+ const PGHOST = process . env . PGHOST ;
67
+ async function release ( ) {
68
+ const client = new pg . Client ( { connectionString : `postgres:///template1` } ) ;
69
+ await client . connect ( ) ;
70
+ await client . query (
71
+ `drop database ${ pg . Client . prototype . escapeIdentifier ( PGDATABASE ) } ;` ,
72
+ ) ;
73
+ await client . end ( ) ;
74
+ }
56
75
57
- const parsed = parse ( TEST_CONNECTION_STRING ) ;
76
+ return {
77
+ TEST_CONNECTION_STRING ,
78
+ PGHOST ,
79
+ PGDATABASE ,
80
+ release,
81
+ } ;
82
+ }
83
+
84
+ export let databaseDetails : Awaited <
85
+ ReturnType < typeof createTestDatabase >
86
+ > | null = null ;
58
87
59
- export const PGHOST = parsed . host || process . env . PGHOST ;
60
- export const PGDATABASE = parsed . database || undefined ;
88
+ beforeAll ( async ( ) => {
89
+ databaseDetails = await createTestDatabase ( ) ;
90
+ } ) ;
91
+ afterAll ( async ( ) => {
92
+ databaseDetails ?. release ( ) ;
93
+ } ) ;
61
94
62
95
export const GRAPHILE_WORKER_SCHEMA =
63
96
process . env . GRAPHILE_WORKER_SCHEMA || "graphile_worker" ;
@@ -67,6 +100,7 @@ export const ESCAPED_GRAPHILE_WORKER_SCHEMA =
67
100
export async function withPgPool < T > (
68
101
cb : ( pool : pg . Pool ) => Promise < T > ,
69
102
) : Promise < T > {
103
+ const { TEST_CONNECTION_STRING } = databaseDetails ! ;
70
104
const pool = new pg . Pool ( {
71
105
connectionString : TEST_CONNECTION_STRING ,
72
106
max : 100 ,
@@ -85,12 +119,17 @@ afterEach(() => {
85
119
} ) ;
86
120
87
121
export async function withPgClient < T > (
88
- cb : ( client : pg . PoolClient ) => Promise < T > ,
122
+ cb : (
123
+ client : pg . PoolClient ,
124
+ extra : {
125
+ TEST_CONNECTION_STRING : string ;
126
+ } ,
127
+ ) => Promise < T > ,
89
128
) : Promise < T > {
90
129
return withPgPool ( async ( pool ) => {
91
130
const client = await pool . connect ( ) ;
92
131
try {
93
- return await cb ( client ) ;
132
+ return await cb ( client , databaseDetails ! ) ;
94
133
} finally {
95
134
client . release ( ) ;
96
135
}
@@ -156,7 +195,18 @@ async function _reset(
156
195
}
157
196
}
158
197
159
- export async function jobCount (
198
+ /**
199
+ * Counts the number of jobs currently in DB.
200
+ *
201
+ * If you have a pool, you may hit race conditions with this method, instead
202
+ * use `expectJobCount()` which will try multiple times to give time for
203
+ * multiple clients to synchronize.
204
+ */
205
+ export async function jobCount ( pgClient : pg . PoolClient ) : Promise < number > {
206
+ return _jobCount ( pgClient ) ;
207
+ }
208
+
209
+ async function _jobCount (
160
210
pgPoolOrClient : pg . Pool | pg . PoolClient ,
161
211
) : Promise < number > {
162
212
const {
@@ -327,3 +377,28 @@ export function withOptions<T>(
327
377
} ) ,
328
378
) ;
329
379
}
380
+
381
+ /**
382
+ * Wait for the job count to match the expected count, handles
383
+ * issues with different connections to the database not
384
+ * reflecting the same data by retrying.
385
+ */
386
+ export async function expectJobCount (
387
+ // NOTE: if you have a pgClient then you shouldn't need to
388
+ // use this - just call `jobCount()` directly since you're
389
+ // in the same client
390
+ pool : pg . Pool ,
391
+ expectedCount : number ,
392
+ ) {
393
+ let count : number = Infinity ;
394
+ for ( let i = 0 ; i < 8 ; i ++ ) {
395
+ if ( i > 0 ) {
396
+ await sleep ( i * 50 ) ;
397
+ }
398
+ count = await _jobCount ( pool ) ;
399
+ if ( count === expectedCount ) {
400
+ break ;
401
+ }
402
+ }
403
+ expect ( count ) . toEqual ( expectedCount ) ;
404
+ }
0 commit comments