Browse Source

codec: json: use sync.Pool for decoding buffers and optimize DecodeBytes

The buffer used for decoding may be small in some cases.

We now leverage sync.Pool for buffers: we grab a buffer of the size
we need, and return it to the pool when done.

Also, when decoding bytes written as base64, we do not expect a
quotation mark (") within the encoded text, so we simply scan for the
ending " and return the bytes read up to it as the base64 payload from the stream.
Ugorji Nwoke 6 years ago
parent
commit
eee1bf808f
4 changed files with 122 additions and 45 deletions
  1. 1 0
      codec/encode.go
  2. 50 14
      codec/helper.go
  3. 70 31
      codec/json.go
  4. 1 0
      codec/writer.go

+ 1 - 0
codec/encode.go

@@ -1053,6 +1053,7 @@ func (e *Encoder) mustEncode(v interface{}) {
 
 
 	if e.wf.buf == nil {
 	if e.wf.buf == nil {
 		e.wf.buf = e.wf.bytesBufPooler.get(e.wf.sz)
 		e.wf.buf = e.wf.bytesBufPooler.get(e.wf.sz)
+		e.wf.buf = e.wf.buf[:cap(e.wf.buf)]
 	}
 	}
 	e.wf.calls++
 	e.wf.calls++
 	e.encode(v)
 	e.encode(v)

+ 50 - 14
codec/helper.go

@@ -2579,7 +2579,7 @@ type pooler struct {
 
 
 	// lifetime-scoped pooled resources
 	// lifetime-scoped pooled resources
 	// dn                                 sync.Pool // for decNaked
 	// dn                                 sync.Pool // for decNaked
-	buf1k, buf2k, buf4k, buf8k, buf16k, buf32k, buf64k sync.Pool // for [N]byte
+	buf256, buf1k, buf2k, buf4k, buf8k, buf16k, buf32k sync.Pool // for [N]byte
 
 
 	mapStrU16, mapU16Str, mapU16Bytes sync.Pool // for Binc
 	mapStrU16, mapU16Str, mapU16Bytes sync.Pool // for Binc
 	// mapU16StrBytes sync.Pool // for Binc
 	// mapU16StrBytes sync.Pool // for Binc
@@ -2596,13 +2596,14 @@ func (p *pooler) init() {
 
 
 	// p.dn.New = func() interface{} { x := new(decNaked); x.init(); return x }
 	// p.dn.New = func() interface{} { x := new(decNaked); x.init(); return x }
 
 
+	p.buf256.New = func() interface{} { return new([256]byte) }
 	p.buf1k.New = func() interface{} { return new([1 * 1024]byte) }
 	p.buf1k.New = func() interface{} { return new([1 * 1024]byte) }
 	p.buf2k.New = func() interface{} { return new([2 * 1024]byte) }
 	p.buf2k.New = func() interface{} { return new([2 * 1024]byte) }
 	p.buf4k.New = func() interface{} { return new([4 * 1024]byte) }
 	p.buf4k.New = func() interface{} { return new([4 * 1024]byte) }
 	p.buf8k.New = func() interface{} { return new([8 * 1024]byte) }
 	p.buf8k.New = func() interface{} { return new([8 * 1024]byte) }
 	p.buf16k.New = func() interface{} { return new([16 * 1024]byte) }
 	p.buf16k.New = func() interface{} { return new([16 * 1024]byte) }
 	p.buf32k.New = func() interface{} { return new([32 * 1024]byte) }
 	p.buf32k.New = func() interface{} { return new([32 * 1024]byte) }
-	p.buf64k.New = func() interface{} { return new([64 * 1024]byte) }
+	// p.buf64k.New = func() interface{} { return new([64 * 1024]byte) }
 
 
 	p.mapStrU16.New = func() interface{} { return make(map[string]uint16, 16) }
 	p.mapStrU16.New = func() interface{} { return make(map[string]uint16, 16) }
 	p.mapU16Str.New = func() interface{} { return make(map[uint16]string, 16) }
 	p.mapU16Str.New = func() interface{} { return make(map[uint16]string, 16) }
@@ -2754,8 +2755,40 @@ func (z *bytesBufPooler) end() {
 }
 }
 
 
 func (z *bytesBufPooler) get(bufsize int) (buf []byte) {
 func (z *bytesBufPooler) get(bufsize int) (buf []byte) {
-	// ensure an end is called first (if necessary)
 	if z.pool != nil {
 	if z.pool != nil {
+		switch z.pool {
+		case &pool.buf256:
+			if bufsize <= 256 {
+				buf = z.poolbuf.(*[256]byte)[:bufsize]
+			}
+		case &pool.buf1k:
+			if bufsize <= 1*1024 {
+				buf = z.poolbuf.(*[1 * 1024]byte)[:bufsize]
+			}
+		case &pool.buf2k:
+			if bufsize <= 2*1024 {
+				buf = z.poolbuf.(*[2 * 1024]byte)[:bufsize]
+			}
+		case &pool.buf4k:
+			if bufsize <= 4*1024 {
+				buf = z.poolbuf.(*[4 * 1024]byte)[:bufsize]
+			}
+		case &pool.buf8k:
+			if bufsize <= 8*1024 {
+				buf = z.poolbuf.(*[8 * 1024]byte)[:bufsize]
+			}
+		case &pool.buf16k:
+			if bufsize <= 16*1024 {
+				buf = z.poolbuf.(*[16 * 1024]byte)[:bufsize]
+			}
+		case &pool.buf32k:
+			if bufsize <= 32*1024 {
+				buf = z.poolbuf.(*[32 * 1024]byte)[:bufsize]
+			}
+		}
+		if buf != nil {
+			return
+		}
 		z.pool.Put(z.poolbuf)
 		z.pool.Put(z.poolbuf)
 		z.pool, z.poolbuf = nil, nil
 		z.pool, z.poolbuf = nil, nil
 	}
 	}
@@ -2794,27 +2827,30 @@ func (z *bytesBufPooler) get(bufsize int) (buf []byte) {
 	// }
 	// }
 	// return
 	// return
 
 
-	if bufsize <= 1*1024 {
+	if bufsize <= 256 {
+		z.pool, z.poolbuf = &pool.buf256, pool.buf256.Get() // pool.bytes1k()
+		buf = z.poolbuf.(*[256]byte)[:bufsize]
+	} else if bufsize <= 1*1024 {
 		z.pool, z.poolbuf = &pool.buf1k, pool.buf1k.Get() // pool.bytes1k()
 		z.pool, z.poolbuf = &pool.buf1k, pool.buf1k.Get() // pool.bytes1k()
-		buf = z.poolbuf.(*[1 * 1024]byte)[:]
+		buf = z.poolbuf.(*[1 * 1024]byte)[:bufsize]
 	} else if bufsize <= 2*1024 {
 	} else if bufsize <= 2*1024 {
 		z.pool, z.poolbuf = &pool.buf2k, pool.buf2k.Get() // pool.bytes2k()
 		z.pool, z.poolbuf = &pool.buf2k, pool.buf2k.Get() // pool.bytes2k()
-		buf = z.poolbuf.(*[2 * 1024]byte)[:]
+		buf = z.poolbuf.(*[2 * 1024]byte)[:bufsize]
 	} else if bufsize <= 4*1024 {
 	} else if bufsize <= 4*1024 {
 		z.pool, z.poolbuf = &pool.buf4k, pool.buf4k.Get() // pool.bytes4k()
 		z.pool, z.poolbuf = &pool.buf4k, pool.buf4k.Get() // pool.bytes4k()
-		buf = z.poolbuf.(*[4 * 1024]byte)[:]
+		buf = z.poolbuf.(*[4 * 1024]byte)[:bufsize]
 	} else if bufsize <= 8*1024 {
 	} else if bufsize <= 8*1024 {
 		z.pool, z.poolbuf = &pool.buf8k, pool.buf8k.Get() // pool.bytes8k()
 		z.pool, z.poolbuf = &pool.buf8k, pool.buf8k.Get() // pool.bytes8k()
-		buf = z.poolbuf.(*[8 * 1024]byte)[:]
+		buf = z.poolbuf.(*[8 * 1024]byte)[:bufsize]
 	} else if bufsize <= 16*1024 {
 	} else if bufsize <= 16*1024 {
 		z.pool, z.poolbuf = &pool.buf16k, pool.buf16k.Get() // pool.bytes16k()
 		z.pool, z.poolbuf = &pool.buf16k, pool.buf16k.Get() // pool.bytes16k()
-		buf = z.poolbuf.(*[16 * 1024]byte)[:]
-	} else if bufsize <= 32*1024 {
+		buf = z.poolbuf.(*[16 * 1024]byte)[:bufsize]
+	} else { // if bufsize <= 32*1024 {
 		z.pool, z.poolbuf = &pool.buf32k, pool.buf32k.Get() // pool.bytes32k()
 		z.pool, z.poolbuf = &pool.buf32k, pool.buf32k.Get() // pool.bytes32k()
-		buf = z.poolbuf.(*[32 * 1024]byte)[:]
-	} else {
-		z.pool, z.poolbuf = &pool.buf64k, pool.buf64k.Get() // pool.bytes64k()
-		buf = z.poolbuf.(*[64 * 1024]byte)[:]
+		buf = z.poolbuf.(*[32 * 1024]byte)[:32*1024]
+		// } else {
+		// 	z.pool, z.poolbuf = &pool.buf64k, pool.buf64k.Get() // pool.bytes64k()
+		// 	buf = z.poolbuf.(*[64 * 1024]byte)[:]
 	}
 	}
 	return
 	return
 }
 }

+ 70 - 31
codec/json.go

@@ -70,7 +70,7 @@ const (
 	jsonU4Chk1 = 'a' - 10
 	jsonU4Chk1 = 'a' - 10
 	jsonU4Chk0 = 'A' - 10
 	jsonU4Chk0 = 'A' - 10
 
 
-	jsonScratchArrayLen = cacheLineSize // 64
+	// jsonScratchArrayLen = cacheLineSize + 32 // 96
 )
 )
 
 
 const (
 const (
@@ -366,7 +366,7 @@ type jsonEncDriver struct {
 
 
 	s *bitset256 // safe set for characters (taking h.HTMLAsIs into consideration)
 	s *bitset256 // safe set for characters (taking h.HTMLAsIs into consideration)
 	// scratch: encode time, numbers, etc. Note: leave 1 byte for containerState
 	// scratch: encode time, numbers, etc. Note: leave 1 byte for containerState
-	b [jsonScratchArrayLen - 16]byte // leave space for bs(len,cap), containerState
+	b [cacheLineSize - 16]byte // leave space for bs(len,cap), containerState
 }
 }
 
 
 // Keep writeIndent, WriteArrayElem, WriteMapElemKey, WriteMapElemValue
 // Keep writeIndent, WriteArrayElem, WriteMapElemKey, WriteMapElemValue
@@ -591,23 +591,28 @@ func (e *jsonEncDriver) atEndOfEncode() {
 
 
 type jsonDecDriver struct {
 type jsonDecDriver struct {
 	noBuiltInTypes
 	noBuiltInTypes
-	d  *Decoder
-	h  *JsonHandle
-	r  *decReaderSwitch
-	se interfaceExtWrapper
+	d *Decoder
+	h *JsonHandle
+	r *decReaderSwitch
 
 
-	bs []byte // scratch, initialized from b. For parsing strings or numbers.
-	// ---- writable fields during execution --- *try* to keep in sep cache line
+	tok  uint8   // used to store the token read right after skipWhiteSpace
+	fnil bool    // found null
+	_    [2]byte // padding
+	bstr [4]byte // scratch used for string \UXXX parsing
+	// c     containerState
 
 
+	// ---- cpu cache line boundary (half - way)
+	// b [jsonScratchArrayLen]byte // scratch 1, used for parsing strings or numbers or time.Time
 	// ---- cpu cache line boundary?
 	// ---- cpu cache line boundary?
-	b [jsonScratchArrayLen]byte // scratch 1, used for parsing strings or numbers or time.Time
+	// ---- writable fields during execution --- *try* to keep in sep cache line
+	bs []byte // scratch - for parsing strings, bytes
+	se interfaceExtWrapper
+
+	bp bytesBufPooler
+
 	// ---- cpu cache line boundary?
 	// ---- cpu cache line boundary?
-	// c     containerState
-	tok  uint8                         // used to store the token read right after skipWhiteSpace
-	fnil bool                          // found null
-	_    [2]byte                       // padding
-	bstr [4]byte                       // scratch used for string \UXXX parsing
-	b2   [jsonScratchArrayLen - 8]byte // scratch 2, used only for readUntil, decNumBytes
+
+	b2 [cacheLineSize + 32]byte // scratch 2, used only for readUntil, decNumBytes
 
 
 	// _ [3]uint64 // padding
 	// _ [3]uint64 // padding
 	// n jsonNum
 	// n jsonNum
@@ -834,7 +839,6 @@ func (d *jsonDecDriver) ContainerType() (vt valueType) {
 }
 }
 
 
 func (d *jsonDecDriver) decNumBytes() (bs []byte) {
 func (d *jsonDecDriver) decNumBytes() (bs []byte) {
-	// stores num bytes in d.bs
 	d.advance()
 	d.advance()
 	if d.tok == '"' {
 	if d.tok == '"' {
 		bs = d.r.readUntil(d.b2[:0], '"')
 		bs = d.r.readUntil(d.b2[:0], '"')
@@ -843,7 +847,7 @@ func (d *jsonDecDriver) decNumBytes() (bs []byte) {
 		d.readLit4Null()
 		d.readLit4Null()
 	} else {
 	} else {
 		d.r.unreadn1()
 		d.r.unreadn1()
-		bs = d.r.readTo(d.bs[:0], &jsonNumSet)
+		bs = d.r.readTo(d.b2[:0], &jsonNumSet)
 	}
 	}
 	d.tok = 0
 	d.tok = 0
 	return
 	return
@@ -1000,29 +1004,56 @@ func (d *jsonDecDriver) DecodeBytes(bs []byte, zerocopy bool) (bsOut []byte) {
 		d.tok = 0
 		d.tok = 0
 		return bs
 		return bs
 	}
 	}
-	d.appendStringAsBytes()
+
 	// base64 encodes []byte{} as "", and we encode nil []byte as null.
 	// base64 encodes []byte{} as "", and we encode nil []byte as null.
 	// Consequently, base64 should decode null as a nil []byte, and "" as an empty []byte{}.
 	// Consequently, base64 should decode null as a nil []byte, and "" as an empty []byte{}.
 	// appendStringAsBytes returns a zero-len slice for both, so as not to reset d.bs.
 	// appendStringAsBytes returns a zero-len slice for both, so as not to reset d.bs.
 	// However, it sets a fnil field to true, so we can check if a null was found.
 	// However, it sets a fnil field to true, so we can check if a null was found.
-	if d.fnil {
+
+	// d.appendStringAsBytes()
+	// if d.fnil {
+	// 	return nil
+	// }
+
+	if d.tok == 'n' {
+		d.readLit4Null()
 		return nil
 		return nil
 	}
 	}
-	if len(d.bs) == 0 {
+
+	if d.tok != '"' {
+		d.d.errorf("json bytes MUST be a base64 encoded string")
+		return
+	}
+
+	bs1 := d.r.readUntil(d.b2[:0], '"')
+	bs1 = bs1[:len(bs1)-1]
+	d.tok = 0
+
+	if len(bs1) == 0 {
 		return []byte{}
 		return []byte{}
 	}
 	}
-	bs0 := d.bs
-	slen := base64.StdEncoding.DecodedLen(len(bs0))
+
+	slen := base64.StdEncoding.DecodedLen(len(bs1))
+	// TODO: what if slen == 0?
 	if slen <= cap(bs) {
 	if slen <= cap(bs) {
 		bsOut = bs[:slen]
 		bsOut = bs[:slen]
-	} else if zerocopy && slen <= cap(d.b2) {
-		bsOut = d.b2[:slen]
+	} else if zerocopy {
+		// if d.bs == nil {
+		// 	d.bs = d.bp.get(slen)
+		// }
+		if slen <= cap(d.bs) {
+			bsOut = d.bs[:slen]
+		} else {
+			d.bs = d.bp.get(slen)
+			bsOut = d.bs
+			// bsOut = make([]byte, slen) // TODO: should i check pool? how to return it back?
+		}
 	} else {
 	} else {
 		bsOut = make([]byte, slen)
 		bsOut = make([]byte, slen)
 	}
 	}
-	slen2, err := base64.StdEncoding.Decode(bsOut, bs0)
+	slen2, err := base64.StdEncoding.Decode(bsOut, bs1)
 	if err != nil {
 	if err != nil {
-		d.d.errorf("error decoding base64 binary '%s': %v", bs0, err)
+		d.d.errorf("error decoding base64 binary '%s': %v", bs1, err)
 		return nil
 		return nil
 	}
 	}
 	if slen != slen2 {
 	if slen != slen2 {
@@ -1047,7 +1078,9 @@ func (d *jsonDecDriver) DecodeStringAsBytes() (s []byte) {
 
 
 func (d *jsonDecDriver) appendStringAsBytes() {
 func (d *jsonDecDriver) appendStringAsBytes() {
 	d.advance()
 	d.advance()
-
+	if d.bs == nil {
+		d.bs = d.bp.get(256)
+	}
 	// xdebug2f("appendStringAsBytes: found: '%c'", d.tok)
 	// xdebug2f("appendStringAsBytes: found: '%c'", d.tok)
 	if d.tok != '"' {
 	if d.tok != '"' {
 		// d.d.errorf("expect char '%c' but got char '%c'", '"', d.tok)
 		// d.d.errorf("expect char '%c' but got char '%c'", '"', d.tok)
@@ -1236,7 +1269,7 @@ F:
 
 
 func (d *jsonDecDriver) bsToString() string {
 func (d *jsonDecDriver) bsToString() string {
 	// if x := d.s.sc; x != nil && x.so && x.st == '}' { // map key
 	// if x := d.s.sc; x != nil && x.so && x.st == '}' { // map key
-	if jsonAlwaysReturnInternString || d.d.c == containerMapKey {
+	if d.d.is != nil && (jsonAlwaysReturnInternString || d.d.c == containerMapKey) {
 		return d.d.string(d.bs)
 		return d.d.string(d.bs)
 	}
 	}
 	return string(d.bs)
 	return string(d.bs)
@@ -1411,7 +1444,8 @@ func (h *JsonHandle) newEncDriver(e *Encoder) (ee encDriver) {
 		ee = &v
 		ee = &v
 		hd = &v.jsonEncDriver
 		hd = &v.jsonEncDriver
 	}
 	}
-	hd.e, hd.h, hd.bs = e, h, hd.b[:0]
+	hd.e, hd.h = e, h
+	// hd.bs = hd.b[:0]
 	ee.reset()
 	ee.reset()
 	return
 	return
 }
 }
@@ -1419,7 +1453,7 @@ func (h *JsonHandle) newEncDriver(e *Encoder) (ee encDriver) {
 func (h *JsonHandle) newDecDriver(d *Decoder) decDriver {
 func (h *JsonHandle) newDecDriver(d *Decoder) decDriver {
 	// d := jsonDecDriver{r: r.(*bytesDecReader), h: h}
 	// d := jsonDecDriver{r: r.(*bytesDecReader), h: h}
 	hd := jsonDecDriver{d: d, h: h}
 	hd := jsonDecDriver{d: d, h: h}
-	hd.bs = hd.b[:0]
+	// hd.bs = hd.b[:0]
 	hd.reset()
 	hd.reset()
 	return &hd
 	return &hd
 }
 }
@@ -1451,7 +1485,12 @@ func (d *jsonDecDriver) reset() {
 	// d.n.reset()
 	// d.n.reset()
 }
 }
 
 
-func (d *jsonDecDriver) atEndOfDecode() {}
+func (d *jsonDecDriver) atEndOfDecode() {
+	if d.bs != nil {
+		d.bs = nil
+		d.bp.end()
+	}
+}
 
 
 // jsonFloatStrconvFmtPrec ...
 // jsonFloatStrconvFmtPrec ...
 //
 //

+ 1 - 0
codec/writer.go

@@ -164,6 +164,7 @@ func (z *bufioEncWriter) reset(w io.Writer, bufsize int) {
 		z.buf = z.b[:]
 		z.buf = z.b[:]
 	} else {
 	} else {
 		z.buf = z.bytesBufPooler.get(bufsize)
 		z.buf = z.bytesBufPooler.get(bufsize)
+		z.buf = z.buf[:cap(z.buf)]
 		// z.buf = make([]byte, bufsize)
 		// z.buf = make([]byte, bufsize)
 	}
 	}
 }
 }