Browse Source

codec: streamline chan encoding/decoding

Previously, we would encode a chan by reading up to the length of the chan.
This is problematic because
- we do not consume any elements currently blocking due to a smaller chan cap
- we have no option to drain the chan until it is closed.

To support all models, we need to be able to collate all values received from the chan,
decipher the true number of values received, and then encode the true length and values appropriately.
This suggests that we should receive the values into a temporary slice, and encode that slice.

This is now supported fully, across standard reflection-based path and the codecgen pathway.
It is configured via the ChanRecvTimeout parameter in EncodeOptions, available to all handles.

TestStrucFlex was also expanded, to test and validate the expanded chan support.

Fixes #233
Ugorji Nwoke 7 năm trước cách đây
mục cha
commit
9af07b2035

+ 1 - 1
codec/binc.go

@@ -972,7 +972,7 @@ type BincHandle struct {
 	// - n: none
 	// - n: none
 	// - a: all: same as m, s, ...
 	// - a: all: same as m, s, ...
 
 
-	_ [1]uint64 // padding
+	// _ [1]uint64 // padding
 }
 }
 
 
 // Name returns the name of the handle: binc
 // Name returns the name of the handle: binc

+ 1 - 1
codec/cbor.go

@@ -724,7 +724,7 @@ type CborHandle struct {
 	// If unset, we encode time.Time using seconds past epoch.
 	// If unset, we encode time.Time using seconds past epoch.
 	TimeRFC3339 bool
 	TimeRFC3339 bool
 
 
-	_ [1]uint64 // padding
+	// _ [1]uint64 // padding
 }
 }
 
 
 // Name returns the name of the handle: cbor
 // Name returns the name of the handle: cbor

+ 1 - 1
codec/codec_test.go

@@ -881,7 +881,7 @@ func testCodecMiscOne(t *testing.T, h Handle) {
 	} else {
 	} else {
 		logT(t, "------- b: size: %v, value: %s", len(b), b1)
 		logT(t, "------- b: size: %v, value: %s", len(b), b1)
 	}
 	}
