1
1
const { messages } = require ( 'elasticio-node' ) ;
2
- const { Readable } = require ( 'stream ' ) ;
3
- const util = require ( '../util' ) ;
2
+ const { AttachmentProcessor } = require ( '@elastic.io/component-commons-library ' ) ;
3
+ const { getUserAgent } = require ( '../util' ) ;
4
4
const { callJSForceMethod } = require ( '../helpers/wrapper' ) ;
5
5
6
6
const DEFAULT_TIMEOUT = 600 ; // 10 min
7
7
8
- exports . objectTypes = async function objectTypes ( configuration ) {
8
+ module . exports . objectTypes = async function objectTypes ( configuration ) {
9
9
switch ( configuration . operation ) {
10
10
case 'insert' : {
11
11
return callJSForceMethod . call ( this , configuration , 'getCreateableObjectTypes' ) ;
@@ -20,27 +20,26 @@ exports.objectTypes = async function objectTypes(configuration) {
20
20
}
21
21
} ;
22
22
23
- exports . process = async function bulkCUD ( message , configuration ) {
23
+ module . exports . process = async function bulkCUD ( message , configuration ) {
24
24
this . logger . info ( 'Starting Bulk %s action' , configuration . operation ) ;
25
25
// Get CSV from attachment
26
- if ( ! message . attachments || Object . keys ( message . attachments ) . length === 0 ) {
27
- this . logger . error ( 'Attachment not found' ) ;
28
- return messages . newMessageWithBody ( { } ) ;
29
- }
30
-
31
- const key = Object . keys ( message . attachments ) [ 0 ] ;
32
26
33
27
let timeout = parseInt ( configuration . timeout , 10 ) ;
34
28
if ( Number . isNaN ( timeout ) || ( timeout < 0 ) ) {
35
29
this . logger . warn ( `Timeout is incorrect. Set default value: ${ DEFAULT_TIMEOUT } sec.` ) ;
36
30
timeout = DEFAULT_TIMEOUT ;
37
31
}
32
+ let { attachmentUrl } = message . body ;
33
+ if ( ! attachmentUrl ) {
34
+ if ( ! message . attachments || Object . keys ( message . attachments ) . length === 0 || ! message . attachments [ Object . keys ( message . attachments ) [ 0 ] ] . url ) {
35
+ this . logger . error ( 'Attachment not found' ) ;
36
+ return messages . newMessageWithBody ( { } ) ;
37
+ }
38
+ attachmentUrl = message . attachments [ Object . keys ( message . attachments ) [ 0 ] ] . url ;
39
+ }
38
40
39
- const result = await util . downloadAttachment ( message . attachments [ key ] . url ) ;
40
-
41
- const csvStream = new Readable ( ) ;
42
- csvStream . push ( result ) ;
43
- csvStream . push ( null ) ;
41
+ const attachmentProcessor = new AttachmentProcessor ( getUserAgent ( ) , message . id ) ;
42
+ const { data : csvStream } = await attachmentProcessor . getAttachment ( attachmentUrl , 'stream' ) ;
44
43
45
44
let extra ;
46
45
if ( configuration . operation === 'upsert' ) {
@@ -54,33 +53,36 @@ exports.process = async function bulkCUD(message, configuration) {
54
53
} ;
55
54
const job = await callJSForceMethod . call ( this , configuration , 'bulkCreateJob' , batchOptions ) ;
56
55
const batch = job . createBatch ( ) ;
57
- return new Promise ( ( resolve , reject ) => {
58
- // Upload CSV to SF
59
- batch . execute ( csvStream )
60
- // eslint-disable-next-line no-unused-vars
61
- . on ( 'queue' , ( ) => {
62
- // Check while job status become JobComplete or Failed, Aborted
63
- batch . poll ( 1000 , timeout * 1000 ) ;
64
- } ) . on ( 'response' , ( rets ) => {
65
- // Retrieve the results of the completed job
66
- // emit results
67
- this . logger . debug ( 'Result receiver. Records count:' , rets . length ) ;
68
- const out = messages . newEmptyMessage ( ) ;
69
- out . body = { result : rets } ;
70
- resolve ( out ) ;
71
- } ) . on ( 'error' , ( err ) => {
72
- this . logger . error ( 'Job error' ) ;
73
- reject ( new Error ( err . message ) ) ;
74
- throw Error ( `Job error: ${ err . message } ` ) ;
75
- } ) ;
76
- } ) ;
56
+ try {
57
+ const result = await new Promise ( ( resolve , reject ) => {
58
+ batch . execute ( csvStream )
59
+ . on ( 'queue' , ( ) => {
60
+ batch . poll ( 1000 , timeout * 1000 ) ;
61
+ } ) . on ( 'response' , ( rets ) => {
62
+ resolve ( rets ) ;
63
+ } ) . on ( 'error' , ( err ) => {
64
+ reject ( err ) ;
65
+ } ) ;
66
+ } ) ;
67
+ this . logger . debug ( `Result receiver. Records count: ${ result . length } ` ) ;
68
+ return messages . newMessageWithBody ( { result } ) ;
69
+ } catch ( err ) {
70
+ this . logger . error ( `Job error: ${ err . message } ` ) ;
71
+ throw err ;
72
+ }
77
73
} ;
78
74
79
- exports . getMetaModel = async function getMetaModel ( configuration ) {
75
+ module . exports . getMetaModel = async function getMetaModel ( configuration ) {
80
76
const meta = {
81
77
in : {
82
78
type : 'object' ,
83
- properties : { } ,
79
+ properties : {
80
+ attachmentUrl : {
81
+ type : 'string' ,
82
+ required : false ,
83
+ title : 'Attachment Url' ,
84
+ } ,
85
+ } ,
84
86
} ,
85
87
out : {
86
88
type : 'object' ,
0 commit comments