Skip to content

Commit b4948ff

Browse files
committed
Added TextStream
1 parent 9e28d94 commit b4948ff

File tree

3 files changed

+257
-9
lines changed

3 files changed

+257
-9
lines changed

pkg/httpresponse/textstream.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package httpresponse
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"errors"
7+
"io"
8+
"net/http"
9+
"sync"
10+
"time"
11+
12+
// Packages
13+
types "github.com/mutablelogic/go-server/pkg/types"
14+
)
15+
16+
///////////////////////////////////////////////////////////////////////////////
17+
// TYPES
18+
19+
// TextStream implements a stream of text events
20+
type TextStream struct {
21+
wg sync.WaitGroup
22+
w io.Writer
23+
ch chan *textevent
24+
err error
25+
}
26+
27+
type textevent struct {
28+
name string
29+
data []any
30+
}
31+
32+
///////////////////////////////////////////////////////////////////////////////
33+
// GLOBALS
34+
35+
const (
36+
defaultKeepAlive = 10 * time.Second
37+
)
38+
39+
var (
40+
strPing = "ping"
41+
strEvent = []byte("event: ")
42+
strData = []byte("data: ")
43+
strNewline = []byte("\n")
44+
)
45+
46+
///////////////////////////////////////////////////////////////////////////////
47+
// LIFECYCLE
48+
49+
// Create a new text stream with mimetype text/event-stream
50+
// Additional header tuples can be provided as a series of key-value pairs
51+
func NewTextStream(w http.ResponseWriter, tuples ...string) *TextStream {
52+
// Check parameters
53+
if w == nil {
54+
return nil
55+
}
56+
if len(tuples)%2 != 0 {
57+
return nil
58+
}
59+
60+
// Create a text stream
61+
self := new(TextStream)
62+
self.w = w
63+
self.ch = make(chan *textevent)
64+
65+
// Set the default content type
66+
w.Header().Set(types.ContentTypeHeader, types.ContentTypeTextStream)
67+
68+
// Set additional headers
69+
for i := 0; i < len(tuples); i += 2 {
70+
w.Header().Set(tuples[i], tuples[i+1])
71+
}
72+
73+
// Write the response, don't know is this is the right one
74+
w.WriteHeader(http.StatusContinue)
75+
76+
// goroutine will write to the response writer until the channel is closed
77+
self.wg.Add(1)
78+
go func() {
79+
defer self.wg.Done()
80+
81+
// Create a ticker for ping messages
82+
ticker := time.NewTimer(100 * time.Millisecond)
83+
defer ticker.Stop()
84+
85+
// Run until the channel is closed
86+
for {
87+
select {
88+
case evt := <-self.ch:
89+
if evt == nil {
90+
return
91+
}
92+
self.emit(evt)
93+
ticker.Reset(defaultKeepAlive)
94+
case <-ticker.C:
95+
self.err = errors.Join(self.err, self.emit(&textevent{strPing, nil}))
96+
ticker.Reset(defaultKeepAlive)
97+
}
98+
}
99+
}()
100+
101+
// Return the textstream object
102+
return self
103+
}
104+
105+
// Close the text stream to stop sending ping messages
106+
func (s *TextStream) Close() error {
107+
// Close the channel
108+
close(s.ch)
109+
110+
// Wait for the goroutine to finish
111+
s.wg.Wait()
112+
113+
// Return any errors
114+
return s.err
115+
}
116+
117+
///////////////////////////////////////////////////////////////////////////////
118+
// PUBLIC METHODS
119+
120+
// Write a text event to the stream, and one or more optional data objects
121+
// which are encoded as JSON
122+
func (s *TextStream) Write(name string, data ...any) {
123+
s.ch <- &textevent{name, data}
124+
}
125+
126+
///////////////////////////////////////////////////////////////////////////////
127+
// PRIVATE METHODS
128+
129+
// emit an event to the stream
130+
func (s *TextStream) emit(e *textevent) error {
131+
var result error
132+
133+
// Write the event to the stream
134+
if e.name != "" {
135+
if err := s.write(strEvent, []byte(e.name), strNewline); err != nil {
136+
return err
137+
}
138+
}
139+
140+
// Write the data to the stream
141+
for _, v := range e.data {
142+
if v == nil {
143+
continue
144+
} else if data, err := json.Marshal(v); err != nil {
145+
result = errors.Join(result, err)
146+
} else if err := s.write(strData, data, strNewline); err != nil {
147+
result = errors.Join(result, err)
148+
}
149+
}
150+
151+
// Flush the event
152+
if result == nil {
153+
if err := s.write(strNewline); err != nil {
154+
result = errors.Join(result, err)
155+
}
156+
if w, ok := s.w.(http.Flusher); ok {
157+
w.Flush()
158+
}
159+
}
160+
161+
// Return any errors
162+
return result
163+
}
164+
165+
func (s *TextStream) write(v ...[]byte) error {
166+
if _, err := s.w.Write(bytes.Join(v, nil)); err != nil {
167+
return err
168+
}
169+
return nil
170+
}

