Explorar o código

conn: dont use stream 0 for outgoing requests

If Cassandra has an error and does not honor the protocol and returns
a response on a different stream than the request then the stream will
be 0, this can happen if there is a corrputed frame or an invalid
protocol version.

Detect the responses on stream 0 and cause it to be sent to all
outstanding requests so that they don't block indefinatly, as they may
have caused the original error.
Chris Bannister %!s(int64=10) %!d(string=hai) anos
pai
achega
c5ae7cf29d
Modificáronse 2 ficheiros con 69 adicións e 1 borrados
  1. 45 0
      cassandra_test.go
  2. 24 1
      conn.go

+ 45 - 0
cassandra_test.go

@@ -1982,3 +1982,48 @@ func TestTokenAwareConnPool(t *testing.T) {
 
 	// TODO add verification that the query went to the correct host
 }
+
+type frameWriterFunc func(framer *framer, streamID int) error
+
+func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error {
+	return f(framer, streamID)
+}
+
+func TestStream0(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	var conn *Conn
+	for i := 0; i < 5; i++ {
+		if conn != nil {
+			break
+		}
+
+		conn = session.Pool.Pick(nil)
+	}
+
+	if conn == nil {
+		t.Fatal("no connections available in the pool")
+	}
+
+	writer := frameWriterFunc(func(f *framer, streamID int) error {
+		if streamID == 0 {
+			t.Fatal("should not use stream 0 for requests")
+		}
+		f.writeHeader(0, opError, streamID)
+		f.writeString("i am a bad frame")
+		f.wbuf[0] = 0xFF
+		return f.finishWrite()
+	})
+
+	const expErr = "gocql: error on stream 0: Invalid or unsupported protocol version: 127"
+	// need to write out an invalid frame, which we need a connection to do
+	frame, err := conn.exec(writer, nil)
+	if err == nil {
+		t.Fatal("expected to get an error on stream 0")
+	} else if err.Error() != expErr {
+		t.Fatalf("expected to get error %q got %q", expErr, err.Error())
+	} else if frame != nil {
+		t.Fatalf("expected to get nil frame got %+v", frame)
+	}
+}

+ 24 - 1
conn.go

@@ -159,6 +159,8 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 
 	if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
 		cfg.NumStreams = maxStreams
+	} else {
+		cfg.NumStreams++
 	}
 
 	c := &Conn{
@@ -180,7 +182,9 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 		c.setKeepalive(cfg.Keepalive)
 	}
 
-	for i := 0; i < cfg.NumStreams; i++ {
+	// reserve stream 0 incase cassandra returns an error on it without us sending
+	// a request.
+	for i := 1; i < cfg.NumStreams; i++ {
 		c.calls[i].resp = make(chan error)
 		c.uniq <- i
 	}
@@ -370,6 +374,25 @@ func (c *Conn) recv() error {
 			return err
 		}
 		return nil
+	} else if head.stream == 0 {
+		// reserved stream that we dont use, probably due to a protocol error
+		// or a bug in Cassandra, this should be an error, parse it and return.
+		framer := newFramer(c, c, c.compressor, c.version)
+		if err := framer.readFrame(&head); err != nil {
+			return err
+		}
+
+		frame, err := framer.parseFrame()
+		if err != nil {
+			return err
+		}
+
+		switch v := frame.(type) {
+		case error:
+			return fmt.Errorf("gocql: error on stream 0: %v", v)
+		default:
+			return fmt.Errorf("gocql: received frame on stream 0: %v", frame)
+		}
 	}
 
 	call := &c.calls[head.stream]