-	ts2 := new(TestStrucFlex)
+	ts2 := emptyTestStrucFlex()
 	testUnmarshalErr(ts2, b, h, t, "pointer-to-struct")
 	testUnmarshalErr(ts2, b, h, t, "pointer-to-struct")
 	if ts2.I64 != math.MaxInt64*2/3 {
 	if ts2.I64 != math.MaxInt64*2/3 {
 		logT(t, "------- Unmarshal wrong. Expect I64 = 64. Got: %v", ts2.I64)
 		logT(t, "------- Unmarshal wrong. Expect I64 = 64. Got: %v", ts2.I64)

+ 9 - 3
codec/decode.go

@@ -20,6 +20,10 @@ const (
 	msgDecCannotExpandArr = "cannot expand go array from %v to stream length: %v"
 	msgDecCannotExpandArr = "cannot expand go array from %v to stream length: %v"
 )
 )
 
 
+const decDefSliceCap = 8
+const decDefChanCap = 64 // should be large, as cap cannot be expanded
+const decScratchByteArrayLen = cacheLineSize - 8
+
 var (
 var (
 	errstrOnlyMapOrArrayCanDecodeIntoStruct = "only encoded map or array can be decoded into a struct"
 	errstrOnlyMapOrArrayCanDecodeIntoStruct = "only encoded map or array can be decoded into a struct"
 	errstrCannotDecodeIntoNil               = "cannot decode into nil"
 	errstrCannotDecodeIntoNil               = "cannot decode into nil"
@@ -1356,14 +1360,17 @@ func (d *Decoder) kSlice(f *codecFnInfo, rv reflect.Value) {
 		if j == 0 && (f.seq == seqTypeSlice || f.seq == seqTypeChan) && rv.IsNil() {
 		if j == 0 && (f.seq == seqTypeSlice || f.seq == seqTypeChan) && rv.IsNil() {
 			if hasLen {
 			if hasLen {
 				rvlen = decInferLen(containerLenS, d.h.MaxInitLen, rtelem0Size)
 				rvlen = decInferLen(containerLenS, d.h.MaxInitLen, rtelem0Size)
+			} else if f.seq == seqTypeSlice {
+				rvlen = decDefSliceCap
 			} else {
 			} else {
-				rvlen = 8
+				rvlen = decDefChanCap
 			}
 			}
 			if rvCanset {
 			if rvCanset {
 				if f.seq == seqTypeSlice {
 				if f.seq == seqTypeSlice {
 					rv = reflect.MakeSlice(ti.rt, rvlen, rvlen)
 					rv = reflect.MakeSlice(ti.rt, rvlen, rvlen)
 					rvChanged = true
 					rvChanged = true
 				} else { // chan
 				} else { // chan
+					// xdebugf(">>>>>> haslen = %v, make chan of type '%v' with length: %v", hasLen, ti.rt, rvlen)
 					rv = reflect.MakeChan(ti.rt, rvlen)
 					rv = reflect.MakeChan(ti.rt, rvlen)
 					rvChanged = true
 					rvChanged = true
 				}
 				}
@@ -1385,6 +1392,7 @@ func (d *Decoder) kSlice(f *codecFnInfo, rv reflect.Value) {
 				fn = d.cf.get(rtelem, true, true)
 				fn = d.cf.get(rtelem, true, true)
 			}
 			}
 			d.decodeValue(rv9, fn, true)
 			d.decodeValue(rv9, fn, true)
+			// xdebugf(">>>> rv9 sent on %v during decode: %v, with len=%v, cap=%v", rv.Type(), rv9, rv.Len(), rv.Cap())
 			rv.Send(rv9)
 			rv.Send(rv9)
 		} else {
 		} else {
 			// if indefinite, etc, then expand the slice if necessary
 			// if indefinite, etc, then expand the slice if necessary
@@ -1800,8 +1808,6 @@ type decReaderSwitch struct {
 // 	return z.ri.readUntil(in, stop)
 // 	return z.ri.readUntil(in, stop)
 // }
 // }
 
 
-const decScratchByteArrayLen = cacheLineSize - 8
-
 // A Decoder reads and decodes an object from an input stream in the codec format.
 // A Decoder reads and decodes an object from an input stream in the codec format.
 type Decoder struct {
 type Decoder struct {
 	panicHdl
 	panicHdl

+ 80 - 14
codec/encode.go

@@ -103,7 +103,15 @@ type EncodeOptions struct {
 	// if > 0, we use a smart buffer internally for performance purposes.
 	// if > 0, we use a smart buffer internally for performance purposes.
 	WriterBufferSize int
 	WriterBufferSize int
 
 
-	// Encode a struct as an array, and not as a map
+	// ChanRecvTimeout is the timeout used when selecting from a chan.
+	//
+	// Configuring this controls how we receive from a chan during the encoding process.
+	//   - If ==0, we only consume the elements currently available in the chan.
+	//   - if  <0, we consume until the chan is closed.
+	//   - If  >0, we consume until this timeout.
+	ChanRecvTimeout time.Duration
+
+	// StructToArray specifies to encode a struct as an array, and not as a map
 	StructToArray bool
 	StructToArray bool
 
 
 	// Canonical representation means that encoding a value will always result in the same
 	// Canonical representation means that encoding a value will always result in the same
@@ -317,15 +325,16 @@ func (e *Encoder) kSlice(f *codecFnInfo, rv reflect.Value) {
 		e.errorf("send-only channel cannot be encoded")
 		e.errorf("send-only channel cannot be encoded")
 	}
 	}
 	elemsep := e.esep
 	elemsep := e.esep
-	l := rv.Len()
 	rtelem := ti.elem
 	rtelem := ti.elem
 	rtelemIsByte := uint8TypId == rt2id(rtelem) // NOT rtelem.Kind() == reflect.Uint8
 	rtelemIsByte := uint8TypId == rt2id(rtelem) // NOT rtelem.Kind() == reflect.Uint8
+	var l int
 	// if a slice, array or chan of bytes, treat specially
 	// if a slice, array or chan of bytes, treat specially
 	if rtelemIsByte {
 	if rtelemIsByte {
 		switch f.seq {
 		switch f.seq {
 		case seqTypeSlice:
 		case seqTypeSlice:
 			ee.EncodeStringBytes(cRAW, rv.Bytes())
 			ee.EncodeStringBytes(cRAW, rv.Bytes())
 		case seqTypeArray:
 		case seqTypeArray:
+			l = rv.Len()
 			if rv.CanAddr() {
 			if rv.CanAddr() {
 				ee.EncodeStringBytes(cRAW, rv.Slice(0, l).Bytes())
 				ee.EncodeStringBytes(cRAW, rv.Slice(0, l).Bytes())
 			} else {
 			} else {
@@ -339,24 +348,89 @@ func (e *Encoder) kSlice(f *codecFnInfo, rv reflect.Value) {
 				ee.EncodeStringBytes(cRAW, bs)
 				ee.EncodeStringBytes(cRAW, bs)
 			}
 			}
 		case seqTypeChan:
 		case seqTypeChan:
-			bs := e.b[:0]
 			// do not use range, so that the number of elements encoded
 			// do not use range, so that the number of elements encoded
 			// does not change, and encoding does not hang waiting on someone to close chan.
 			// does not change, and encoding does not hang waiting on someone to close chan.
 			// for b := range rv2i(rv).(<-chan byte) { bs = append(bs, b) }
 			// for b := range rv2i(rv).(<-chan byte) { bs = append(bs, b) }
 			// ch := rv2i(rv).(<-chan byte) // fix error - that this is a chan byte, not a <-chan byte.
 			// ch := rv2i(rv).(<-chan byte) // fix error - that this is a chan byte, not a <-chan byte.
+
+			if rv.IsNil() {
+				ee.EncodeNil()
+				break
+			}
+			bs := e.b[:0]
 			irv := rv2i(rv)
 			irv := rv2i(rv)
 			ch, ok := irv.(<-chan byte)
 			ch, ok := irv.(<-chan byte)
 			if !ok {
 			if !ok {
 				ch = irv.(chan byte)
 				ch = irv.(chan byte)
 			}
 			}
-			for i := 0; i < l; i++ {
-				bs = append(bs, <-ch)
+
+		L1:
+			switch timeout := e.h.ChanRecvTimeout; {
+			case timeout == 0: // only consume available
+				for {
+					select {
+					case b := <-ch:
+						bs = append(bs, b)
+					default:
+						break L1
+					}
+				}
+			case timeout > 0: // consume until timeout
+				tt := time.NewTimer(timeout)
+				for {
+					select {
+					case b := <-ch:
+						bs = append(bs, b)
+					case <-tt.C:
+						// close(tt.C)
+						break L1
+					}
+				}
+			default: // consume until close
+				for b := range ch {
+					bs = append(bs, b)
+				}
 			}
 			}
+
 			ee.EncodeStringBytes(cRAW, bs)
 			ee.EncodeStringBytes(cRAW, bs)
 		}
 		}
 		return
 		return
 	}
 	}
 
 
+	// if chan, consume chan into a slice, and work off that slice.
+	var rvcs reflect.Value
+	if f.seq == seqTypeChan {
+		rvcs = reflect.Zero(reflect.SliceOf(rtelem))
+		timeout := e.h.ChanRecvTimeout
+		if timeout < 0 { // consume until close
+			for {
+				recv, recvOk := rv.Recv()
+				if !recvOk {
+					break
+				}
+				rvcs = reflect.Append(rvcs, recv)
+			}
+		} else {
+			cases := make([]reflect.SelectCase, 2)
+			cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: rv}
+			if timeout == 0 {
+				cases[1] = reflect.SelectCase{Dir: reflect.SelectDefault}
+			} else {
+				tt := time.NewTimer(timeout)
+				cases[1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tt.C)}
+			}
+			for {
+				chosen, recv, recvOk := reflect.Select(cases)
+				if chosen == 1 || !recvOk {
+					break
+				}
+				rvcs = reflect.Append(rvcs, recv)
+			}
+		}
+		rv = rvcs // TODO: ensure this doesn't mess up anywhere that rv of kind chan is expected
+	}
+
+	l = rv.Len()
 	if ti.mbs {
 	if ti.mbs {
 		if l%2 == 1 {
 		if l%2 == 1 {
 			e.errorf("mapBySlice requires even slice length, but got %v", l)
 			e.errorf("mapBySlice requires even slice length, but got %v", l)
@@ -390,15 +464,7 @@ func (e *Encoder) kSlice(f *codecFnInfo, rv reflect.Value) {
 					ee.WriteArrayElem()
 					ee.WriteArrayElem()
 				}
 				}
 			}
 			}
-			if f.seq == seqTypeChan {
-				if rv2, ok2 := rv.Recv(); ok2 {
-					e.encodeValue(rv2, fn, true)
-				} else {
-					ee.EncodeNil() // WE HAVE TO DO SOMETHING, so nil if nothing received.
-				}
-			} else {
-				e.encodeValue(rv.Index(j), fn, true)
-			}
+			e.encodeValue(rv.Index(j), fn, true)
 		}
 		}
 	}
 	}
 
 

+ 12 - 11
codec/gen-dec-array.go.tmpl

@@ -9,13 +9,14 @@ if {{var "l"}} == 0 {
 	} else if len({{var "v"}}) != 0 {
 	} else if len({{var "v"}}) != 0 {
 		{{var "v"}} = {{var "v"}}[:0]
 		{{var "v"}} = {{var "v"}}[:0]
 		{{var "c"}} = true
 		{{var "c"}} = true
-	} {{end}} {{if isChan }}if {{var "v"}} == nil {
+	} {{else if isChan }}if {{var "v"}} == nil {
 		{{var "v"}} = make({{ .CTyp }}, 0)
 		{{var "v"}} = make({{ .CTyp }}, 0)
 		{{var "c"}} = true
 		{{var "c"}} = true
 	} {{end}}
 	} {{end}}
 } else {
 } else {
 	{{var "hl"}} := {{var "l"}} > 0
 	{{var "hl"}} := {{var "l"}} > 0
-	var {{var "rl"}} int; _ =  {{var "rl"}}
+	var {{var "rl"}} int
+	_ =  {{var "rl"}}
 	{{if isSlice }} if {{var "hl"}} {
 	{{if isSlice }} if {{var "hl"}} {
 	if {{var "l"}} > cap({{var "v"}}) {
 	if {{var "l"}} > cap({{var "v"}}) {
 		{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 		{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
@@ -33,25 +34,26 @@ if {{var "l"}} == 0 {
 	var {{var "j"}} int 
 	var {{var "j"}} int 
     // var {{var "dn"}} bool 
     // var {{var "dn"}} bool 
 	for ; ({{var "hl"}} && {{var "j"}} < {{var "l"}}) || !({{var "hl"}} || r.CheckBreak()); {{var "j"}}++ {
 	for ; ({{var "hl"}} && {{var "j"}} < {{var "l"}}) || !({{var "hl"}} || r.CheckBreak()); {{var "j"}}++ {
-		{{if not isArray}} if {{var "j"}} == 0 && len({{var "v"}}) == 0 {
+		{{if not isArray}} if {{var "j"}} == 0 && {{var "v"}} == nil {
 			if {{var "hl"}} {
 			if {{var "hl"}} {
 				{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 				{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 			} else {
 			} else {
-				{{var "rl"}} = 8
+				{{var "rl"}} = {{if isSlice}}8{{else if isChan}}64{{end}}
 			}
 			}
-			{{var "v"}} = make([]{{ .Typ }}, {{var "rl"}})
+			{{var "v"}} = make({{if isSlice}}[]{{ .Typ }}{{else if isChan}}{{.CTyp}}{{end}}, {{var "rl"}})
 			{{var "c"}} = true 
 			{{var "c"}} = true 
 		}{{end}}
 		}{{end}}
 		{{var "h"}}.ElemContainerState({{var "j"}})
 		{{var "h"}}.ElemContainerState({{var "j"}})
-        {{/* {{var "dn"}} = r.TryDecodeAsNil() */}}
-        {{if isChan}}{{ $x := printf "%[1]vv%[2]v" .TempVar .Rand }}var {{var $x}} {{ .Typ }}
+        {{/* {{var "dn"}} = r.TryDecodeAsNil() */}}{{/* commented out, as decLineVar handles this already each time */}}
+        {{if isChan}}{{ $x := printf "%[1]vvcx%[2]v" .TempVar .Rand }}var {{$x}} {{ .Typ }}
 		{{ decLineVar $x }}
 		{{ decLineVar $x }}
 		{{var "v"}} <- {{ $x }}
 		{{var "v"}} <- {{ $x }}
-        {{else}}
-		// if indefinite, etc, then expand the slice if necessary
+        // println(">>>> sending ", {{ $x }}, " into ", {{var "v"}}) // TODO: remove this
+        {{else}}{{/* // if indefinite, etc, then expand the slice if necessary */}}
 		var {{var "db"}} bool
 		var {{var "db"}} bool
 		if {{var "j"}} >= len({{var "v"}}) {
 		if {{var "j"}} >= len({{var "v"}}) {
-			{{if isSlice }} {{var "v"}} = append({{var "v"}}, {{ zero }}); {{var "c"}} = true
+			{{if isSlice }} {{var "v"}} = append({{var "v"}}, {{ zero }})
+			{{var "c"}} = true
 			{{else}} z.DecArrayCannotExpand(len(v), {{var "j"}}+1); {{var "db"}} = true
 			{{else}} z.DecArrayCannotExpand(len(v), {{var "j"}}+1); {{var "db"}} = true
 			{{end}}
 			{{end}}
 		}
 		}
@@ -74,4 +76,3 @@ if {{var "l"}} == 0 {
 {{if not isArray }}if {{var "c"}} { 
 {{if not isArray }}if {{var "c"}} { 
 	*{{ .Varname }} = {{var "v"}}
 	*{{ .Varname }} = {{var "v"}}
 }{{end}}
 }{{end}}
-

+ 27 - 0
codec/gen-enc-chan.go.tmpl

@@ -0,0 +1,27 @@
+{{.Label}}:
+switch timeout{{.Sfx}} :=  z.EncBasicHandle().ChanRecvTimeout; {
+case timeout{{.Sfx}} == 0: // only consume available
+	for {
+		select {
+		case b{{.Sfx}} := <-{{.Chan}}:
+			{{ .Slice }} = append({{.Slice}}, b{{.Sfx}})
+		default:
+			break {{.Label}}
+		}
+	}
+case timeout{{.Sfx}} > 0: // consume until timeout
+	tt{{.Sfx}} := time.NewTimer(timeout{{.Sfx}})
+	for {
+		select {
+		case b{{.Sfx}} := <-{{.Chan}}:
+			{{.Slice}} = append({{.Slice}}, b{{.Sfx}})
+		case <-tt{{.Sfx}}.C:
+			// close(tt.C)
+			break {{.Label}}
+		}
+	}
+default: // consume until close
+	for b{{.Sfx}} := range {{.Chan}} {
+		{{.Slice}} = append({{.Slice}}, b{{.Sfx}})
+	}
+}

+ 41 - 10
codec/gen.generated.go

@@ -64,13 +64,14 @@ if {{var "l"}} == 0 {
 	} else if len({{var "v"}}) != 0 {
 	} else if len({{var "v"}}) != 0 {
 		{{var "v"}} = {{var "v"}}[:0]
 		{{var "v"}} = {{var "v"}}[:0]
 		{{var "c"}} = true
 		{{var "c"}} = true
-	} {{end}} {{if isChan }}if {{var "v"}} == nil {
+	} {{else if isChan }}if {{var "v"}} == nil {
 		{{var "v"}} = make({{ .CTyp }}, 0)
 		{{var "v"}} = make({{ .CTyp }}, 0)
 		{{var "c"}} = true
 		{{var "c"}} = true
 	} {{end}}
 	} {{end}}
 } else {
 } else {
 	{{var "hl"}} := {{var "l"}} > 0
 	{{var "hl"}} := {{var "l"}} > 0
-	var {{var "rl"}} int; _ =  {{var "rl"}}
+	var {{var "rl"}} int
+	_ =  {{var "rl"}}
 	{{if isSlice }} if {{var "hl"}} {
 	{{if isSlice }} if {{var "hl"}} {
 	if {{var "l"}} > cap({{var "v"}}) {
 	if {{var "l"}} > cap({{var "v"}}) {
 		{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 		{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
@@ -88,25 +89,26 @@ if {{var "l"}} == 0 {
 	var {{var "j"}} int 
 	var {{var "j"}} int 
     // var {{var "dn"}} bool 
     // var {{var "dn"}} bool 
 	for ; ({{var "hl"}} && {{var "j"}} < {{var "l"}}) || !({{var "hl"}} || r.CheckBreak()); {{var "j"}}++ {
 	for ; ({{var "hl"}} && {{var "j"}} < {{var "l"}}) || !({{var "hl"}} || r.CheckBreak()); {{var "j"}}++ {
-		{{if not isArray}} if {{var "j"}} == 0 && len({{var "v"}}) == 0 {
+		{{if not isArray}} if {{var "j"}} == 0 && {{var "v"}} == nil {
 			if {{var "hl"}} {
 			if {{var "hl"}} {
 				{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 				{{var "rl"}} = z.DecInferLen({{var "l"}}, z.DecBasicHandle().MaxInitLen, {{ .Size }})
 			} else {
 			} else {
-				{{var "rl"}} = 8
+				{{var "rl"}} = {{if isSlice}}8{{else if isChan}}64{{end}}
 			}
 			}
-			{{var "v"}} = make([]{{ .Typ }}, {{var "rl"}})
+			{{var "v"}} = make({{if isSlice}}[]{{ .Typ }}{{else if isChan}}{{.CTyp}}{{end}}, {{var "rl"}})
 			{{var "c"}} = true 
 			{{var "c"}} = true 
 		}{{end}}
 		}{{end}}
 		{{var "h"}}.ElemContainerState({{var "j"}})
 		{{var "h"}}.ElemContainerState({{var "j"}})
-        {{/* {{var "dn"}} = r.TryDecodeAsNil() */}}
-        {{if isChan}}{{ $x := printf "%[1]vv%[2]v" .TempVar .Rand }}var {{var $x}} {{ .Typ }}
+        {{/* {{var "dn"}} = r.TryDecodeAsNil() */}}{{/* commented out, as decLineVar handles this already each time */}}
+        {{if isChan}}{{ $x := printf "%[1]vvcx%[2]v" .TempVar .Rand }}var {{$x}} {{ .Typ }}
 		{{ decLineVar $x }}
 		{{ decLineVar $x }}
 		{{var "v"}} <- {{ $x }}
 		{{var "v"}} <- {{ $x }}
-        {{else}}
-		// if indefinite, etc, then expand the slice if necessary
+        // println(">>>> sending ", {{ $x }}, " into ", {{var "v"}}) // TODO: remove this
+        {{else}}{{/* // if indefinite, etc, then expand the slice if necessary */}}
 		var {{var "db"}} bool
 		var {{var "db"}} bool
 		if {{var "j"}} >= len({{var "v"}}) {
 		if {{var "j"}} >= len({{var "v"}}) {
-			{{if isSlice }} {{var "v"}} = append({{var "v"}}, {{ zero }}); {{var "c"}} = true
+			{{if isSlice }} {{var "v"}} = append({{var "v"}}, {{ zero }})
+			{{var "c"}} = true
 			{{else}} z.DecArrayCannotExpand(len(v), {{var "j"}}+1); {{var "db"}} = true
 			{{else}} z.DecArrayCannotExpand(len(v), {{var "j"}}+1); {{var "db"}} = true
 			{{end}}
 			{{end}}
 		}
 		}
@@ -129,5 +131,34 @@ if {{var "l"}} == 0 {
 {{if not isArray }}if {{var "c"}} { 
 {{if not isArray }}if {{var "c"}} { 
 	*{{ .Varname }} = {{var "v"}}
 	*{{ .Varname }} = {{var "v"}}
 }{{end}}
 }{{end}}
+`
 
 
+const genEncChanTmpl = `
+{{.Label}}:
+switch timeout{{.Sfx}} :=  z.EncBasicHandle().ChanRecvTimeout; {
+case timeout{{.Sfx}} == 0: // only consume available
+	for {
+		select {
+		case b{{.Sfx}} := <-{{.Chan}}:
+			{{ .Slice }} = append({{.Slice}}, b{{.Sfx}})
+		default:
+			break {{.Label}}
+		}
+	}
+case timeout{{.Sfx}} > 0: // consume until timeout
+	tt{{.Sfx}} := time.NewTimer(timeout{{.Sfx}})
+	for {
+		select {
+		case b{{.Sfx}} := <-{{.Chan}}:
+			{{.Slice}} = append({{.Slice}}, b{{.Sfx}})
+		case <-tt{{.Sfx}}.C:
+			// close(tt.C)
+			break {{.Label}}
+		}
+	}
+default: // consume until close
+	for b{{.Sfx}} := range {{.Chan}} {
+		{{.Slice}} = append({{.Slice}}, b{{.Sfx}})
+	}
+}
 `
 `

+ 30 - 9
codec/gen.go

@@ -1083,28 +1083,49 @@ func (x *genRunner) encStruct(varname string, rtid uintptr, t reflect.Type) {
 }
 }
 
 
 func (x *genRunner) encListFallback(varname string, t reflect.Type) {
 func (x *genRunner) encListFallback(varname string, t reflect.Type) {
+	elemBytes := t.Elem().Kind() == reflect.Uint8
 	if t.AssignableTo(uint8SliceTyp) {
 	if t.AssignableTo(uint8SliceTyp) {
 		x.linef("r.EncodeStringBytes(codecSelferCcRAW%s, []byte(%s))", x.xs, varname)
 		x.linef("r.EncodeStringBytes(codecSelferCcRAW%s, []byte(%s))", x.xs, varname)
 		return
 		return
 	}
 	}
-	if t.Kind() == reflect.Array && t.Elem().Kind() == reflect.Uint8 {
+	if t.Kind() == reflect.Array && elemBytes {
 		x.linef("r.EncodeStringBytes(codecSelferCcRAW%s, ((*[%d]byte)(%s))[:])", x.xs, t.Len(), varname)
 		x.linef("r.EncodeStringBytes(codecSelferCcRAW%s, ((*[%d]byte)(%s))[:])", x.xs, t.Len(), varname)
 		return
 		return
 	}
 	}
 	i := x.varsfx()
 	i := x.varsfx()
-	g := genTempVarPfx
-	x.line("r.WriteArrayStart(len(" + varname + "))")
 	if t.Kind() == reflect.Chan {
 	if t.Kind() == reflect.Chan {
-		x.linef("for %si%s, %si2%s := 0, len(%s); %si%s < %si2%s; %si%s++ {", g, i, g, i, varname, g, i, g, i, g, i)
-		x.line("r.WriteArrayElem()")
-		x.linef("%sv%s := <-%s", g, i, varname)
-	} else {
-		x.linef("for _, %sv%s := range %s {", genTempVarPfx, i, varname)
-		x.line("r.WriteArrayElem()")
+		type ts struct {
+			Label, Chan, Slice, Sfx string
+		}
+		tm, err := template.New("").Parse(genEncChanTmpl)
+		if err != nil {
+			panic(err)
+		}
+		x.linef("if %s == nil { r.EncodeNil() } else { ", varname)
+		x.linef("var sch%s []%s", i, x.genTypeName(t.Elem()))
+		err = tm.Execute(x.w, &ts{"Lsch" + i, varname, "sch" + i, i})
+		if err != nil {
+			panic(err)
+		}
+		// x.linef("%s = sch%s", varname, i)
+		if elemBytes {
+			x.linef("r.EncodeStringBytes(codecSelferCcRAW%s, []byte(%s))", x.xs, "sch"+i)
+			x.line("}")
+			return
+		}
+		varname = "sch" + i
 	}
 	}
+
+	x.line("r.WriteArrayStart(len(" + varname + "))")
+	x.linef("for _, %sv%s := range %s {", genTempVarPfx, i, varname)
+	x.line("r.WriteArrayElem()")
+
 	x.encVar(genTempVarPfx+"v"+i, t.Elem())
 	x.encVar(genTempVarPfx+"v"+i, t.Elem())
 	x.line("}")
 	x.line("}")
 	x.line("r.WriteArrayEnd()")
 	x.line("r.WriteArrayEnd()")
+	if t.Kind() == reflect.Chan {
+		x.line("}")
+	}
 }
 }
 
 
 func (x *genRunner) encMapFallback(varname string, t reflect.Type) {
 func (x *genRunner) encMapFallback(varname string, t reflect.Type) {

+ 1 - 1
codec/json.go

@@ -1268,7 +1268,7 @@ type JsonHandle struct {
 	// If not configured, raw bytes are encoded to/from base64 text.
 	// If not configured, raw bytes are encoded to/from base64 text.
 	RawBytesExt InterfaceExt
 	RawBytesExt InterfaceExt
 
 
-	_ [3]uint64 // padding
+	_ [2]uint64 // padding
 }
 }
 
 
 // Name returns the name of the handle: json
 // Name returns the name of the handle: json

+ 1 - 5
codec/mammoth2_codecgen_generated_test.go

@@ -38224,7 +38224,6 @@ func (x codecSelfer19781) dectestMammoth2Basic(v *testMammoth2Basic, d *Decoder)
 
 
 			yyh1.ElemContainerState(yyj1)
 			yyh1.ElemContainerState(yyj1)
 
 
-			// if indefinite, etc, then expand the slice if necessary
 			var yydb1 bool
 			var yydb1 bool
 			if yyj1 >= len(yyv1) {
 			if yyj1 >= len(yyv1) {
 				z.DecArrayCannotExpand(len(v), yyj1+1)
 				z.DecArrayCannotExpand(len(v), yyj1+1)
@@ -38372,7 +38371,7 @@ func (x codecSelfer19781) decSliceTestMammoth2(v *[]TestMammoth2, d *Decoder) {
 		var yyj1 int
 		var yyj1 int
 		// var yydn1 bool
 		// var yydn1 bool
 		for ; (yyhl1 && yyj1 < yyl1) || !(yyhl1 || r.CheckBreak()); yyj1++ {
 		for ; (yyhl1 && yyj1 < yyl1) || !(yyhl1 || r.CheckBreak()); yyj1++ {
-			if yyj1 == 0 && len(yyv1) == 0 {
+			if yyj1 == 0 && yyv1 == nil {
 				if yyhl1 {
 				if yyhl1 {
 					yyrl1 = z.DecInferLen(yyl1, z.DecBasicHandle().MaxInitLen, 4880)
 					yyrl1 = z.DecInferLen(yyl1, z.DecBasicHandle().MaxInitLen, 4880)
 				} else {
 				} else {
@@ -38383,7 +38382,6 @@ func (x codecSelfer19781) decSliceTestMammoth2(v *[]TestMammoth2, d *Decoder) {
 			}
 			}
 			yyh1.ElemContainerState(yyj1)
 			yyh1.ElemContainerState(yyj1)
 
 
-			// if indefinite, etc, then expand the slice if necessary
 			var yydb1 bool
 			var yydb1 bool
 			if yyj1 >= len(yyv1) {
 			if yyj1 >= len(yyv1) {
 				yyv1 = append(yyv1, TestMammoth2{})
 				yyv1 = append(yyv1, TestMammoth2{})
@@ -38414,7 +38412,6 @@ func (x codecSelfer19781) decSliceTestMammoth2(v *[]TestMammoth2, d *Decoder) {
 	if yyc1 {
 	if yyc1 {
 		*v = yyv1
 		*v = yyv1
 	}
 	}
-
 }
 }
 
 
 func (x codecSelfer19781) encArray4int64(v *[4]int64, e *Encoder) {
 func (x codecSelfer19781) encArray4int64(v *[4]int64, e *Encoder) {
@@ -38452,7 +38449,6 @@ func (x codecSelfer19781) decArray4int64(v *[4]int64, d *Decoder) {
 
 
 			yyh1.ElemContainerState(yyj1)
 			yyh1.ElemContainerState(yyj1)
 
 
-			// if indefinite, etc, then expand the slice if necessary
 			var yydb1 bool
 			var yydb1 bool
 			if yyj1 >= len(yyv1) {
 			if yyj1 >= len(yyv1) {
 				z.DecArrayCannotExpand(len(v), yyj1+1)
 				z.DecArrayCannotExpand(len(v), yyj1+1)

+ 1 - 1
codec/msgpack.go

@@ -945,7 +945,7 @@ type MsgpackHandle struct {
 	binaryEncodingType
 	binaryEncodingType
 	noElemSeparators
 	noElemSeparators
 
 
-	_ [1]uint64 // padding
+	// _ [1]uint64 // padding
 }
 }
 
 
 // Name returns the name of the handle: msgpack
 // Name returns the name of the handle: msgpack

+ 1 - 1
codec/simple.go

@@ -616,7 +616,7 @@ type SimpleHandle struct {
 	// EncZeroValuesAsNil says to encode zero values for numbers, bool, string, etc as nil
 	// EncZeroValuesAsNil says to encode zero values for numbers, bool, string, etc as nil
 	EncZeroValuesAsNil bool
 	EncZeroValuesAsNil bool
 
 
-	_ [1]uint64 // padding
+	// _ [1]uint64 // padding
 }
 }
 
 
 // Name returns the name of the handle: simple
 // Name returns the name of the handle: simple

+ 26 - 1
codec/values_flex_test.go

@@ -5,7 +5,12 @@
 
 
 package codec
 package codec
 
 
-import "time"
+import (
+	"strings"
+	"time"
+)
+
+const teststrucflexChanCap = 64
 
 
 // This file contains values used by tests alone.
 // This file contains values used by tests alone.
 // This is where we may try out different things,
 // This is where we may try out different things,
@@ -75,6 +80,8 @@ type TestStrucFlex struct {
 	_struct struct{} `codec:",omitempty"` //set omitempty for every field
 	_struct struct{} `codec:",omitempty"` //set omitempty for every field
 	TestStrucCommon
 	TestStrucCommon
 
 
+	Chstr chan string
+
 	Mis     map[int]string
 	Mis     map[int]string
 	Mbu64   map[bool]struct{}
 	Mbu64   map[bool]struct{}
 	Miwu64s map[int]wrapUint64Slice
 	Miwu64s map[int]wrapUint64Slice
@@ -104,8 +111,21 @@ type TestStrucFlex struct {
 	Nteststruc *TestStrucFlex
 	Nteststruc *TestStrucFlex
 }
 }
 
 
+func emptyTestStrucFlex() *TestStrucFlex {
+	var ts TestStrucFlex
+	// we initialize and start draining the chan, so that we can decode into it without it blocking due to no consumer
+	ts.Chstr = make(chan string, teststrucflexChanCap)
+	go func() {
+		for range ts.Chstr {
+		}
+	}() // drain it
+	return &ts
+}
+
 func newTestStrucFlex(depth, n int, bench, useInterface, useStringKeyOnly bool) (ts *TestStrucFlex) {
 func newTestStrucFlex(depth, n int, bench, useInterface, useStringKeyOnly bool) (ts *TestStrucFlex) {
 	ts = &TestStrucFlex{
 	ts = &TestStrucFlex{
+		Chstr: make(chan string, teststrucflexChanCap),
+
 		Miwu64s: map[int]wrapUint64Slice{
 		Miwu64s: map[int]wrapUint64Slice{
 			5: []wrapUint64{1, 2, 3, 4, 5},
 			5: []wrapUint64{1, 2, 3, 4, 5},
 			3: []wrapUint64{1, 2, 3},
 			3: []wrapUint64{1, 2, 3},
@@ -152,6 +172,11 @@ func newTestStrucFlex(depth, n int, bench, useInterface, useStringKeyOnly bool)
 		ArrStrUi64T: [4]stringUint64T{{"4", 4}, {"3", 3}, {"2", 2}, {"1", 1}},
 		ArrStrUi64T: [4]stringUint64T{{"4", 4}, {"3", 3}, {"2", 2}, {"1", 1}},
 	}
 	}
 
 
+	numChanSend := cap(ts.Chstr) / 4 // 8
+	for i := 0; i < numChanSend; i++ {
+		ts.Chstr <- strings.Repeat("A", i+1)
+	}
+
 	ts.Ui64slicearray = []*[4]uint64{&ts.Ui64array, &ts.Ui64array}
 	ts.Ui64slicearray = []*[4]uint64{&ts.Ui64array, &ts.Ui64array}
 
 
 	if useInterface {
 	if useInterface {