pkg/httpresponse/textstream_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package httpresponse_test
2+
3+
import (
4+
"net/http/httptest"
5+
"testing"
6+
"time"
7+
8+
// Packages
9+
"github.com/mutablelogic/go-server/pkg/httpresponse"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_textstream_001(t *testing.T) {
14+
assert := assert.New(t)
15+
16+
t.Run("New", func(t *testing.T) {
17+
resp := httptest.NewRecorder()
18+
ts := httpresponse.NewTextStream(resp)
19+
assert.NotNil(ts)
20+
assert.NoError(ts.Close())
21+
})
22+
23+
t.Run("Ping", func(t *testing.T) {
24+
resp := httptest.NewRecorder()
25+
ts := httpresponse.NewTextStream(resp)
26+
assert.NotNil(ts)
27+
28+
time.Sleep(1 * time.Second)
29+
assert.NoError(ts.Close())
30+
assert.Equal(100, resp.Code)
31+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
32+
assert.Equal("event: ping\n\n", resp.Body.String())
33+
})
34+
35+
t.Run("EventDataAfterPing", func(t *testing.T) {
36+
resp := httptest.NewRecorder()
37+
ts := httpresponse.NewTextStream(resp)
38+
assert.NotNil(ts)
39+
40+
time.Sleep(200 * time.Millisecond)
41+
ts.Write("foo")
42+
43+
time.Sleep(1 * time.Second)
44+
assert.NoError(ts.Close())
45+
assert.Equal(100, resp.Code)
46+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
47+
assert.Equal("event: ping\n\n"+"event: foo\n\n", resp.Body.String())
48+
})
49+
50+
t.Run("EventDataNoPing", func(t *testing.T) {
51+
resp := httptest.NewRecorder()
52+
ts := httpresponse.NewTextStream(resp)
53+
assert.NotNil(ts)
54+
55+
ts.Write("foo", "bar")
56+
57+
time.Sleep(1 * time.Second)
58+
assert.NoError(ts.Close())
59+
assert.Equal(100, resp.Code)
60+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
61+
assert.Equal("event: foo\n"+"data: \"bar\"\n\n", resp.Body.String())
62+
})
63+
64+
t.Run("EventDataData", func(t *testing.T) {
65+
resp := httptest.NewRecorder()
66+
ts := httpresponse.NewTextStream(resp)
67+
assert.NotNil(ts)
68+
69+
ts.Write("foo", "bar1", "bar2")
70+
71+
time.Sleep(1 * time.Second)
72+
assert.NoError(ts.Close())
73+
assert.Equal(100, resp.Code)
74+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
75+
assert.Equal("event: foo\n"+"data: \"bar1\"\n"+"data: \"bar2\"\n\n", resp.Body.String())
76+
})
77+
}

pkg/types/mime.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ import (
88
// GLOBALS
99

1010
const (
11-
ContentTypeJSON = "application/json"
12-
ContentTypeXML = "application/xml"
13-
ContentTypeRSS = "application/rss+xml"
14-
ContentTypeBinary = "application/octet-stream"
15-
ContentTypeCSV = "text/csv"
16-
ContentTypeTextXml = "text/xml"
17-
ContentTypeTextPlain = "text/plain"
18-
ContentTypeFormData = "multipart/form-data"
19-
ContentTypeAny = "*/*"
11+
ContentTypeJSON = "application/json"
12+
ContentTypeXML = "application/xml"
13+
ContentTypeRSS = "application/rss+xml"
14+
ContentTypeBinary = "application/octet-stream"
15+
ContentTypeCSV = "text/csv"
16+
ContentTypeTextXml = "text/xml"
17+
ContentTypeTextPlain = "text/plain"
18+
ContentTypeTextStream = "text/event-stream"
19+
ContentTypeFormData = "multipart/form-data"
20+
ContentTypeAny = "*/*"
2021
)
2122

2223
///////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)