Skip to content

Commit 0a230fb

Browse files
committed
feat(sftp): added mod_time metadata to sftp input
1 parent aa5b8c0 commit 0a230fb

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

internal/impl/sftp/input.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func sftpInputSpec() *service.ConfigSpec {
5353
This input adds the following metadata fields to each message:
5454
5555
- sftp_path
56+
- sftp_mod_time
5657
5758
You can access these metadata fields using xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].`).
5859
Fields(
@@ -122,14 +123,19 @@ type sftpReader struct {
122123
watcherMinAge time.Duration
123124

124125
// State
125-
stateLock sync.Mutex
126-
scanner codec.DeprecatedFallbackStream
127-
currentPath string
126+
stateLock sync.Mutex
127+
scanner codec.DeprecatedFallbackStream
128+
currentFileInfo currentFileInfo
128129

129130
client *clientPool
130131
pathProvider pathProvider
131132
}
132133

134+
type currentFileInfo struct {
135+
path string
136+
modTime time.Time
137+
}
138+
133139
func newSFTPReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (s *sftpReader, err error) {
134140
s = &sftpReader{
135141
log: mgr.Logger(),
@@ -224,15 +230,16 @@ func (s *sftpReader) tryReadBatch(ctx context.Context) (service.MessageBatch, se
224230

225231
_ = s.scanner.Close(ctx)
226232
s.scanner = nil
227-
s.currentPath = ""
233+
s.currentFileInfo.path = ""
228234
if errors.Is(err, io.EOF) {
229235
err = service.ErrNotConnected
230236
}
231237
return nil, nil, err
232238
}
233239

234240
for _, part := range parts {
235-
part.MetaSetMut("sftp_path", s.currentPath)
241+
part.MetaSetMut("sftp_path", s.currentFileInfo.path)
242+
part.MetaSetMut("sftp_mod_time", s.currentFileInfo.modTime)
236243
}
237244

238245
return parts, codecAckFn, nil
@@ -259,6 +266,11 @@ func (s *sftpReader) initScanner(ctx context.Context) (codec.DeprecatedFallbackS
259266
return nil, service.ErrEndOfInput
260267
}
261268

269+
fileInfo, err := s.client.Stat(path)
270+
if err != nil {
271+
return nil, fmt.Errorf("stat path: %w", err)
272+
}
273+
262274
file, err = s.client.Open(path)
263275
if err != nil {
264276
s.log.With("path", path, "err", err.Error()).Warn("Unable to open previously identified file")
@@ -284,7 +296,10 @@ func (s *sftpReader) initScanner(ctx context.Context) (codec.DeprecatedFallbackS
284296

285297
s.stateLock.Lock()
286298
s.scanner = scanner
287-
s.currentPath = path
299+
s.currentFileInfo = currentFileInfo{
300+
path: path,
301+
modTime: fileInfo.ModTime(),
302+
}
288303
s.stateLock.Unlock()
289304
return scanner, nil
290305
}
@@ -335,7 +350,7 @@ func (s *sftpReader) closeScanner(ctx context.Context) {
335350
s.log.With("error", err).Error("Failed to close scanner")
336351
}
337352
s.scanner = nil
338-
s.currentPath = ""
353+
s.currentFileInfo.path = ""
339354
}
340355
}
341356

internal/impl/sftp/integration_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ cache_resources:
161161
if !ok {
162162
return errors.New("sftp_path metadata not found")
163163
}
164+
_, ok = msg.MetaGet("sftp_mod_time")
165+
if !ok {
166+
return errors.New("sftp_mod_time metadata not found")
167+
}
164168
receivedPaths = append(receivedPaths, path)
165169
return nil
166170
}))

0 commit comments

Comments
 (0)