Browse Source

codec: Fixes and clarifications for binc and rpc, in prep for c library.

- binc support for pruned values fixed.
- Added closed flag to rpcCodec, and use buffered reader for tracking custom header.
- Added simple codec - a much simplified version of binc (mostly academic).
- Built c implementation of binc support (for use in c, python, etc).
Ugorji Nwoke 12 years ago
parent
commit
cdeae7b766
10 changed files with 606 additions and 152 deletions
  1. 37 83
      codec/binc.go
  2. 18 17
      codec/codecs_test.go
  3. 16 14
      codec/decode.go
  4. 3 2
      codec/ext_dep_test.go
  5. 18 1
      codec/helper.go
  6. 24 19
      codec/helper_internal.go
  7. 19 14
      codec/msgpack.go
  8. 18 0
      codec/rpc.go
  9. 451 0
      codec/simple.go
  10. 2 2
      codec/time.go

+ 37 - 83
codec/binc.go

@@ -11,6 +11,8 @@ import (
 	//"fmt"
 	//"fmt"
 )
 )
 
 
+const bincDoPrune = true // No longer needed. Needed before as C lib did not support pruning.
+
 //var _ = fmt.Printf
 //var _ = fmt.Printf
 
 
 // vd as low 4 bits (there are 16 slots)
 // vd as low 4 bits (there are 16 slots)
@@ -104,52 +106,53 @@ func (e *bincEncDriver) encodeFloat64(f float64) {
 		return
 		return
 	}
 	}
 	bigen.PutUint64(e.b[:], math.Float64bits(f))
 	bigen.PutUint64(e.b[:], math.Float64bits(f))
-	var i int = 7
-	for ; i >= 0 && (e.b[i] == 0); i-- {
-	}
-	i++
-	if i > 6 { // 7 or 8 ie < 2 trailing zeros
-		e.w.writen1(bincVdFloat<<4 | bincFlBin64)
-		e.w.writeb(e.b[:])
-	} else {
-		e.w.writen1(bincVdFloat<<4 | 0x8 | bincFlBin64)
-		e.w.writen1(byte(i))
-		e.w.writeb(e.b[:i])
+	if bincDoPrune {
+		i := 7
+		for ; i >= 0 && (e.b[i] == 0); i-- {
+		}
+		i++
+		if i <= 6 {
+			e.w.writen1(bincVdFloat<<4 | 0x8 | bincFlBin64)
+			e.w.writen1(byte(i))
+			e.w.writeb(e.b[:i])
+			return
+		}
 	}
 	}
+	e.w.writen1(bincVdFloat<<4 | bincFlBin64)
+	e.w.writeb(e.b[:])
 }
 }
 
 
-func (e *bincEncDriver) encInteger4(bd byte, v uint32) {
-	const lim int = 4
-	eb := e.b[:lim]
-	bigen.PutUint32(eb, v)
-	i := pruneSignExt(eb)
-	e.w.writen1(bd | byte(lim-1-i))
-	e.w.writeb(e.b[i:lim])
-}
-
-func (e *bincEncDriver) encInteger8(bd byte, v uint64) {
-	const lim int = 8
+func (e *bincEncDriver) encIntegerPrune(bd byte, pos bool, v uint64, lim uint8) {
 	eb := e.b[:lim]
 	eb := e.b[:lim]
-	bigen.PutUint64(eb, v)
-	i := pruneSignExt(eb)
-	e.w.writen1(bd | byte(lim-1-i))
-	e.w.writeb(e.b[i:lim])
+	if lim == 4 {
+		bigen.PutUint32(eb, uint32(v))
+	} else {
+		bigen.PutUint64(eb, v)
+	}
+	if bincDoPrune {
+		i := pruneSignExt(eb, pos)
+		e.w.writen1(bd | lim - 1 - byte(i))
+		e.w.writeb(e.b[i:lim])
+	} else {
+		e.w.writen1(bd | lim - 1)
+		e.w.writeb(e.b[:lim])
+	}
 }
 }
 
 
 func (e *bincEncDriver) encodeInt(v int64) {
 func (e *bincEncDriver) encodeInt(v int64) {
 	const nbd byte = bincVdNegInt << 4
 	const nbd byte = bincVdNegInt << 4
 	switch {
 	switch {
 	case v >= 0:
 	case v >= 0:
-		e.encUint(bincVdPosInt << 4, true, uint64(v))
+		e.encUint(bincVdPosInt<<4, true, uint64(v))
 	case v == -1:
 	case v == -1:
 		e.w.writen1(bincVdSpecial<<4 | bincSpNegOne)
 		e.w.writen1(bincVdSpecial<<4 | bincSpNegOne)
 	default:
 	default:
-		e.encUint(bincVdNegInt << 4, false, uint64(-v))
+		e.encUint(bincVdNegInt<<4, false, uint64(-v))
 	}
 	}
 }
 }
 
 
 func (e *bincEncDriver) encodeUint(v uint64) {
 func (e *bincEncDriver) encodeUint(v uint64) {
-	e.encUint(bincVdPosInt << 4, true, v)
+	e.encUint(bincVdPosInt<<4, true, v)
 }
 }
 
 
 func (e *bincEncDriver) encUint(bd byte, pos bool, v uint64) {
 func (e *bincEncDriver) encUint(bd byte, pos bool, v uint64) {
@@ -164,9 +167,9 @@ func (e *bincEncDriver) encUint(bd byte, pos bool, v uint64) {
 		e.w.writen1(bd | 0x01)
 		e.w.writen1(bd | 0x01)
 		e.w.writeUint16(uint16(v))
 		e.w.writeUint16(uint16(v))
 	case v <= math.MaxUint32:
 	case v <= math.MaxUint32:
-		e.encInteger4(bd, uint32(v))
+		e.encIntegerPrune(bd, pos, v, 4)
 	default:
 	default:
-		e.encInteger8(bd, v)
+		e.encIntegerPrune(bd, pos, v, 8)
 	}
 	}
 }
 }
 
 
