Quellcode durchsuchen

Run the process function concurrently in testserver

The server should be able to process multiple streams on the
same connection concurrently like Cassandra can, do this.

Also improve error handling and failure in readFrame.
Chris Bannister vor 10 Jahren
Ursprung
Commit
f1e2cf7fc3
1 geänderte Dateien mit 26 neuen und 9 gelöschten Zeilen
  1. 26 9
      conn_test.go

+ 26 - 9
conn_test.go

@@ -458,9 +458,16 @@ func (srv *TestServer) serve() {
 		go func(conn net.Conn) {
 			defer conn.Close()
 			for {
-				frame := srv.readFrame(conn)
+				frame, err := srv.readFrame(conn)
+				if err == io.EOF {
+					return
+				} else if err != nil {
+					srv.t.Error(err)
+					continue
+				}
+
 				atomic.AddUint64(&srv.nreq, 1)
-				srv.process(frame, conn)
+				go srv.process(frame, conn)
 			}
 		}(conn)
 	}
@@ -531,16 +538,26 @@ func (srv *TestServer) process(f frame, conn net.Conn) {
 	}
 }
 
-func (srv *TestServer) readFrame(conn net.Conn) frame {
-	frame := make(frame, headerSize, headerSize+512)
+func (srv *TestServer) readFrame(conn net.Conn) (frame, error) {
+	frame := make(frame, srv.headerSize, srv.headerSize+512)
 	if _, err := io.ReadFull(conn, frame); err != nil {
-		srv.t.Fatal(err)
+		return nil, err
+	}
+
+	// should be a request frame
+	if frame[0]&protoDirectionMask != 0 {
+		return nil, fmt.Errorf("expected to read a request frame got version: 0x%x", frame[0])
+	}
+	if v := frame[0] & protoVersionMask; v != srv.protocol {
+		return nil, fmt.Errorf("expected to read protocol version 0x%x got 0x%x", srv.protocol, v)
 	}
-	if n := frame.Length(); n > 0 {
+
+	if n := frame.Length(srv.protocol); n > 0 {
 		frame.grow(n)
-		if _, err := io.ReadFull(conn, frame[headerSize:]); err != nil {
-			srv.t.Fatal(err)
+		if _, err := io.ReadFull(conn, frame[srv.headerSize:]); err != nil {
+			return nil, err
 		}
 	}
-	return frame
+
+	return frame, nil
 }