@@ -11,6 +11,9 @@ import (
11
11
12
12
// Packages
13
13
ff "github.com/mutablelogic/go-media/sys/ffmpeg61"
14
+
15
+ // Namespace imports
16
+ . "github.com/djthorpe/go-errors"
14
17
)
15
18
16
19
////////////////////////////////////////////////////////////////////////////////
@@ -20,6 +23,7 @@ import (
20
23
type Reader struct {
21
24
input * ff.AVFormatContext
22
25
avio * ff.AVIOContextEx
26
+ force bool
23
27
}
24
28
25
29
type reader_callback struct {
@@ -105,14 +109,17 @@ func NewReader(r io.Reader, opt ...Opt) (*Reader, error) {
105
109
return reader .open (options )
106
110
}
107
111
108
- func (r * Reader ) open (_ * opts ) (* Reader , error ) {
112
+ func (r * Reader ) open (options * opts ) (* Reader , error ) {
109
113
// Find stream information
110
114
if err := ff .AVFormat_find_stream_info (r .input , nil ); err != nil {
111
115
ff .AVFormat_free_context (r .input )
112
116
ff .AVFormat_avio_context_free (r .avio )
113
117
return nil , err
114
118
}
115
119
120
+ // Set force flag
121
+ r .force = options .force
122
+
116
123
// Return success
117
124
return r , nil
118
125
}
@@ -185,13 +192,122 @@ func (r *Reader) Metadata(keys ...string) []*Metadata {
185
192
return result
186
193
}
187
194
188
- // TODO Decode the media stream into packets and frames
189
- func (r * Reader ) Decode (ctx context.Context , fn DecoderMapFunc ) error {
190
- return errors .New ("not implemented yet" )
195
+ // Decode the media stream into frames. The decodefn is called for each
196
+ // frame decoded from the stream. The map function is called for each stream
197
+ // and should return the parameters for the destination frame. If the map
198
+ // function returns nil, then the stream is ignored.
199
+ //
200
+ // The decoding can be interrupted by cancelling the context, or by the decodefn
201
+ // returning an error or io.EOF. The latter will end the decoding process early but
202
+ // will not return an error.
203
+ func (r * Reader ) Decode (ctx context.Context , decodefn DecoderFrameFn , mapfn DecoderMapFunc ) error {
204
+ decoders := make (map [int ]* Decoder , r .input .NumStreams ())
205
+
206
+ // Standard decoder map function copies all streams
207
+ if mapfn == nil {
208
+ mapfn = func (_ int , par * Par ) (* Par , error ) {
209
+ return par , nil
210
+ }
211
+ }
212
+
213
+ // Create a decoder for each stream
214
+ // The decoder map function should be returning the parameters for the
215
+ // destination frame.
216
+ var result error
217
+ for _ , stream := range r .input .Streams () {
218
+ stream_index := stream .Index ()
219
+
220
+ // Get decoder parameters and map to a decoder
221
+ par , err := mapfn (stream .Id (), & Par {
222
+ AVCodecParameters : * stream .CodecPar (),
223
+ })
224
+ if err != nil {
225
+ result = errors .Join (result , err )
226
+ } else if par == nil {
227
+ continue
228
+ } else if decoder , err := NewDecoder (stream , par , r .force ); err != nil {
229
+ result = errors .Join (result , err )
230
+ } else if _ , exists := decoders [stream_index ]; exists {
231
+ result = errors .Join (result , ErrDuplicateEntry .Withf ("stream index %d" , stream_index ))
232
+ } else {
233
+ decoders [stream_index ] = decoder
234
+ }
235
+ }
236
+
237
+ // Check to see if we have to do something
238
+ if len (decoders ) == 0 {
239
+ result = errors .Join (result , ErrBadParameter .With ("no streams to decode" ))
240
+ }
241
+
242
+ // Now we have a map of decoders, we can start decoding
243
+ if result == nil {
244
+ result = r .decode (ctx , decoders , decodefn )
245
+ }
246
+
247
+ // Release resources
248
+ for _ , decoder := range decoders {
249
+ if err := decoder .Close (); err != nil {
250
+ result = errors .Join (result , err )
251
+ }
252
+ }
253
+
254
+ // Return any errors
255
+ return result
256
+ }
257
+
258
+ ////////////////////////////////////////////////////////////////////////////////
259
+ // PRIVATE METHODS - DECODE
260
+
261
+ func (r * Reader ) decode (ctx context.Context , decoders map [int ]* Decoder , fn DecoderFrameFn ) error {
262
+ // Allocate a packet
263
+ packet := ff .AVCodec_packet_alloc ()
264
+ if packet == nil {
265
+ return errors .New ("failed to allocate packet" )
266
+ }
267
+ defer ff .AVCodec_packet_free (packet )
268
+
269
+ // Read packets
270
+ FOR_LOOP:
271
+ for {
272
+ select {
273
+ case <- ctx .Done ():
274
+ break FOR_LOOP
275
+ default :
276
+ if err := ff .AVFormat_read_frame (r .input , packet ); errors .Is (err , io .EOF ) {
277
+ break FOR_LOOP
278
+ } else if err != nil {
279
+ return err
280
+ }
281
+ stream_index := packet .StreamIndex ()
282
+ if decoder := decoders [stream_index ]; decoder != nil {
283
+ if err := decoder .decode (packet , fn ); errors .Is (err , io .EOF ) {
284
+ break FOR_LOOP
285
+ } else if err != nil {
286
+ return err
287
+ }
288
+ }
289
+ }
290
+
291
+ // Unreference the packet
292
+ ff .AVCodec_packet_unref (packet )
293
+ }
294
+
295
+ // Flush the decoders
296
+ for _ , decoder := range decoders {
297
+ if err := decoder .decode (nil , fn ); errors .Is (err , io .EOF ) {
298
+ // no-op
299
+ } else if err != nil {
300
+ return err
301
+ }
302
+ }
303
+
304
+ // Return the context error - will be cancelled, perhaps, or nil if the
305
+ // demuxer finished successfully without cancellation
306
+ return ctx .Err ()
191
307
}
192
308
193
309
////////////////////////////////////////////////////////////////////////////////
194
- // PRIVATE METHODS
310
+ // PRIVATE METHODS - CALLBACK
195
311
196
312
func (r * reader_callback ) Reader (buf []byte ) int {
197
313
n , err := r .r .Read (buf )
0 commit comments