@@ -428,45 +431,6 @@ func (d *bincDecDriver) decFloat() (f float64) {
 	return
 	return
 }
 }
 
 
-// func (d *bincDecDriver) decInt() (v int64) {
-// 	// need to inline the code (interface conversion and type assertion expensive)
-// 	switch d.vs {
-// 	case 0:
-// 		v = int64(int8(d.r.readn1()))
-// 	case 1:
-// 		d.r.readb(d.b[6:])
-// 		v = int64(int16(bigen.Uint16(d.b[6:])))
-// 	case 2:
-// 		d.r.readb(d.b[5:])
-// 		if d.b[5]&0x80 == 0 {
-// 			d.b[4] = 0
-// 		} else {
-// 			d.b[4] = 0xff
-// 		}
-// 		v = int64(int32(bigen.Uint32(d.b[4:])))
-// 	case 3:
-// 		d.r.readb(d.b[4:])
-// 		v = int64(int32(bigen.Uint32(d.b[4:])))
-// 	case 4, 5, 6:
-// 		lim := int(7 - d.vs)
-// 		d.r.readb(d.b[lim:])
-// 		var fillval byte = 0
-// 		if d.b[lim]&0x80 != 0 {
-// 			fillval = 0xff
-// 		}
-// 		for i := 0; i < lim; i++ {
-// 			d.b[i] = fillval
-// 		}
-// 		v = int64(bigen.Uint64(d.b[:]))
-// 	case 7:
-// 		d.r.readb(d.b[:])
-// 		v = int64(bigen.Uint64(d.b[:]))
-// 	default:
-// 		decErr("integers with greater than 64 bits of precision not supported")
-// 	}
-// 	return
-// }
-
 func (d *bincDecDriver) decUint() (v uint64) {
 func (d *bincDecDriver) decUint() (v uint64) {
 	// need to inline the code (interface conversion and type assertion expensive)
 	// need to inline the code (interface conversion and type assertion expensive)
 	switch d.vs {
 	switch d.vs {
@@ -529,12 +493,7 @@ func (d *bincDecDriver) decIntAny() (ui uint64, i int64, neg bool) {
 
 
 func (d *bincDecDriver) decodeInt(bitsize uint8) (i int64) {
 func (d *bincDecDriver) decodeInt(bitsize uint8) (i int64) {
 	_, i, _ = d.decIntAny()
 	_, i, _ = d.decIntAny()
-	// check overflow (logic adapted from std pkg reflect/value.go OverflowUint()
-	if bitsize > 0 {
-		if trunc := (i << (64 - bitsize)) >> (64 - bitsize); i != trunc {
-			decErr("Overflow int value: %v", i)
-		}
-	}
+	checkOverflow(0, i, bitsize)
 	d.bdRead = false
 	d.bdRead = false
 	return
 	return
 }
 }
@@ -544,12 +503,7 @@ func (d *bincDecDriver) decodeUint(bitsize uint8) (ui uint64) {
 	if neg {
 	if neg {
 		decErr("Assigning negative signed value: %v, to unsigned type", i)
 		decErr("Assigning negative signed value: %v, to unsigned type", i)
 	}
 	}
-	// check overflow (logic adapted from std pkg reflect/value.go OverflowUint()
-	if bitsize > 0 {
-		if trunc := (ui << (64 - bitsize)) >> (64 - bitsize); ui != trunc {
-			decErr("Overflow uint value: %v", ui)
-		}
-	}
+	checkOverflow(ui, 0, bitsize)
 	d.bdRead = false
 	d.bdRead = false
 	return
 	return
 }
 }
@@ -759,7 +713,7 @@ func (d *bincDecDriver) decodeNaked() (v interface{}, vt valueType, decodeFurthe
 		}
 		}
 	case bincVdSmallInt:
 	case bincVdSmallInt:
 		vt = valueTypeUint
 		vt = valueTypeUint
-		v = int64(int8(d.vs)) + 1 // int8(d.vs) + 1
+		v = uint64(int8(d.vs)) + 1 // int8(d.vs) + 1
 	case bincVdPosInt:
 	case bincVdPosInt:
 		vt = valueTypeUint
 		vt = valueTypeUint
 		v = d.decUint()
 		v = d.decUint()

+ 18 - 17
codec/codecs_test.go

@@ -345,9 +345,9 @@ func testInit() {
 			"SHORT STRING": "1234567890",
 			"SHORT STRING": "1234567890",
 		},
 		},
 		map[interface{}]interface{}{
 		map[interface{}]interface{}{
-			true:     "true",
+			true:       "true",
 			uint8(138): false,
 			uint8(138): false,
-			"false":  uint8(200),
+			"false":    uint8(200),
 		},
 		},
 		newTestStruc(0, false),
 		newTestStruc(0, false),
 	}
 	}
@@ -433,7 +433,7 @@ func testUnmarshalErr(v interface{}, data []byte, h Handle, t *testing.T, name s
 		logT(t, "Error Decoding into %s: %v, Err: %v", name, v, err)
 		logT(t, "Error Decoding into %s: %v, Err: %v", name, v, err)
 		t.FailNow()
 		t.FailNow()
 	}
 	}
-	return   
+	return
 }
 }
 
 
 func newTestStruc(depth int, bench bool) (ts *TestStruc) {
 func newTestStruc(depth int, bench bool) (ts *TestStruc) {
@@ -532,7 +532,6 @@ func doTestCodecTableOne(t *testing.T, testNil bool, h Handle,
 		}
 		}
 
 
 		logT(t, "         v1 returned: %T, %#v", v1, v1)
 		logT(t, "         v1 returned: %T, %#v", v1, v1)
-		// t.FailNow() //todo: ugorji: remove
 		// if v1 != nil {
 		// if v1 != nil {
 		//	logT(t, "         v1 returned: %T, %#v", v1, v1)
 		//	logT(t, "         v1 returned: %T, %#v", v1, v1)
 		//	//we always indirect, because ptr to typed value may be passed (if not testNil)
 		//	//we always indirect, because ptr to typed value may be passed (if not testNil)
@@ -666,7 +665,7 @@ func testCodecMiscOne(t *testing.T, h Handle) {
 		logT(t, "Not Equal: %v. m: %v, m2: %v", err, m, m2)
 		logT(t, "Not Equal: %v. m: %v, m2: %v", err, m, m2)
 		t.FailNow()
 		t.FailNow()
 	}
 	}
