Skip to content

Commit ebae68a

Browse files
committed
feat(sftp): added mod_time metadata to sftp input
1 parent aa5b8c0 commit ebae68a

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

internal/impl/sftp/input.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func sftpInputSpec() *service.ConfigSpec {
5353
This input adds the following metadata fields to each message:
5454
5555
- sftp_path
56+
- sftp_mod_time
5657
5758
You can access these metadata fields using xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].`).
5859
Fields(
@@ -122,14 +123,20 @@ type sftpReader struct {
122123
watcherMinAge time.Duration
123124

124125
// State
125-
stateLock sync.Mutex
126-
scanner codec.DeprecatedFallbackStream
127-
currentPath string
126+
stateLock sync.Mutex
127+
scanner codec.DeprecatedFallbackStream
128+
currentPath string
129+
currentFileInfo currentFileInfo
128130

129131
client *clientPool
130132
pathProvider pathProvider
131133
}
132134

135+
type currentFileInfo struct {
136+
path string
137+
modTime time.Time
138+
}
139+
133140
func newSFTPReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (s *sftpReader, err error) {
134141
s = &sftpReader{
135142
log: mgr.Logger(),
@@ -233,6 +240,7 @@ func (s *sftpReader) tryReadBatch(ctx context.Context) (service.MessageBatch, se
233240

234241
for _, part := range parts {
235242
part.MetaSetMut("sftp_path", s.currentPath)
243+
part.MetaSetMut("sftp_mod_time", s.currentFileInfo.modTime)
236244
}
237245

238246
return parts, codecAckFn, nil
@@ -259,6 +267,11 @@ func (s *sftpReader) initScanner(ctx context.Context) (codec.DeprecatedFallbackS
259267
return nil, service.ErrEndOfInput
260268
}
261269

270+
fileInfo, err := s.client.Stat(path)
271+
if err != nil {
272+
return nil, fmt.Errorf("stat path: %w", err)
273+
}
274+
262275
file, err = s.client.Open(path)
263276
if err != nil {
264277
s.log.With("path", path, "err", err.Error()).Warn("Unable to open previously identified file")
@@ -285,6 +298,10 @@ func (s *sftpReader) initScanner(ctx context.Context) (codec.DeprecatedFallbackS
285298
s.stateLock.Lock()
286299
s.scanner = scanner
287300
s.currentPath = path
301+
s.currentFileInfo = currentFileInfo{
302+
path: path,
303+
modTime: fileInfo.ModTime(),
304+
}
288305
s.stateLock.Unlock()
289306
return scanner, nil
290307
}

internal/impl/sftp/integration_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ cache_resources:
161161
if !ok {
162162
return errors.New("sftp_path metadata not found")
163163
}
164+
_, ok = msg.MetaGet("sftp_mod_time")
165+
if !ok {
166+
return errors.New("sftp_mod_time metadata not found")
167+
}
164168
receivedPaths = append(receivedPaths, path)
165169
return nil
166170
}))

0 commit comments

Comments
 (0)