浏览代码

conn: lazily allocate stream call

Use sync.Pool to recycle inflight call requests.
Use a map to store inflight requests
Chris Bannister 10 年之前
父节点
当前提交
83d38bd442
共有 1 个文件被更改,包括 52 次插入18 次删除
  1. 52 18
      conn.go

+ 52 - 18
conn.go

@@ -112,15 +112,17 @@ var TimeoutLimit int64 = 10
 // queries, but users are usually advised to use a more reliable, higher
 // level API.
 type Conn struct {
-	conn    net.Conn
-	r       *bufio.Reader
-	timeout time.Duration
-	cfg     *ConnConfig
+	conn       net.Conn
+	r          *bufio.Reader
+	timeout    time.Duration
+	cfg        *ConnConfig
+	numStreams int
 
 	headerBuf []byte
 
 	uniq  chan int
-	calls []callReq
+	mu    sync.RWMutex
+	calls map[int]*callReq
 
 	errorHandler    ConnErrorHandler
 	compressor      Compressor
@@ -188,7 +190,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
 		r:            bufio.NewReader(conn),
 		cfg:          cfg,
 		uniq:         make(chan int, streams),
-		calls:        make([]callReq, streams),
+		calls:        make(map[int]*callReq),
 		timeout:      cfg.Timeout,
 		version:      uint8(cfg.ProtoVersion),
 		addr:         conn.RemoteAddr().String(),
@@ -198,6 +200,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
 		headerBuf:    make([]byte, headerSize),
 		quit:         make(chan struct{}),
 		session:      session,
+		numStreams:   streams,
 	}
 
 	if cfg.Keepalive > 0 {
@@ -207,7 +210,6 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
 	// reserve stream 0 incase cassandra returns an error on it without us sending
 	// a request.
 	for i := 1; i < streams; i++ {
-		c.calls[i].resp = make(chan error)
 		c.uniq <- i
 	}
 
@@ -340,8 +342,8 @@ func (c *Conn) closeWithError(err error) {
 	if err != nil {
 		// we should attempt to deliver the error back to the caller if it
 		// exists
-		for id := 0; id < len(c.calls); id++ {
-			req := &c.calls[id]
+		c.mu.RLock()
+		for _, req := range c.calls {
 			// we need to send the error to all waiting queries, put the state
 			// of this conn into not active so that it can not execute any queries.
 			if err != nil {
@@ -351,6 +353,7 @@ func (c *Conn) closeWithError(err error) {
 				}
 			}
 		}
+		c.mu.RUnlock()
 	}
 
 	// if error was nil then unblock the quit channel
@@ -407,7 +410,7 @@ func (c *Conn) recv() error {
 		return err
 	}
 
-	if head.stream > len(c.calls) {
+	if head.stream > c.numStreams {
 		return fmt.Errorf("gocql: frame header stream is beyond call exepected bounds: %d", head.stream)
 	} else if head.stream == -1 {
 		// TODO: handle cassandra event frames, we shouldnt get any currently
@@ -434,8 +437,10 @@ func (c *Conn) recv() error {
 		}
 	}
 
-	call := &c.calls[head.stream]
-	if call == nil || call.framer == nil {
+	c.mu.RLock()
+	call, ok := c.calls[head.stream]
+	c.mu.RUnlock()
+	if call == nil || call.framer == nil || !ok {
 		log.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
 		return c.discardFrame(head)
 	}
@@ -463,14 +468,22 @@ func (c *Conn) recv() error {
 
 type callReq struct {
 	// could use a waitgroup but this allows us to do timeouts on the read/send
-	resp    chan error
-	framer  *framer
-	timeout chan struct{} // indicates to recv() that a call has timedout
+	resp     chan error
+	framer   *framer
+	timeout  chan struct{} // indicates to recv() that a call has timedout
+	streamID int           // current stream in use
 }
 
 func (c *Conn) releaseStream(stream int) {
-	call := &c.calls[stream]
-	call.framer = nil
+	c.mu.Lock()
+	call := c.calls[stream]
+	if call != nil && stream != call.streamID {
+		panic(fmt.Sprintf("attempt to release streamID with ivalid stream: %d -> %+v\n", stream, call))
+	}
+	delete(c.calls, stream)
+	c.mu.Unlock()
+
+	streamPool.Put(call)
 
 	select {
 	case c.uniq <- stream:
@@ -484,6 +497,16 @@ func (c *Conn) handleTimeout() {
 	}
 }
 
+var (
+	streamPool = sync.Pool{
+		New: func() interface{} {
+			return &callReq{
+				resp: make(chan error),
+			}
+		},
+	}
+)
+
 func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
 	// TODO: move tracer onto conn
 	var stream int
@@ -495,9 +518,20 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
 
 	// resp is basically a waiting semaphore protecting the framer
 	framer := newFramer(c, c, c.compressor, c.version)
-	call := &c.calls[stream]
+
+	c.mu.Lock()
+	call := c.calls[stream]
+	if call != nil {
+		panic(fmt.Sprintf("attempting to use stream already in use: %d -> %+v\n", stream, call))
+	} else {
+		call = streamPool.Get().(*callReq)
+	}
+	c.calls[stream] = call
+	c.mu.Unlock()
+
 	call.framer = framer
 	call.timeout = make(chan struct{})
+	call.streamID = stream
 
 	if tracer != nil {
 		framer.trace()