-	
+
 	// func TestMsgpackDecodeStructSubset(t *testing.T) {
 	// func TestMsgpackDecodeStructSubset(t *testing.T) {
 	// test that we can decode a subset of the stream
 	// test that we can decode a subset of the stream
 	mm := map[string]interface{}{"A": 5, "B": 99, "C": 333}
 	mm := map[string]interface{}{"A": 5, "B": 99, "C": 333}
@@ -683,16 +682,16 @@ func testCodecMiscOne(t *testing.T, h Handle) {
 	// println(">>>>>")
 	// println(">>>>>")
 	// test simple arrays, non-addressable arrays, slices
 	// test simple arrays, non-addressable arrays, slices
 	type tarr struct {
 	type tarr struct {
-		A int64 
-		B [3]int64 
-		C []byte  
-		D [3]byte 
+		A int64
+		B [3]int64
+		C []byte
+		D [3]byte
 	}
 	}
-	var tarr0 = tarr{1, [3]int64{2,3,4}, []byte{4,5,6}, [3]byte{7,8,9} }
+	var tarr0 = tarr{1, [3]int64{2, 3, 4}, []byte{4, 5, 6}, [3]byte{7, 8, 9}}
 	// test both pointer and non-pointer (value)
 	// test both pointer and non-pointer (value)
 	for _, tarr1 := range []interface{}{tarr0, &tarr0} {
 	for _, tarr1 := range []interface{}{tarr0, &tarr0} {
 		bs, err = testMarshalErr(tarr1, h, t, "tarr1")
 		bs, err = testMarshalErr(tarr1, h, t, "tarr1")
-		var tarr2 tarr 
+		var tarr2 tarr
 		testUnmarshalErr(&tarr2, bs, h, t, "tarr2")
 		testUnmarshalErr(&tarr2, bs, h, t, "tarr2")
 		checkEqualT(t, tarr0, tarr2, "tarr0=tarr2")
 		checkEqualT(t, tarr0, tarr2, "tarr0=tarr2")
 		// fmt.Printf(">>>> err: %v. tarr1: %v, tarr2: %v\n", err, tarr0, tarr2)
 		// fmt.Printf(">>>> err: %v. tarr1: %v, tarr2: %v\n", err, tarr0, tarr2)
@@ -750,17 +749,19 @@ func doTestRpcOne(t *testing.T, rr Rpc, h Handle, doRequest bool, exitSleepMs ti
 	serverFn := func() {
 	serverFn := func() {
 		for {
 		for {
 			conn1, err1 := ln.Accept()
 			conn1, err1 := ln.Accept()
-			if err1 != nil {
-				//fmt.Printf("accept err1: %v\n", err1)
-				continue
-			}
+			// if err1 != nil {
+			// 	//fmt.Printf("accept err1: %v\n", err1)
+			// 	continue
+			// }
 			if atomic.LoadUint64(&serverExitFlag) == 1 {
 			if atomic.LoadUint64(&serverExitFlag) == 1 {
 				serverExitChan <- true
 				serverExitChan <- true
 				conn1.Close()
 				conn1.Close()
 				return // exit serverFn goroutine
 				return // exit serverFn goroutine
 			}
 			}
-			var sc rpc.ServerCodec = rr.ServerCodec(conn1, h)
-			srv.ServeCodec(sc)
+			if err1 == nil {
+				var sc rpc.ServerCodec = rr.ServerCodec(conn1, h)
+				srv.ServeCodec(sc)
+			}
 		}
 		}
 	}
 	}
 
 

+ 16 - 14
codec/decode.go

@@ -415,17 +415,17 @@ func (f *decFnInfo) kStruct(rv reflect.Value) {
 func (f *decFnInfo) kSlice(rv reflect.Value) {
 func (f *decFnInfo) kSlice(rv reflect.Value) {
 	// A slice can be set from a map or array in stream.
 	// A slice can be set from a map or array in stream.
 	currEncodedType := f.dd.currentEncodedType()
 	currEncodedType := f.dd.currentEncodedType()
-	
+
 	switch currEncodedType {
 	switch currEncodedType {
 	case valueTypeBytes, valueTypeString:
 	case valueTypeBytes, valueTypeString:
-		if f.ti.rtid == uint8SliceTypId || f.ti.rt.Elem().Kind() == reflect.Uint8 { 
+		if f.ti.rtid == uint8SliceTypId || f.ti.rt.Elem().Kind() == reflect.Uint8 {
 			if bs2, changed2 := f.dd.decodeBytes(rv.Bytes()); changed2 {
 			if bs2, changed2 := f.dd.decodeBytes(rv.Bytes()); changed2 {
 				rv.SetBytes(bs2)
 				rv.SetBytes(bs2)
 			}
 			}
 			return
 			return
 		}
 		}
 	}
 	}
-	
+
 	if shortCircuitReflectToFastPath && rv.CanAddr() {
 	if shortCircuitReflectToFastPath && rv.CanAddr() {
 		switch f.ti.rtid {
 		switch f.ti.rtid {
 		case intfSliceTypId:
 		case intfSliceTypId:
@@ -450,11 +450,11 @@ func (f *decFnInfo) kSlice(rv reflect.Value) {
 	if rv.IsNil() {
 	if rv.IsNil() {
 		rv.Set(reflect.MakeSlice(f.ti.rt, containerLenS, containerLenS))
 		rv.Set(reflect.MakeSlice(f.ti.rt, containerLenS, containerLenS))
 	}
 	}
-	
+
 	if containerLen == 0 {
 	if containerLen == 0 {
 		return
 		return
 	}
 	}
-	
+
 	if rvcap, rvlen := rv.Len(), rv.Cap(); containerLenS > rvcap {
 	if rvcap, rvlen := rv.Len(), rv.Cap(); containerLenS > rvcap {
 		if f.array { // !rv.CanSet()
 		if f.array { // !rv.CanSet()
 			decErr(msgDecCannotExpandArr, rvcap, containerLenS)
 			decErr(msgDecCannotExpandArr, rvcap, containerLenS)
@@ -634,7 +634,7 @@ func (d *Decoder) decode(iv interface{}) {
 
 
 	case reflect.Value:
 	case reflect.Value:
 		d.chkPtrValue(v)
 		d.chkPtrValue(v)
-		d.decodeValue(v)
+		d.decodeValue(v.Elem())
 
 
 	case *string:
 	case *string:
 		*v = d.d.decodeString()
 		*v = d.d.decodeString()
@@ -665,8 +665,7 @@ func (d *Decoder) decode(iv interface{}) {
 	case *float64:
 	case *float64:
 		*v = d.d.decodeFloat(false)
 		*v = d.d.decodeFloat(false)
 	case *[]byte:
 	case *[]byte:
-		v2 := *v
-		*v, _ = d.d.decodeBytes(v2)
+		*v, _ = d.d.decodeBytes(*v)
 
 
 	case *[]interface{}:
 	case *[]interface{}:
 		d.decSliceIntf(v, valueTypeInvalid, false)
 		d.decSliceIntf(v, valueTypeInvalid, false)
@@ -691,7 +690,7 @@ func (d *Decoder) decode(iv interface{}) {
 	default:
 	default:
 		rv := reflect.ValueOf(iv)
 		rv := reflect.ValueOf(iv)
 		d.chkPtrValue(rv)
 		d.chkPtrValue(rv)
-		d.decodeValue(rv)
+		d.decodeValue(rv.Elem())
 	}
 	}
 }
 }
 
 
@@ -700,9 +699,15 @@ func (d *Decoder) decodeValue(rv reflect.Value) {
 
 
 	if d.d.tryDecodeAsNil() {
 	if d.d.tryDecodeAsNil() {
 		// If value in stream is nil, set the dereferenced value to its "zero" value (if settable)
 		// If value in stream is nil, set the dereferenced value to its "zero" value (if settable)
-		for rv.Kind() == reflect.Ptr {
-			rv = rv.Elem()
+		if rv.Kind() == reflect.Ptr {
+			if !rv.IsNil() {
+				rv.Set(reflect.Zero(rv.Type()))
+			}
+			return
 		}
 		}
+		// for rv.Kind() == reflect.Ptr {
+		// 	rv = rv.Elem()
+		// }
 		if rv.IsValid() { // rv.CanSet() // always settable, except it's invalid
 		if rv.IsValid() { // rv.CanSet() // always settable, except it's invalid
 			rv.Set(reflect.Zero(rv.Type()))
 			rv.Set(reflect.Zero(rv.Type()))
 		}
 		}
@@ -1035,6 +1040,3 @@ func decContLens(dd decDriver, currEncodedType valueType) (containerLen, contain
 func decErr(format string, params ...interface{}) {
 func decErr(format string, params ...interface{}) {
 	doPanic(msgTagDec, format, params...)
 	doPanic(msgTagDec, format, params...)
 }
 }
-
-
-

+ 3 - 2
codec/ext_dep_test.go

@@ -1,4 +1,4 @@
-//+build ignore
+// //+build ignore
 
 
 // Copyright (c) 2012, 2013 Ugorji Nwoke. All rights reserved.
 // Copyright (c) 2012, 2013 Ugorji Nwoke. All rights reserved.
 // Use of this source code is governed by a BSD-style license found in the LICENSE file.
 // Use of this source code is governed by a BSD-style license found in the LICENSE file.
@@ -17,9 +17,10 @@ package codec
 //       go test -bi -bench=.
 //       go test -bi -bench=.
 
 
 import (
 import (
+	"testing"
+
 	vmsgpack "github.com/vmihailenco/msgpack"
 	vmsgpack "github.com/vmihailenco/msgpack"
 	"labix.org/v2/mgo/bson"
 	"labix.org/v2/mgo/bson"
-	"testing"
 )
 )
 
 
 func init() {
 func init() {

+ 18 - 1
codec/helper.go

@@ -74,7 +74,7 @@ const (
 	valueTypeArray
 	valueTypeArray
 	valueTypeTimestamp
 	valueTypeTimestamp
 	valueTypeExt
 	valueTypeExt
-	
+
 	valueTypeInvalid = 0xff
 	valueTypeInvalid = 0xff
 )
 )
 
 
@@ -555,3 +555,20 @@ func doPanic(tag string, format string, params ...interface{}) {
 	copy(params2[1:], params)
 	copy(params2[1:], params)
 	panic(fmt.Errorf("%s: "+format, params2...))
 	panic(fmt.Errorf("%s: "+format, params2...))
 }
 }
+
+func checkOverflow(ui uint64, i int64, bitsize uint8) {
+	// check overflow (logic adapted from std pkg reflect/value.go OverflowUint()
+	if bitsize == 0 {
+		return
+	}
+	if i != 0 {
+		if trunc := (i << (64 - bitsize)) >> (64 - bitsize); i != trunc {
+			decErr("Overflow int value: %v", i)
+		}
+	}
+	if ui != 0 {
+		if trunc := (ui << (64 - bitsize)) >> (64 - bitsize); ui != trunc {
+			decErr("Overflow uint value: %v", ui)
+		}
+	}
+}

+ 24 - 19
codec/helper_internal.go

@@ -34,6 +34,7 @@ func panicValToErr(panicVal interface{}, err *error) {
 }
 }
 
 
 func isEmptyValue(v reflect.Value) bool {
 func isEmptyValue(v reflect.Value) bool {
+	const deref = true
 	switch v.Kind() {
 	switch v.Kind() {
 	case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
 	case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
 		return v.Len() == 0
 		return v.Len() == 0
@@ -46,7 +47,23 @@ func isEmptyValue(v reflect.Value) bool {
 	case reflect.Float32, reflect.Float64:
 	case reflect.Float32, reflect.Float64:
 		return v.Float() == 0
 		return v.Float() == 0
 	case reflect.Interface, reflect.Ptr:
 	case reflect.Interface, reflect.Ptr:
-		return v.IsNil()
+		if deref {
+			if v.IsNil() {
+				return true
+			}
+			return isEmptyValue(v.Elem())
+		} else {
+			return v.IsNil()
+		}
+	case reflect.Struct:
+		// return true if all fields are empty. else return false.
+		return v.Interface() == reflect.Zero(v.Type()).Interface()
+		// for i, n := 0, v.NumField(); i < n; i++ {
+		// 	if !isEmptyValue(v.Field(i), deref) {
+		// 		return false
+		// 	}
+		// }
+		// return true
 	}
 	}
 	return false
 	return false
 }
 }
@@ -60,26 +77,14 @@ func debugf(format string, args ...interface{}) {
 	}
 	}
 }
 }
 
 
-func pruneSignExt(v []byte) (n int) {
-	l := len(v)
-	if l < 2 {
-		return
-	}
-	if v[0] == 0 {
-		n2 := n + 1
-		for v[n] == 0 && n2 < l && (v[n2]&(1<<7) == 0) {
-			n++
-			n2++
+func pruneSignExt(v []byte, pos bool) (n int) {
+	if len(v) < 2 {
+	} else if pos && v[0] == 0 {
+		for ; v[n] == 0 && n+1 < len(v) && (v[n+1]&(1<<7) == 0); n++ {
 		}
 		}
-		return
-	}
-	if v[0] == 0xff {
-		n2 := n + 1
-		for v[n] == 0xff && n2 < l && (v[n2]&(1<<7) != 0) {
-			n++
-			n2++
+	} else if !pos && v[0] == 0xff {
+		for ; v[n] == 0xff && n+1 < len(v) && (v[n+1]&(1<<7) != 0); n++ {
 		}
 		}
-		return
 	}
 	}
 	return
 	return
 }
 }

+ 19 - 14
codec/msgpack.go

@@ -9,11 +9,11 @@ We need to maintain compatibility with it and how it encodes integer values
 without caring about the type.
 without caring about the type.
 
 
 For compatibility with behaviour of msgpack-c reference implementation:
 For compatibility with behaviour of msgpack-c reference implementation:
-  - Go intX (>0) and uintX 
-       IS ENCODED AS 
+  - Go intX (>0) and uintX
+       IS ENCODED AS
     msgpack +ve fixnum, unsigned
     msgpack +ve fixnum, unsigned
   - Go intX (<0)
   - Go intX (<0)
-       IS ENCODED AS 
+       IS ENCODED AS
     msgpack -ve fixnum, signed
     msgpack -ve fixnum, signed
 
 
 */
 */
@@ -118,7 +118,7 @@ func (e *msgpackEncDriver) encodeNil() {
 }
 }
 
 
 func (e *msgpackEncDriver) encodeInt(i int64) {
 func (e *msgpackEncDriver) encodeInt(i int64) {
-	
+
 	switch {
 	switch {
 	case i >= 0:
 	case i >= 0:
 		e.encodeUint(uint64(i))
 		e.encodeUint(uint64(i))
@@ -763,24 +763,29 @@ func (c *msgpackSpecRpcCodec) ReadRequestBody(body interface{}) error {
 
 
 func (c *msgpackSpecRpcCodec) parseCustomHeader(expectTypeByte byte, msgid *uint64, methodOrError *string) (err error) {
 func (c *msgpackSpecRpcCodec) parseCustomHeader(expectTypeByte byte, msgid *uint64, methodOrError *string) (err error) {
 
 
+	if c.cls {
+		return io.EOF
+	}
+
 	// We read the response header by hand
 	// We read the response header by hand
 	// so that the body can be decoded on its own from the stream at a later time.
 	// so that the body can be decoded on its own from the stream at a later time.
 
 
-	bs := make([]byte, 1)
-	n, err := c.rwc.Read(bs)
+	const fia byte = 0x94 //four item array descriptor value
+	// Not sure why the panic of EOF is swallowed above.
+	// if bs1 := c.dec.r.readn1(); bs1 != fia {
+	// 	err = fmt.Errorf("Unexpected value for array descriptor: Expecting %v. Received %v", fia, bs1)
+	// 	return
+	// }
+	var b byte
+	b, err = c.br.ReadByte()
 	if err != nil {
 	if err != nil {
 		return
 		return
 	}
 	}
-	if n != 1 {
-		err = fmt.Errorf("Couldn't read array descriptor: No bytes read")
+	if b != fia {
+		err = fmt.Errorf("Unexpected value for array descriptor: Expecting %v. Received %v", fia, b)
 		return
 		return
 	}
 	}
-	const fia byte = 0x94 //four item array descriptor value
-	if bs[0] != fia {
-		err = fmt.Errorf("Unexpected value for array descriptor: Expecting %v. Received %v", fia, bs[0])
-		return
-	}
-	var b byte
+
 	if err = c.read(&b); err != nil {
 	if err = c.read(&b); err != nil {
 		return
 		return
 	}
 	}

+ 18 - 0
codec/rpc.go

@@ -7,6 +7,7 @@ import (
 	"bufio"
 	"bufio"
 	"io"
 	"io"
 	"net/rpc"
 	"net/rpc"
+	"sync"
 )
 )
 
 
 // Rpc provides a rpc Server or Client Codec for rpc communication.
 // Rpc provides a rpc Server or Client Codec for rpc communication.
@@ -33,6 +34,8 @@ type rpcCodec struct {
 	enc *Encoder
 	enc *Encoder
 	bw  *bufio.Writer
 	bw  *bufio.Writer
 	br  *bufio.Reader
 	br  *bufio.Reader
+	mu  sync.Mutex
+	cls bool
 }
 }
 
 
 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
@@ -56,6 +59,9 @@ func (c *rpcCodec) BufferedWriter() *bufio.Writer {
 }
 }
 
 
 func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
 func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
+	if c.cls {
+		return io.EOF
+	}
 	if err = c.enc.Encode(obj1); err != nil {
 	if err = c.enc.Encode(obj1); err != nil {
 		return
 		return
 	}
 	}
@@ -71,6 +77,9 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e
 }
 }
 
 
 func (c *rpcCodec) read(obj interface{}) (err error) {
 func (c *rpcCodec) read(obj interface{}) (err error) {
+	if c.cls {
+		return io.EOF
+	}
 	//If nil is passed in, we should still attempt to read content to nowhere.
 	//If nil is passed in, we should still attempt to read content to nowhere.
 	if obj == nil {
 	if obj == nil {
 		var obj2 interface{}
 		var obj2 interface{}
@@ -80,6 +89,10 @@ func (c *rpcCodec) read(obj interface{}) (err error) {
 }
 }
 
 
 func (c *rpcCodec) Close() error {
 func (c *rpcCodec) Close() error {
+	if c.cls {
+		return io.EOF
+	}
+	c.cls = true
 	return c.rwc.Close()
 	return c.rwc.Close()
 }
 }
 
 
@@ -94,10 +107,15 @@ type goRpcCodec struct {
 }
 }
 
 
 func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
 func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
+	// Must protect for concurrent access as per API
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	return c.write(r, body, true, true)
 	return c.write(r, body, true, true)
 }
 }
 
 
 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	return c.write(r, body, true, true)
 	return c.write(r, body, true, true)
 }
 }
 
 

+ 451 - 0
codec/simple.go

@@ -0,0 +1,451 @@
+// Copyright (c) 2012, 2013 Ugorji Nwoke. All rights reserved.
+// Use of this source code is governed by a BSD-style license found in the LICENSE file.
+
+/*
+simple is a simplistic codec which does not pack integers, floats, etc into smaller chunks.
+
+All numbers are encoded with 8 bytes (int64, uint64, float64).
+
+Beyond this, it looks very similar to binc or msgpack.
+
+- nil, false, true are represented with single bytes
+- positive/negative ints, and floats are distinct types, all represented by 1,2,4 or 8 bytes
+- lengths of containers (strings, arrays, ext, map) occupy 2 bytes and
+  exist right after the bd
+- maps are encoded as [bd] [length] [[key][value]]...
+- arrays are encoded as [bd] [length] [value]...
+- extensions are encoded as [bd] [length] [tag] [byte]...
+- strings/bytearrays are encoded as [bd] [length] [tag] [byte]...
+
+*/
+
+package codec
+
+import "math"
+
+const (
+	_ byte = iota
+	simpleVdNil
+	simpleVdFalse
+	simpleVdTrue
+	simpleVdPosInt1
+	simpleVdPosInt2
+	simpleVdPosInt4
+	simpleVdPosInt8
+	simpleVdNegInt1
+	simpleVdNegInt2
+	simpleVdNegInt4
+	simpleVdNegInt8
+	simpleVdFloat32
+	simpleVdFloat64
+
+	simpleVdString
+	simpleVdByteArray
+	simpleVdArray
+	simpleVdMap
+
+	simpleVdExt
+)
+
+type simpleEncDriver struct {
+	w encWriter
+	b [8]byte
+}
+
+func (e *simpleEncDriver) isBuiltinType(rt uintptr) bool {
+	return false
+}
+
+func (e *simpleEncDriver) encodeBuiltin(rt uintptr, v interface{}) {
+}
+
+func (e *simpleEncDriver) encodeNil() {
+	e.w.writen1(simpleVdNil)
+	// e.w.writen1(0)
+}
+
+func (e *simpleEncDriver) encodeBool(b bool) {
+	if b {
+		e.w.writen1(simpleVdTrue)
+	} else {
+		e.w.writen1(simpleVdFalse)
+	}
+}
+
+func (e *simpleEncDriver) encodeFloat32(f float32) {
+	e.w.writen1(simpleVdFloat32)
+	e.w.writeUint32(math.Float32bits(f))
+}
+
+func (e *simpleEncDriver) encodeFloat64(f float64) {
+	e.w.writen1(simpleVdFloat64)
+	e.w.writeUint64(math.Float64bits(f))
+}
+
+func (e *simpleEncDriver) encodeInt(v int64) {
+	if v < 0 {
+		e.encUint(uint64(-v), false)
+	} else {
+		e.encUint(uint64(v), true)
+	}
+}
+
+func (e *simpleEncDriver) encodeUint(v uint64) {
+	e.encUint(v, true)
+}
+
+func (e *simpleEncDriver) encUint(v uint64, pos bool) {
+	switch {
+	case v <= math.MaxUint8:
+		if pos {
+			e.w.writen1(simpleVdPosInt1)
+		} else {
+			e.w.writen1(simpleVdNegInt1)
+		}
+		e.w.writen1(uint8(v))
+	case v <= math.MaxUint16:
+		if pos {
+			e.w.writen1(simpleVdPosInt2)
+		} else {
+			e.w.writen1(simpleVdNegInt2)
+		}
+		e.w.writeUint16(uint16(v))
+	case v <= math.MaxUint32:
+		if pos {
+			e.w.writen1(simpleVdPosInt4)
+		} else {
+			e.w.writen1(simpleVdNegInt4)
+		}
+		e.w.writeUint32(uint32(v))
+	case v <= math.MaxUint64:
+		if pos {
+			e.w.writen1(simpleVdPosInt8)
+		} else {
+			e.w.writen1(simpleVdNegInt8)
+		}
+		e.w.writeUint64(v)
+	}
+}
+
+func (e *simpleEncDriver) encodeExtPreamble(xtag byte, length int) {
+	e.w.writen1(simpleVdExt)
+	e.w.writeUint32(uint32(length))
+	e.w.writen1(xtag)
+}
+
+func (e *simpleEncDriver) encodeArrayPreamble(length int) {
+	e.w.writen1(simpleVdArray)
+	e.w.writeUint32(uint32(length))
+}
+
+func (e *simpleEncDriver) encodeMapPreamble(length int) {
+	e.w.writen1(simpleVdMap)
+	e.w.writeUint32(uint32(length))
+}
+
+func (e *simpleEncDriver) encodeString(c charEncoding, v string) {
+	e.w.writen1(simpleVdString)
+	e.w.writeUint32(uint32(len(v)))
+	e.w.writestr(v)
+}
+
+func (e *simpleEncDriver) encodeSymbol(v string) {
+	e.encodeString(c_UTF8, v)
+}
+
+func (e *simpleEncDriver) encodeStringBytes(c charEncoding, v []byte) {
+	e.w.writen1(simpleVdByteArray)
+	e.w.writeUint32(uint32(len(v)))
+	e.w.writeb(v)
+}
+
+//------------------------------------
+
+type simpleDecDriver struct {
+	r      decReader
+	bdRead bool
+	bdType valueType
+	bd     byte
+	b      [8]byte
+}
+
+func (d *simpleDecDriver) initReadNext() {
+	if d.bdRead {
+		return
+	}
+	d.bd = d.r.readn1()
+	d.bdRead = true
+	d.bdType = valueTypeUnset
+}
+
+func (d *simpleDecDriver) currentEncodedType() valueType {
+	if d.bdType == valueTypeUnset {
+		switch d.bd {
+		case simpleVdNil:
+			d.bdType = valueTypeNil
+		case simpleVdTrue, simpleVdFalse:
+			d.bdType = valueTypeBool
+		case simpleVdPosInt1, simpleVdPosInt2, simpleVdPosInt4, simpleVdPosInt8:
+			d.bdType = valueTypeUint
+		case simpleVdNegInt1, simpleVdNegInt2, simpleVdNegInt4, simpleVdNegInt8:
+			d.bdType = valueTypeInt
+		case simpleVdFloat32, simpleVdFloat64:
+			d.bdType = valueTypeFloat
+		case simpleVdString:
+			d.bdType = valueTypeString
+		case simpleVdByteArray:
+			d.bdType = valueTypeBytes
+		case simpleVdExt:
+			d.bdType = valueTypeExt
+		case simpleVdArray:
+			d.bdType = valueTypeArray
+		case simpleVdMap:
+			d.bdType = valueTypeMap
+		default:
+			decErr("currentEncodedType: Unrecognized d.vd: 0x%x", d.bd)
+		}
+	}
+	return d.bdType
+}
+
+func (d *simpleDecDriver) tryDecodeAsNil() bool {
+	if d.bd == simpleVdNil {
+		d.bdRead = false
+		return true
+	}
+	return false
+}
+
+func (d *simpleDecDriver) isBuiltinType(rt uintptr) bool {
+	return false
+}
+
+func (d *simpleDecDriver) decodeBuiltin(rt uintptr, v interface{}) {
+}
+
+func (d *simpleDecDriver) decIntAny() (ui uint64, i int64, neg bool) {
+	switch d.bd {
+	case simpleVdPosInt1:
+		ui = uint64(d.r.readn1())
+	case simpleVdPosInt2:
+		ui = uint64(d.r.readUint16())
+	case simpleVdPosInt4:
+		ui = uint64(d.r.readUint32())
+	case simpleVdPosInt8:
+		ui = uint64(d.r.readUint64())
+	case simpleVdNegInt1:
+		ui = uint64(d.r.readn1())
+		neg = true
+	case simpleVdNegInt2:
+		ui = uint64(d.r.readUint16())
+		neg = true
+	case simpleVdNegInt4:
+		ui = uint64(d.r.readUint32())
+		neg = true
+	case simpleVdNegInt8:
+		ui = uint64(d.r.readUint64())
+		neg = true
+	default:
+		decErr("Integer only valid from pos/neg integer1..8. Invalid descriptor: %v", d.bd)
+	}
+	if neg {
+		i = -(int64(ui))
+	} else {
+		i = int64(ui)
+	}
+	return
+}
+
+func (d *simpleDecDriver) decodeInt(bitsize uint8) (i int64) {
+	_, i, _ = d.decIntAny()
+	checkOverflow(0, i, bitsize)
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) decodeUint(bitsize uint8) (ui uint64) {
+	ui, i, neg := d.decIntAny()
+	if neg {
+		decErr("Assigning negative signed value: %v, to unsigned type", i)
+	}
+	checkOverflow(ui, 0, bitsize)
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) decodeFloat(chkOverflow32 bool) (f float64) {
+	switch d.bd {
+	case simpleVdFloat32:
+		f = float64(math.Float32frombits(d.r.readUint32()))
+	case simpleVdFloat64:
+		f = math.Float64frombits(d.r.readUint64())
+	default:
+		if d.bd >= simpleVdPosInt1 && d.bd <= simpleVdNegInt8 {
+			_, i, _ := d.decIntAny()
+			f = float64(i)
+		} else {
+			decErr("Float only valid from float32/64: Invalid descriptor: %v", d.bd)
+		}
+	}
+	// check overflow (logic adapted from std pkg reflect/value.go OverflowFloat()
+	if chkOverflow32 {
+		f2 := f
+		if f2 < 0 {
+			f2 = -f
+		}
+		if math.MaxFloat32 < f2 && f2 <= math.MaxFloat64 {
+			decErr("Overflow float32 value: %v", f2)
+		}
+	}
+	return
+}
+
+// bool can be decoded from bool only (single byte).
+func (d *simpleDecDriver) decodeBool() (b bool) {
+	b = d.r.readn1() != 0
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) readMapLen() (length int) {
+	return d.decLen()
+}
+
+func (d *simpleDecDriver) readArrayLen() (length int) {
+	return d.decLen()
+}
+
+func (d *simpleDecDriver) decLen() int {
+	d.r.readb(d.b[:4])
+	return int(bigen.Uint32(d.b[:4]))
+}
+
+func (d *simpleDecDriver) decodeString() (s string) {
+	s = string(d.r.readn(d.decLen()))
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) decodeBytes(bs []byte) (bsOut []byte, changed bool) {
+	if clen := d.decLen(); clen > 0 {
+		// if no contents in stream, don't update the passed byteslice
+		if len(bs) != clen {
+			if len(bs) > clen {
+				bs = bs[:clen]
+			} else {
+				bs = make([]byte, clen)
+			}
+			bsOut = bs
+			changed = true
+		}
+		d.r.readb(bs)
+	}
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) decodeExt(verifyTag bool, tag byte) (xtag byte, xbs []byte) {
+	switch d.bd {
+	case simpleVdExt:
+		l := d.decLen()
+		xtag = d.r.readn1()
+		if verifyTag && xtag != tag {
+			decErr("Wrong extension tag. Got %b. Expecting: %v", xtag, tag)
+		}
+		xbs = d.r.readn(l)
+	case simpleVdByteArray:
+		xbs, _ = d.decodeBytes(nil)
+	default:
+		decErr("Invalid d.vd for extensions (Expecting extensions or byte array). Got: 0x%x", d.bd)
+	}
+	d.bdRead = false
+	return
+}
+
+func (d *simpleDecDriver) decodeNaked() (v interface{}, vt valueType, decodeFurther bool) {
+	d.initReadNext()
+
+	switch d.bd {
+	case simpleVdNil:
+		vt = valueTypeNil
+	case simpleVdFalse:
+		vt = valueTypeBool
+		v = false
+	case simpleVdTrue:
+		vt = valueTypeBool
+		v = true
+	case simpleVdPosInt1, simpleVdPosInt2, simpleVdPosInt4, simpleVdPosInt8:
+		vt = valueTypeUint
+		ui, _, _ := d.decIntAny()
+		v = ui
+	case simpleVdNegInt1, simpleVdNegInt2, simpleVdNegInt4, simpleVdNegInt8:
+		vt = valueTypeInt
+		_, i, _ := d.decIntAny()
+		v = i
+	case simpleVdFloat32:
+		vt = valueTypeFloat
+		v = d.decodeFloat(true)
+	case simpleVdFloat64:
+		vt = valueTypeFloat
+		v = d.decodeFloat(false)
+	case simpleVdString:
+		vt = valueTypeString
+		v = d.decodeString()
+	case simpleVdByteArray:
+		vt = valueTypeBytes
+		v, _ = d.decodeBytes(nil)
+	case simpleVdExt:
+		vt = valueTypeExt
+		l := d.decLen()
+		var re RawExt
+		re.Tag = d.r.readn1()
+		re.Data = d.r.readn(l)
+		v = &re
+		vt = valueTypeExt
+	case simpleVdArray:
+		vt = valueTypeArray
+		decodeFurther = true
+	case simpleVdMap:
+		vt = valueTypeMap
+		decodeFurther = true
+	default:
+		decErr("decodeNaked: Unrecognized d.vd: 0x%x", d.bd)
+	}
+
+	if !decodeFurther {
+		d.bdRead = false
+	}
+	return
+}
+
+//------------------------------------
+
+//SimpleHandle is a Handle for a very simple encoding format.
+//
+//The simple format is similar to binc:
+//  - All numbers are represented with 8 bytes (int64, uint64, float64)
+//  - Strings, []byte, arrays and maps have the length pre-pended as a uint64
+//  - Thus, there isn't much packing, but the format is extremely simple.
+//    and easy to create different implementations of (e.g. in C).
+type SimpleHandle struct {
+	BasicHandle
+}
+
+func (h *SimpleHandle) newEncDriver(w encWriter) encDriver {
+	return &simpleEncDriver{w: w}
+}
+
+func (h *SimpleHandle) newDecDriver(r decReader) decDriver {
+	return &simpleDecDriver{r: r}
+}
+
+func (_ *SimpleHandle) writeExt() bool {
+	return true
+}
+
+func (h *SimpleHandle) getBasicHandle() *BasicHandle {
+	return &h.BasicHandle
+}
+
+var _ decDriver = (*simpleDecDriver)(nil)
+var _ encDriver = (*simpleEncDriver)(nil)

+ 2 - 2
codec/time.go

@@ -76,7 +76,7 @@ func encodeTime(t time.Time) []byte {
 	if tsecs != 0 {
 	if tsecs != 0 {
 		bd = bd | 0x80
 		bd = bd | 0x80
 		bigen.PutUint64(btmp[:], uint64(tsecs))
 		bigen.PutUint64(btmp[:], uint64(tsecs))
-		f := pruneSignExt(btmp[:])
+		f := pruneSignExt(btmp[:], tsecs >= 0)
 		bd = bd | (byte(7-f) << 2)
 		bd = bd | (byte(7-f) << 2)
 		copy(bs[i:], btmp[f:])
 		copy(bs[i:], btmp[f:])
 		i = i + (8 - f)
 		i = i + (8 - f)
@@ -84,7 +84,7 @@ func encodeTime(t time.Time) []byte {
 	if tnsecs != 0 {
 	if tnsecs != 0 {
 		bd = bd | 0x40
 		bd = bd | 0x40
 		bigen.PutUint32(btmp[:4], uint32(tnsecs))
 		bigen.PutUint32(btmp[:4], uint32(tnsecs))
-		f := pruneSignExt(btmp[:4])
+		f := pruneSignExt(btmp[:4], true)
 		bd = bd | byte(3-f)
 		bd = bd | byte(3-f)
 		copy(bs[i:], btmp[f:4])
 		copy(bs[i:], btmp[f:4])
 		i = i + (4 - f)
 		i = i + (4 - f)