@@ -74,6 +74,7 @@ public class FormModel: ObservableObject, CustomDebugStringConvertible {
74
74
public struct FileUpload : Identifiable {
75
75
public let id : String
76
76
public let data : Data
77
+ public let ref : Int
77
78
let upload : ( ) async throws -> ( )
78
79
}
79
80
@@ -267,13 +268,13 @@ public class FormModel: ObservableObject, CustomDebugStringConvertible {
267
268
)
268
269
}
269
270
270
- public func queueFileUpload(
271
+ public func queueFileUpload< R : RootRegistry > (
271
272
name: String ,
272
273
id: String ,
273
274
contents: Data ,
274
275
fileType: UTType ,
275
276
fileName: String ,
276
- coordinator: LiveViewCoordinator < some RootRegistry >
277
+ coordinator: LiveViewCoordinator < R >
277
278
) async throws {
278
279
guard let liveChannel = coordinator. liveChannel
279
280
else { return }
@@ -285,6 +286,19 @@ public class FormModel: ObservableObject, CustomDebugStringConvertible {
285
286
" " ,
286
287
id
287
288
)
289
+
290
+ let ref = coordinator. nextUploadRef ( )
291
+
292
+ let fileMetadata = Json . object ( object: [
293
+ " path " : . str( string: name) ,
294
+ " ref " : . str( string: " \( ref) " ) ,
295
+ " last_modified " : . numb( number: . posInt( pos: UInt64 ( Date ( ) . timeIntervalSince1970 * 1000 ) ) ) , // in milliseconds
296
+ " name " : . str( string: fileName) ,
297
+ " relative_path " : . str( string: " " ) ,
298
+ " type " : . str( string: fileType. preferredMIMEType!) ,
299
+ " size " : . numb( number: . posInt( pos: UInt64 ( contents. count) ) )
300
+ ] )
301
+
288
302
if let changeEventName {
289
303
let replyPayload = try await coordinator. liveChannel!. channel ( ) . call (
290
304
event: . user( user: " event " ) ,
@@ -294,28 +308,103 @@ public class FormModel: ObservableObject, CustomDebugStringConvertible {
294
308
" value " : . str( string: " _target= \( name) " ) ,
295
309
" uploads " : . object( object: [
296
310
id: . array( array: [
297
- . object( object: [
298
- " path " : . str( string: fileName) ,
299
- " ref " : . str( string: String ( coordinator. nextUploadRef ( ) ) ) ,
300
- " last_modified " : . numb( number: . posInt( pos: UInt64 ( Date ( ) . timeIntervalSince1970 * 1000 ) ) ) , // in milliseconds
301
- " name " : . str( string: fileName) ,
302
- " relative_path " : . str( string: " " ) ,
303
- " type " : . str( string: fileType. preferredMIMEType!) ,
304
- " size " : . numb( number: . posInt( pos: UInt64 ( contents. count) ) )
305
- ] )
311
+ fileMetadata
306
312
] )
307
313
] )
308
314
] ) ) ,
309
315
timeout: 10_000
310
316
)
311
317
try await coordinator. handleEventReplyPayload ( replyPayload)
312
318
}
313
- self . fileUploads. append ( . init (
319
+ self . fileUploads. append ( FileUpload (
314
320
id: id,
315
321
data: contents,
316
- upload: { try await liveChannel. uploadFile ( file) }
322
+ ref: ref,
323
+ upload: {
324
+ do {
325
+ let entries = Json . array ( array: [
326
+ fileMetadata
327
+ ] )
328
+
329
+ let payload = LiveViewNativeCore . Payload. jsonPayload ( json: . object( object: [
330
+ " ref " : . str( string: id) ,
331
+ " entries " : entries,
332
+ ] ) )
333
+
334
+ print ( " sending preflight request \( ref) " )
335
+
336
+ let response = try await coordinator. liveChannel!. channel ( ) . call (
337
+ event: . user( user: " allow_upload " ) ,
338
+ payload: payload,
339
+ timeout: 10_000
340
+ )
341
+
342
+ try await coordinator. handleEventReplyPayload ( response)
343
+
344
+ print ( " got preflight response \( response) " )
345
+
346
+ // LiveUploader.initAdapterUpload
347
+ // UploadEntry.uploader
348
+ // utils.channelUploader
349
+ // EntryUploader
350
+ let reply = switch response {
351
+ case let . jsonPayload( json: json) :
352
+ json
353
+ default :
354
+ fatalError ( )
355
+ }
356
+ print ( reply)
357
+
358
+ let allowUploadReply = try JsonDecoder ( ) . decode ( AllowUploadReply . self, from: reply)
359
+
360
+ let entry : Json = switch reply {
361
+ case let . object( object: object) :
362
+ switch object [ " entries " ] {
363
+ case let . object( object: object) :
364
+ object [ " \( ref) " ] !
365
+ default :
366
+ fatalError ( )
367
+ }
368
+ default :
369
+ fatalError ( )
370
+ }
371
+
372
+
373
+ let uploadEntry = UploadEntry < R > ( data: contents, ref: allowUploadReply. ref, entryRef: ref, meta: entry, config: allowUploadReply. config, coordinator: coordinator)
374
+ switch entry {
375
+ case let . object( object: meta) :
376
+ switch meta [ " uploader " ] ! {
377
+ case let . str( string: uploader) :
378
+ try await coordinator. session. configuration. uploaders [ uploader] !. upload ( uploadEntry, for: coordinator)
379
+ default :
380
+ fatalError ( )
381
+ }
382
+ case let . str( string: uploadToken) :
383
+ try await UploadEntry < R > . ChannelUploader ( ) . upload ( uploadEntry, for: coordinator)
384
+ default :
385
+ fatalError ( )
386
+ }
387
+
388
+ print ( " done " )
389
+ } catch {
390
+ fatalError ( error. localizedDescription)
391
+ }
392
+ }
317
393
) )
318
394
}
395
+
396
+ public struct UploadConfig : Codable {
397
+ public let chunk_size : Int
398
+ public let max_entries : Int
399
+ public let chunk_timeout : Int
400
+ public let max_file_size : Int
401
+ }
402
+
403
+ fileprivate struct AllowUploadReply : Codable {
404
+ let ref : String
405
+ let config : UploadConfig
406
+ // let entries: [String:String]
407
+ }
319
408
}
320
409
321
410
private extension URLComponents {
@@ -330,3 +419,82 @@ private extension URLComponents {
330
419
return components. query!
331
420
}
332
421
}
422
+
423
+ public final class UploadEntry < R: RootRegistry > {
424
+ public let data : Data
425
+ public let ref : String
426
+ public let entryRef : Int
427
+ public let meta : Json
428
+ public let config : FormModel . UploadConfig
429
+ private weak var coordinator : LiveViewCoordinator < R > ?
430
+
431
+ init ( data: Data , ref: String , entryRef: Int , meta: Json , config: FormModel . UploadConfig , coordinator: LiveViewCoordinator < R > ) {
432
+ self . data = data
433
+ self . ref = ref
434
+ self . entryRef = entryRef
435
+ self . meta = meta
436
+ self . config = config
437
+ self . coordinator = coordinator
438
+ }
439
+
440
+ @MainActor
441
+ public func progress( _ progress: Int ) async throws {
442
+ let progressReply = try await coordinator!. liveChannel!. channel ( ) . call (
443
+ event: . user( user: " progress " ) ,
444
+ payload: . jsonPayload( json: . object( object: [
445
+ " event " : . null,
446
+ " ref " : . str( string: ref) ,
447
+ " entry_ref " : . str( string: " \( entryRef) " ) ,
448
+ " progress " : . numb( number: . posInt( pos: UInt64 ( progress) ) ) ,
449
+ ] ) ) ,
450
+ timeout: 10_000
451
+ )
452
+ print ( progressReply)
453
+ _ = try await coordinator!. handleEventReplyPayload ( progressReply)
454
+ }
455
+
456
+ @MainActor
457
+ public func error( _ error: some Error ) async throws {
458
+
459
+ }
460
+
461
+ @MainActor
462
+ public func pause( ) async throws {
463
+
464
+ }
465
+
466
+ public struct ChannelUploader : Uploader {
467
+ public init ( ) { }
468
+
469
+ public func upload< Root: RootRegistry > (
470
+ _ entry: UploadEntry < Root > ,
471
+ for coordinator: LiveViewCoordinator < Root >
472
+ ) async throws {
473
+ let uploadChannel = try await coordinator. session. liveSocket!. socket ( ) . channel ( topic: . fromString( topic: " lvu: \( entry. entryRef) " ) , payload: . jsonPayload( json: . object( object: [
474
+ " token " : entry. meta
475
+ ] ) ) )
476
+ _ = try await uploadChannel. join ( timeout: 10_000 )
477
+
478
+ let stream = InputStream ( data: entry. data)
479
+ var buf = [ UInt8] ( repeating: 0 , count: entry. config. chunk_size)
480
+ stream. open ( )
481
+ var amountRead = 0
482
+ while case let amount = stream. read ( & buf, maxLength: entry. config. chunk_size) , amount > 0 {
483
+ let resp = try await uploadChannel. call ( event: . user( user: " chunk " ) , payload: . binary( bytes: Data ( buf [ ..< amount] ) ) , timeout: 10_000 )
484
+ print ( " uploaded chunk: \( resp) " )
485
+ amountRead += amount
486
+
487
+ try await entry. progress ( Int ( ( Double ( amountRead) / Double( entry. data. count) ) * 100 ) )
488
+ }
489
+ stream. close ( )
490
+
491
+ print ( " finished uploading chunks " )
492
+ try await entry. progress ( 100 )
493
+ }
494
+ }
495
+ }
496
+
497
+ public protocol Uploader {
498
+ @MainActor
499
+ func upload< R: RootRegistry > ( _ entry: UploadEntry < R > , for coordinator: LiveViewCoordinator < R > ) async throws
500
+ }
0 commit comments