Skip to content

Commit 744ca3a

Browse files
committed
Added start of transcoding
1 parent 55ceecb commit 744ca3a

File tree

3 files changed

+103
-31
lines changed

3 files changed

+103
-31
lines changed

pkg/ffmpeg/decoder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
type Decoder struct {
1717
stream int
1818
codec *ff.AVCodecContext
19-
dest *Par // Destination parameters
19+
par *Par // Destination parameters
2020
re *Re // Resample/resize
2121
timeBase ff.AVRational // Timebase for the stream
2222
frame *ff.AVFrame // Destination frame
@@ -29,7 +29,7 @@ type Decoder struct {
2929
func NewDecoder(stream *ff.AVStream, dest *Par, force bool) (*Decoder, error) {
3030
decoder := new(Decoder)
3131
decoder.stream = stream.Id()
32-
decoder.dest = dest
32+
decoder.par = dest
3333
decoder.timeBase = stream.TimeBase()
3434

3535
// Create a frame for decoder output - before resize/resample

pkg/ffmpeg/manager.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,6 @@ func (manager *Manager) NewReader(r io.Reader, format media.Format, opts ...stri
9696
}
9797
*/
9898

99-
/*
100-
func (manager *Manager) Transcode(context,output_writer,input_reader or file,input_mapping_function) {
101-
// 1. Read the input and detect the streams
102-
// 2. Make a mapping to output streams
103-
// 3. Create an output writer or file, with the mapped streams
104-
// 4. Create one goroutine which reads the input and passes frames to a channel
105-
// 5. Create a second goroutine which reads the channel and writes to the output
106-
// 6. When EOF on the input or context is cancelled, then stop
107-
}
108-
*/
109-
11099
///////////////////////////////////////////////////////////////////////////////
111100
// PUBLIC METHODS - VERSION
112101

pkg/ffmpeg/reader.go

Lines changed: 101 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"io"
89
"slices"
910
"strings"
11+
"sync"
1012
"time"
1113

1214
// Packages
@@ -230,11 +232,102 @@ func (r *Reader) Metadata(keys ...string) []*Metadata {
230232
// returning an error or io.EOF. The latter will end the decoding process early but
231233
// will not return an error.
232234
func (r *Reader) Decode(ctx context.Context, mapfn DecoderMapFunc, decodefn DecoderFrameFn) error {
233-
decoders := make(map[int]*Decoder, r.input.NumStreams())
235+
// Map streams to decoders
236+
decoders, err := r.mapStreams(mapfn)
237+
if err != nil {
238+
return err
239+
}
240+
defer decoders.Close()
241+
242+
// Do the decoding
243+
return r.decode(ctx, decoders, decodefn)
244+
}
245+
246+
// Transcode the media stream to a writer
247+
// As per the decode method, the map function is called for each stream and should return the
248+
// parameters for the destination. If the map function returns nil for a stream, then
249+
// the stream is ignored.
250+
func (r *Reader) Transcode(ctx context.Context, w io.Writer, mapfn DecoderMapFunc, opt ...Opt) error {
251+
// Map streams to decoders
252+
decoders, err := r.mapStreams(mapfn)
253+
if err != nil {
254+
return err
255+
}
256+
defer decoders.Close()
257+
258+
// Add streams to the output
259+
for _, decoder := range decoders {
260+
opt = append(opt, OptStream(decoder.stream, decoder.par))
261+
}
262+
263+
// Create an output
264+
output, err := NewWriter(w, opt...)
265+
if err != nil {
266+
return err
267+
}
268+
defer output.Close()
269+
270+
// One go-routine for decoding, one for encoding
271+
var wg sync.WaitGroup
272+
var result error
273+
274+
// Make a channel for transcoding frames. The decoder should
275+
// be ahead of the encoder, so there is probably no need to
276+
// create a buffered channel.
277+
ch := make(chan *Frame)
278+
279+
// Decoding
280+
wg.Add(1)
281+
go func() {
282+
defer wg.Done()
283+
if err := r.decode(ctx, decoders, func(stream int, frame *Frame) error {
284+
ch <- frame
285+
return nil
286+
}); err != nil {
287+
result = err
288+
}
289+
// Close channel at the end of decoding
290+
close(ch)
291+
}()
292+
293+
// Encoding
294+
wg.Add(1)
295+
go func() {
296+
defer wg.Done()
297+
for frame := range ch {
298+
fmt.Println("TODO: Write frame to output", frame)
299+
}
300+
}()
301+
302+
// Wait for the process to finish
303+
wg.Wait()
304+
305+
// Return any errors
306+
return result
307+
}
308+
309+
////////////////////////////////////////////////////////////////////////////////
310+
// PRIVATE METHODS - DECODE
311+
312+
type decoderMap map[int]*Decoder
313+
314+
func (d decoderMap) Close() error {
315+
var result error
316+
for _, decoder := range d {
317+
if err := decoder.Close(); err != nil {
318+
result = errors.Join(result, err)
319+
}
320+
}
321+
return result
322+
}
323+
324+
// Map streams to decoders, and return the decoders
325+
func (r *Reader) mapStreams(fn DecoderMapFunc) (decoderMap, error) {
326+
decoders := make(decoderMap, r.input.NumStreams())
234327

235328
// Standard decoder map function copies all streams
236-
if mapfn == nil {
237-
mapfn = func(_ int, par *Par) (*Par, error) {
329+
if fn == nil {
330+
fn = func(_ int, par *Par) (*Par, error) {
238331
return par, nil
239332
}
240333
}
@@ -247,7 +340,7 @@ func (r *Reader) Decode(ctx context.Context, mapfn DecoderMapFunc, decodefn Deco
247340
stream_index := stream.Index()
248341

249342
// Get decoder parameters and map to a decoder
250-
par, err := mapfn(stream.Id(), &Par{
343+
par, err := fn(stream.Id(), &Par{
251344
AVCodecParameters: *stream.CodecPar(),
252345
})
253346
if err != nil {
@@ -268,25 +361,15 @@ func (r *Reader) Decode(ctx context.Context, mapfn DecoderMapFunc, decodefn Deco
268361
result = errors.Join(result, ErrBadParameter.With("no streams to decode"))
269362
}
270363

271-
// Now we have a map of decoders, we can start decoding
272-
if result == nil {
273-
result = r.decode(ctx, decoders, decodefn)
274-
}
275-
276-
// Release resources
277-
for _, decoder := range decoders {
278-
if err := decoder.Close(); err != nil {
279-
result = errors.Join(result, err)
280-
}
364+
// If there are errors, then free the decoders
365+
if result != nil {
366+
result = errors.Join(result, decoders.Close())
281367
}
282368

283369
// Return any errors
284-
return result
370+
return decoders, result
285371
}
286372

287-
////////////////////////////////////////////////////////////////////////////////
288-
// PRIVATE METHODS - DECODE
289-
290373
func (r *Reader) decode(ctx context.Context, decoders map[int]*Decoder, fn DecoderFrameFn) error {
291374
// Allocate a packet
292375
packet := ff.AVCodec_packet_alloc()

0 commit comments

Comments
 (0)