Browse Source

Revert "send frame should not block (#1062)"

This reverts commit acd6671d04577df9e6cd95fa1bd0aa70f98122af.
Chris Bannister 7 years ago
parent
commit
df37f92779
2 changed files with 57 additions and 108 deletions
  1. 1 0
      AUTHORS
  2. 56 108
      conn.go

+ 1 - 0
AUTHORS

@@ -104,3 +104,4 @@ Luke Hines <lukehines@protonmail.com>
 Zhixin Wen <john.wenzhixin@hotmail.com>
 Chang Liu <changliu.it@gmail.com>
 Ingo Oeser <nightlyone@gmail.com>
+Luke Hines <lukehines@protonmail.com>

+ 56 - 108
conn.go

@@ -147,8 +147,6 @@ type Conn struct {
 	quit   chan struct{}
 
 	timeouts int64
-
-	frameWriteArgChan chan *callReq
 }
 
 // Connect establishes a connection to a Cassandra node.
@@ -185,20 +183,19 @@ func (s *Session) dial(ip net.IP, port int, cfg *ConnConfig, errorHandler ConnEr
 	}
 
 	c := &Conn{
-		conn:              conn,
-		r:                 bufio.NewReader(conn),
-		cfg:               cfg,
-		calls:             make(map[int]*callReq),
-		timeout:           cfg.Timeout,
-		version:           uint8(cfg.ProtoVersion),
-		addr:              conn.RemoteAddr().String(),
-		errorHandler:      errorHandler,
-		compressor:        cfg.Compressor,
-		auth:              cfg.Authenticator,
-		quit:              make(chan struct{}),
-		session:           s,
-		streams:           streams.New(cfg.ProtoVersion),
-		frameWriteArgChan: make(chan *callReq),
+		conn:         conn,
+		r:            bufio.NewReader(conn),
+		cfg:          cfg,
+		calls:        make(map[int]*callReq),
+		timeout:      cfg.Timeout,
+		version:      uint8(cfg.ProtoVersion),
+		addr:         conn.RemoteAddr().String(),
+		errorHandler: errorHandler,
+		compressor:   cfg.Compressor,
+		auth:         cfg.Authenticator,
+		quit:         make(chan struct{}),
+		session:      s,
+		streams:      streams.New(cfg.ProtoVersion),
 	}
 
 	if cfg.Keepalive > 0 {
@@ -218,9 +215,6 @@ func (s *Session) dial(ip net.IP, port int, cfg *ConnConfig, errorHandler ConnEr
 
 	frameTicker := make(chan struct{}, 1)
 	startupErr := make(chan error)
-
-	go c.writeToConn()
-
 	go func() {
 		for range frameTicker {
 			err := c.recv()
@@ -442,27 +436,6 @@ func (c *Conn) discardFrame(head frameHeader) error {
 	return nil
 }
 
-// writeToConn() processes writing to the connection, which is required before any write
-// to Conn and is usually called in a separate goroutine.
-func (c *Conn) writeToConn() {
-	for {
-		select {
-		case call := <-c.frameWriteArgChan:
-			if err := call.req.writeFrame(call.framer, call.streamID); err != nil {
-				// I think this is the correct thing to do, im not entirely sure. It is not
-				// ideal as readers might still get some data, but they probably wont.
-				// Here we need to be careful as the stream is not available and if all
-				// writes just timeout or fail then the pool might use this connection to
-				// send a frame on, with all the streams used up and not returned.
-				c.closeWithError(err)
-				return
-			}
-		case <-c.quit:
-			return
-		}
-	}
-}
-
 type protocolError struct {
 	frame frame
 }
@@ -588,32 +561,10 @@ type callReq struct {
 	framer   *framer
 	timeout  chan struct{} // indicates to recv() that a call has timedout
 	streamID int           // current stream in use
-	req      frameWriter
 
 	timer *time.Timer
 }
 
-func (c *callReq) resetTimeout(timeout time.Duration) <-chan time.Time {
-	var timeoutCh <-chan time.Time
-	if timeout > 0 {
-		if c.timer == nil {
-			c.timer = time.NewTimer(0)
-			<-c.timer.C
-		} else {
-			if !c.timer.Stop() {
-				select {
-				case <-c.timer.C:
-				default:
-				}
-			}
-		}
-
-		c.timer.Reset(timeout)
-		timeoutCh = c.timer.C
-	}
-	return timeoutCh
-}
-
 func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*framer, error) {
 	// TODO: move tracer onto conn
 	stream, ok := c.streams.GetStream()
@@ -628,7 +579,6 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
 	call.framer = framer
 	call.timeout = make(chan struct{})
 	call.streamID = stream
-	call.req = req
 
 	c.mu.Lock()
 	existingCall := c.calls[stream]
@@ -645,31 +595,39 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
 		framer.trace()
 	}
 
-	timeoutCh := call.resetTimeout(c.timeout)
-	if err := c.sendFrame(ctx, call, timeoutCh); err != nil {
-		return nil, err
-	}
-
-	if err := c.getResp(ctx, call, timeoutCh); err != nil {
+	err := req.writeFrame(framer, stream)
+	if err != nil {
+		// closeWithError will block waiting for this stream to either receive a response
+		// or for us to timeout, close the timeout chan here. Im not entirely sure
+		// but we should not get a response after an error on the write side.
+		close(call.timeout)
+		// I think this is the correct thing to do, im not entirely sure. It is not
+		// ideal as readers might still get some data, but they probably wont.
+		// Here we need to be careful as the stream is not available and if all
+		// writes just timeout or fail then the pool might use this connection to
+		// send a frame on, with all the streams used up and not returned.
+		c.closeWithError(err)
 		return nil, err
 	}
 
-	// dont release the stream if detect a timeout as another request can reuse
-	// that stream and get a response for the old request, which we have no
-	// easy way of detecting.
-	//
-	// Ensure that the stream is not released if there are potentially outstanding
-	// requests on the stream to prevent nil pointer dereferences in recv().
-	defer c.releaseStream(stream)
+	var timeoutCh <-chan time.Time
+	if c.timeout > 0 {
+		if call.timer == nil {
+			call.timer = time.NewTimer(0)
+			<-call.timer.C
+		} else {
+			if !call.timer.Stop() {
+				select {
+				case <-call.timer.C:
+				default:
+				}
+			}
+		}
 
-	if v := framer.header.version.version(); v != c.version {
-		return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
+		call.timer.Reset(c.timeout)
+		timeoutCh = call.timer.C
 	}
 
-	return framer, nil
-}
-
-func (c *Conn) getResp(ctx context.Context, call *callReq, timeoutCh <-chan time.Time) error {
 	var ctxDone <-chan struct{}
 	if ctx != nil {
 		ctxDone = ctx.Done()
@@ -684,44 +642,34 @@ func (c *Conn) getResp(ctx context.Context, call *callReq, timeoutCh <-chan time
 				// this is because the request is still outstanding and we have
 				// been handed another error from another stream which caused the
 				// connection to close.
-				c.releaseStream(call.streamID)
+				c.releaseStream(stream)
 			}
-			return err
+			return nil, err
 		}
-		return nil
 	case <-timeoutCh:
 		close(call.timeout)
 		c.handleTimeout()
-		return ErrTimeoutNoResponse
+		return nil, ErrTimeoutNoResponse
 	case <-ctxDone:
 		close(call.timeout)
-		return ctx.Err()
+		return nil, ctx.Err()
 	case <-c.quit:
-		return ErrConnectionClosed
+		return nil, ErrConnectionClosed
 	}
-}
 
-func (c *Conn) sendFrame(ctx context.Context, call *callReq, timeoutCh <-chan time.Time) error {
-	var ctxDone <-chan struct{}
-	if ctx != nil {
-		ctxDone = ctx.Done()
-	}
+	// dont release the stream if detect a timeout as another request can reuse
+	// that stream and get a response for the old request, which we have no
+	// easy way of detecting.
+	//
+	// Ensure that the stream is not released if there are potentially outstanding
+	// requests on the stream to prevent nil pointer dereferences in recv().
+	defer c.releaseStream(stream)
 
-	select {
-	case c.frameWriteArgChan <- call:
-		return nil
-	case <-timeoutCh:
-		close(call.timeout)
-		c.releaseStream(call.streamID)
-		c.handleTimeout()
-		return ErrTimeoutNoResponse
-	case <-ctxDone:
-		close(call.timeout)
-		c.releaseStream(call.streamID)
-		return ctx.Err()
-	case <-c.quit:
-		return ErrConnectionClosed
+	if v := framer.header.version.version(); v != c.version {
+		return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
 	}
+
+	return framer, nil
 }
 
 type preparedStatment struct {