1
1
import os from 'os'
2
2
import fs from 'fs'
3
- import Stream from 'stream'
4
3
5
4
import {
6
5
mergeUserTypes ,
@@ -21,6 +20,7 @@ import { Query, CLOSE } from './query.js'
21
20
import Queue from './queue.js'
22
21
import { Errors , PostgresError } from './errors.js'
23
22
import Subscribe from './subscribe.js'
23
+ import largeObject from './large.js'
24
24
25
25
Object . assign ( Postgres , {
26
26
PostgresError,
@@ -56,7 +56,7 @@ function Postgres(a, b) {
56
56
57
57
Object . assign ( sql , {
58
58
get parameters ( ) { return options . parameters } ,
59
- largeObject,
59
+ largeObject : largeObject . bind ( null , sql ) ,
60
60
subscribe,
61
61
CLOSE ,
62
62
END : CLOSE ,
@@ -246,75 +246,6 @@ function Postgres(a, b) {
246
246
}
247
247
}
248
248
249
- function largeObject ( oid , mode = 0x00020000 | 0x00040000 ) {
250
- return new Promise ( async ( resolve , reject ) => {
251
- await sql . begin ( async sql => {
252
- let finish
253
- ! oid && ( [ { oid } ] = await sql `select lo_creat(-1) as oid` )
254
- const [ { fd } ] = await sql `select lo_open(${ oid } , ${ mode } ) as fd`
255
-
256
- const lo = {
257
- writable,
258
- readable,
259
- close : ( ) => sql `select lo_close(${ fd } )` . then ( finish ) ,
260
- tell : ( ) => sql `select lo_tell64(${ fd } )` ,
261
- read : ( x ) => sql `select loread(${ fd } , ${ x } ) as data` ,
262
- write : ( x ) => sql `select lowrite(${ fd } , ${ x } )` ,
263
- truncate : ( x ) => sql `select lo_truncate64(${ fd } , ${ x } )` ,
264
- seek : ( x , whence = 0 ) => sql `select lo_lseek64(${ fd } , ${ x } , ${ whence } )` ,
265
- size : ( ) => sql `
266
- select
267
- lo_lseek64(${ fd } , location, 0) as position,
268
- seek.size
269
- from (
270
- select
271
- lo_lseek64($1, 0, 2) as size,
272
- tell.location
273
- from (select lo_tell64($1) as location) tell
274
- ) seek
275
- `
276
- }
277
-
278
- resolve ( lo )
279
-
280
- return new Promise ( async r => finish = r )
281
-
282
- async function readable ( {
283
- highWaterMark = 2048 * 8 ,
284
- start = 0 ,
285
- end = Infinity
286
- } = { } ) {
287
- let max = end - start
288
- start && await lo . seek ( start )
289
- return new Stream . Readable ( {
290
- highWaterMark,
291
- async read ( size ) {
292
- const l = size > max ? size - max : size
293
- max -= size
294
- const [ { data } ] = await lo . read ( l )
295
- this . push ( data )
296
- if ( data . length < size )
297
- this . push ( null )
298
- }
299
- } )
300
- }
301
-
302
- async function writable ( {
303
- highWaterMark = 2048 * 8 ,
304
- start = 0
305
- } = { } ) {
306
- start && await lo . seek ( start )
307
- return new Stream . Writable ( {
308
- highWaterMark,
309
- write ( chunk , encoding , callback ) {
310
- lo . write ( chunk ) . then ( ( ) => callback ( ) , callback )
311
- }
312
- } )
313
- }
314
- } ) . catch ( reject )
315
- } )
316
- }
317
-
318
249
function json ( x ) {
319
250
return new Parameter ( x , 3802 )
320
251
}
0 commit comments