Переглянути джерело

codec: rpc: use lock free mechanisms to track the close status.

Rpc needs to check whether the connection is closed everytime a decode is to be done.

However, the closing can happen in a different goroutine.

Previously, we used a mutex to write (Lock/Unlock) and read (RLock, RUnlock).

However, this can be expensive, since the writing happens at most once, but the
reading happens everytime.

We now make this more performant, by using lock-free mechanisms
(atomic.Value and atomic.Load/StorePointer).

This may improve some scalability and stress scenarios.

Fixes #271
Ugorji Nwoke 7 роки тому
батько
коміт
ee2199668c
5 змінених файлів з 62 додано та 25 видалено
  1. 5 0
      codec/helper.go
  2. 17 0
      codec/helper_not_unsafe.go
  3. 15 0
      codec/helper_unsafe.go
  4. 1 1
      codec/msgpack.go
  5. 24 24
      codec/rpc.go

+ 5 - 0
codec/helper.go

@@ -158,6 +158,11 @@ func init() {
 	refBitset.set(byte(reflect.Chan))
 }
 
+type clsErr struct {
+	closed    bool  // is it closed?
+	errClosed error // error on closing
+}
+
 type charEncoding uint8
 
 const (

+ 17 - 0
codec/helper_not_unsafe.go

@@ -93,6 +93,23 @@ func isEmptyValue(v reflect.Value, tinfos *TypeInfos, deref, checkStruct bool) b
 // 	return reflect.ValueOf(i).Elem()
 // }
 
+// --------------------------
+type atomicClsErr struct {
+	v atomic.Value
+}
+
+func (x *atomicClsErr) load() clsErr {
+	i := x.v.Load()
+	if i == nil {
+		return clsErr{}
+	}
+	return i.(clsErr)
+}
+
+func (x *atomicClsErr) store(p clsErr) {
+	x.v.Store(p)
+}
+
 // --------------------------
 type atomicTypeInfoSlice struct { // expected to be 2 words
 	v atomic.Value

+ 15 - 0
codec/helper_unsafe.go

@@ -203,6 +203,21 @@ func (x *atomicTypeInfoSlice) store(p []rtid2ti) {
 	atomic.StorePointer(&xp, unsafe.Pointer(&atomicTypeInfoSlice{l: s.Len, v: s.Data}))
 }
 
+// --------------------------
+type atomicClsErr struct {
+	v clsErr
+}
+
+func (x *atomicClsErr) load() clsErr {
+	xp := unsafe.Pointer(&x.v)
+	return *(*clsErr)(atomic.LoadPointer(&xp))
+}
+
+func (x *atomicClsErr) store(p clsErr) {
+	xp := unsafe.Pointer(&x.v)
+	atomic.StorePointer(&xp, unsafe.Pointer(&p))
+}
+
 // --------------------------
 func (d *Decoder) raw(f *codecFnInfo, rv reflect.Value) {
 	urv := (*unsafeReflectValue)(unsafe.Pointer(&rv))

+ 1 - 1
codec/msgpack.go

@@ -1031,7 +1031,7 @@ func (c *msgpackSpecRpcCodec) ReadRequestBody(body interface{}) error {
 }
 
 func (c *msgpackSpecRpcCodec) parseCustomHeader(expectTypeByte byte, msgid *uint64, methodOrError *string) (err error) {
-	if c.isClosed() {
+	if cls := c.cls.load(); cls.closed {
 		return io.EOF
 	}
 

+ 24 - 24
codec/rpc.go

@@ -11,6 +11,8 @@ import (
 	"sync"
 )
 
+var errRpcJsonNeedsTermWhitespace = errors.New("rpc requires a JsonHandle with TermWhitespace set to true")
+
 // Rpc provides a rpc Server or Client Codec for rpc communication.
 type Rpc interface {
 	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
@@ -41,9 +43,7 @@ type rpcCodec struct {
 	mu sync.Mutex
 	h  Handle
 
-	cls    bool
-	clsmu  sync.RWMutex
-	clsErr error
+	cls atomicClsErr
 }
 
 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
@@ -54,7 +54,7 @@ func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
 func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 	// defensive: ensure that jsonH has TermWhitespace turned on.
 	if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
-		panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
+		panic(errRpcJsonNeedsTermWhitespace)
 	}
 	// always ensure that we use a flusher, and always flush what was written to the connection.
 	// we lose nothing by using a buffered writer internally.
@@ -88,8 +88,11 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 }
 
 func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
-	if c.isClosed() {
-		return c.clsErr
+	if c.c != nil {
+		cls := c.cls.load()
+		if cls.closed {
+			return cls.errClosed
+		}
 	}
 	err = c.enc.Encode(obj1)
 	if err == nil {
@@ -116,8 +119,11 @@ func (c *rpcCodec) swallow(err *error) {
 }
 
 func (c *rpcCodec) read(obj interface{}) (err error) {
-	if c.isClosed() {
-		return c.clsErr
+	if c.c != nil {
+		cls := c.cls.load()
+		if cls.closed {
+			return cls.errClosed
+		}
 	}
 	//If nil is passed in, we should read and discard
 	if obj == nil {
@@ -129,24 +135,18 @@ func (c *rpcCodec) read(obj interface{}) (err error) {
 	return c.dec.Decode(obj)
 }
 
-func (c *rpcCodec) isClosed() (b bool) {
-	if c.c != nil {
-		c.clsmu.RLock()
-		b = c.cls
-		c.clsmu.RUnlock()
-	}
-	return
-}
-
 func (c *rpcCodec) Close() error {
-	if c.c == nil || c.isClosed() {
-		return c.clsErr
+	if c.c == nil {
+		return nil
+	}
+	cls := c.cls.load()
+	if cls.closed {
+		return cls.errClosed
 	}
-	c.clsmu.Lock()
-	c.cls = true
-	c.clsErr = c.c.Close()
-	c.clsmu.Unlock()
-	return c.clsErr
+	cls.errClosed = c.c.Close()
+	cls.closed = true
+	c.cls.store(cls)
+	return cls.errClosed
 }
 
 func (c *rpcCodec) ReadResponseBody(body interface{}) error {