Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 42 additions & 27 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"unicode/utf16"
)

// Receiver defines the callback function to receive the emitted tokens,
// return true to continue parsing, false to stop.
type Receiver = func(*MetaValue) bool

// ValueType - defines the type of each JSON value
type ValueType int

Expand Down Expand Up @@ -73,7 +77,6 @@ type Decoder struct {

depth int
scratch *scratch
metaCh chan *MetaValue
err error

// follow line position to add context to errors
Expand All @@ -89,7 +92,6 @@ func NewDecoder(r io.Reader, emitDepth int) *Decoder {
scanner: newScanner(r),
emitDepth: emitDepth,
scratch: &scratch{data: make([]byte, 1024)},
metaCh: make(chan *MetaValue, 128),
}
if emitDepth < 0 {
d.emitDepth = 0
Expand Down Expand Up @@ -127,8 +129,15 @@ func (d *Decoder) Recursive() *Decoder {
// Stream begins decoding from the underlying reader and returns a
// streaming MetaValue channel for JSON values at the configured emitDepth.
func (d *Decoder) Stream() chan *MetaValue {
go d.decode()
return d.metaCh
metaCh := make(chan *MetaValue, 128)
go func() {
defer close(metaCh)
d.err = d.Decode(func(meta *MetaValue) bool {
metaCh <- meta
return true
})
}()
return metaCh
}

// Pos returns the number of bytes consumed from the underlying reader
Expand All @@ -138,32 +147,32 @@ func (d *Decoder) Pos() int { return int(d.pos) }
func (d *Decoder) Err() error { return d.err }

// Decode parses the JSON-encoded data and returns an interface value
func (d *Decoder) decode() {
defer close(d.metaCh)
func (d *Decoder) Decode(receiver Receiver) error {
d.skipSpaces()
for d.remaining() > 0 {
_, err := d.emitAny()
if err != nil {
d.err = err
break
if _, err := d.emitAny(receiver); err != nil {
return err
}
d.skipSpaces()
}
return nil
}

func (d *Decoder) emitAny() (interface{}, error) {
func (d *Decoder) emitAny(receiver Receiver) (interface{}, error) {
if d.pos >= atomic.LoadInt64(&d.end) {
return nil, d.mkError(ErrUnexpectedEOF)
}
offset := d.pos - 1
i, t, err := d.any()
i, t, err := d.any(receiver)
if d.willEmit() {
d.metaCh <- &MetaValue{
if !receiver(&MetaValue{
Offset: int(offset),
Length: int(d.pos - offset),
Depth: d.depth,
Value: i,
ValueType: t,
}) {
return nil, d.mkError(ErrCanceled)
}
}
return i, err
Expand All @@ -180,7 +189,7 @@ func (d *Decoder) willEmit() bool {

// any used to decode any valid JSON value, and returns an
// interface{} that holds the actual data
func (d *Decoder) any() (interface{}, ValueType, error) {
func (d *Decoder) any(receiver Receiver) (interface{}, ValueType, error) {
c := d.cur()

switch c {
Expand Down Expand Up @@ -224,15 +233,15 @@ func (d *Decoder) any() (interface{}, ValueType, error) {
}
return nil, Unknown, d.mkError(ErrSyntax, "in literal null")
case '[':
i, err := d.array()
i, err := d.array(receiver)
return i, Array, err
case '{':
var i interface{}
var err error
if d.objectAsKVS {
i, err = d.objectOrdered()
i, err = d.objectOrdered(receiver)
} else {
i, err = d.object()
i, err = d.object(receiver)
}
return i, Object, err
default:
Expand Down Expand Up @@ -415,7 +424,7 @@ func (d *Decoder) number() (float64, error) {
}

// array accept valid JSON array value
func (d *Decoder) array() ([]interface{}, error) {
func (d *Decoder) array(receiver Receiver) ([]interface{}, error) {
d.depth++

var (
Expand All @@ -431,7 +440,7 @@ func (d *Decoder) array() ([]interface{}, error) {
}

scan:
if v, err = d.emitAny(); err != nil {
if v, err = d.emitAny(receiver); err != nil {
goto out
}

Expand All @@ -456,7 +465,7 @@ out:
}

// object accept valid JSON array value
func (d *Decoder) object() (map[string]interface{}, error) {
func (d *Decoder) object(receiver Receiver) (map[string]interface{}, error) {
d.depth++

var (
Expand Down Expand Up @@ -500,20 +509,23 @@ scan:
// read value
d.skipSpaces()
if d.emitKV {
if v, t, err = d.any(); err != nil {
if v, t, err = d.any(receiver); err != nil {
break
}
if d.willEmit() {
d.metaCh <- &MetaValue{
if !receiver(&MetaValue{
Offset: int(offset),
Length: int(d.pos - offset),
Depth: d.depth,
Value: KV{k, v},
ValueType: t,
}) {
err = d.mkError(ErrCanceled)
break
}
}
} else {
if v, err = d.emitAny(); err != nil {
if v, err = d.emitAny(receiver); err != nil {
break
}
}
Expand Down Expand Up @@ -541,7 +553,7 @@ out:
}

// object (ordered) accept valid JSON array value
func (d *Decoder) objectOrdered() (KVS, error) {
func (d *Decoder) objectOrdered(receiver Receiver) (KVS, error) {
d.depth++

var (
Expand Down Expand Up @@ -585,20 +597,23 @@ scan:
// read value
d.skipSpaces()
if d.emitKV {
if v, t, err = d.any(); err != nil {
if v, t, err = d.any(receiver); err != nil {
break
}
if d.willEmit() {
d.metaCh <- &MetaValue{
if !receiver(&MetaValue{
Offset: int(offset),
Length: int(d.pos - offset),
Depth: d.depth,
Value: KV{k, v},
ValueType: t,
}) {
err = d.mkError(ErrCanceled)
break
}
}
} else {
if v, err = d.emitAny(); err != nil {
if v, err = d.emitAny(receiver); err != nil {
break
}
}
Expand Down
39 changes: 39 additions & 0 deletions decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jstream

import (
"bytes"
"strings"
"testing"
)

Expand Down Expand Up @@ -235,3 +236,41 @@ func TestDecoderReaderFailure(t *testing.T) {
t.Fatalf("missing expected underlying reader error")
}
}

func TestDecoderCallback(t *testing.T) {
var result []*MetaValue
body := `{ "bio": "bada bing bada boom", "id": 1, "name": "Charles" }
{ "bio": "bada bing bada boom", "id": 2, "name": "Charles" }
{ "bio": "bada bing bada boom", "id": 3, "name": "Charles" }
{ "bio": "bada bing bada boom", "id": 4, "name": "Charles" }
{ "bio": "bada bing bada boom", "id": 5, "name": "Charles" }
`

// receive all
result = result[:0]
decoder := NewDecoder(mkReader(body), 0)
decoder.Decode(func(meta *MetaValue) bool {
result = append(result, meta)
return true
})
// assert result
if len(result) != 5 {
t.Fatalf("expected 5 items, got %d", len(result))
}

// receive 2
result = result[:0]
decoder = NewDecoder(mkReader(body), 0)
err := decoder.Decode(func(meta *MetaValue) bool {
result = append(result, meta)
return len(result) < 2
})
// assert result
if len(result) != 2 {
t.Fatalf("expected 2 items, got %d", len(result))
}
// assert cancel error
if err == nil || !strings.Contains(err.Error(), "operation canceled") {
t.Fatalf("expected cancel error, got %s", err)
}
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
var (
ErrSyntax = DecoderError{msg: "invalid character"}
ErrUnexpectedEOF = DecoderError{msg: "unexpected end of JSON input"}
ErrCanceled = DecoderError{msg: "operation canceled"}
)

type errPos [2]int // line number, byte offset where error occurred
Expand Down