Browse Source

improve close handling for streams

Ensure that streams dont block forever if a connection is closed
or the request is timedout.
Chris Bannister 10 years ago
parent
commit
e77f3a3a12
1 changed files with 14 additions and 11 deletions
  1. 14 11
      conn.go

+ 14 - 11
conn.go

@@ -113,6 +113,7 @@ type Conn struct {
 	started         bool
 
 	closed int32
+	quit   chan struct{}
 
 	timeouts int64
 }
@@ -171,6 +172,7 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 		compressor:   cfg.Compressor,
 		auth:         cfg.Authenticator,
 		headerBuf:    make([]byte, headerSize),
+		quit:         make(chan struct{}),
 	}
 
 	if cfg.Keepalive > 0 {
@@ -343,8 +345,8 @@ func (c *Conn) recv() error {
 
 	select {
 	case call.resp <- err:
-	default:
-		c.releaseStream(head.stream)
+	case <-c.quit:
+		return nil
 	}
 
 	return nil
@@ -376,7 +378,6 @@ func (c *Conn) handleTimeout() {
 
 func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
 	// TODO: move tracer onto conn
-	start := time.Now()
 	stream := <-c.uniq
 
 	call := &c.calls[stream]
@@ -388,7 +389,9 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
 		framer.trace()
 	}
 
-	atomic.StoreInt32(&call.waiting, 1)
+	if !atomic.CompareAndSwapInt32(&call.waiting, 0, 1) {
+		return nil, errors.New("gocql: stream is busy or closed")
+	}
 	defer atomic.StoreInt32(&call.waiting, 0)
 
 	err := req.writeFrame(framer, stream)
@@ -408,8 +411,9 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
 		}
 	case <-time.After(c.timeout):
 		c.handleTimeout()
-		log.Printf("querytime=%v\n", time.Now().Sub(start))
 		return nil, ErrTimeoutNoResponse
+	case <-c.quit:
+		return nil, ErrConnectionClosed
 	}
 
 	if v := framer.header.version.version(); v != c.version {
@@ -607,15 +611,13 @@ func (c *Conn) closeWithError(err error) {
 		return
 	}
 
+	close(c.quit)
+
 	for id := 0; id < len(c.calls); id++ {
 		req := &c.calls[id]
 		// we need to send the error to all waiting queries, put the state
 		// of this conn into not active so that it can not execute any queries.
-		atomic.StoreInt32(&req.waiting, 0)
-		select {
-		case req.resp <- err:
-		default:
-		}
+		atomic.StoreInt32(&req.waiting, -1)
 	}
 
 	c.conn.Close()
@@ -770,7 +772,8 @@ type inflightPrepare struct {
 }
 
 var (
-	ErrQueryArgLength    = errors.New("query argument length mismatch")
+	ErrQueryArgLength    = errors.New("gocql: query argument length mismatch")
 	ErrTimeoutNoResponse = errors.New("gocql: no response recieved from cassandra within timeout period")
 	ErrTooManyTimeouts   = errors.New("gocql: too many query timeouts on the connection")
+	ErrConnectionClosed  = errors.New("gocql: connection closed waiting for response")
 )