Przeglądaj źródła

Automatically write the correlation ID for convenience

Evan Huus 12 lat temu
rodzic
commit
846e5a5ae5
1 zmienionych plików z 15 dodań i 6 usunięć
  1. 15 6
      protocol/broker_test.go

+ 15 - 6
protocol/broker_test.go

@@ -20,7 +20,8 @@ import (
 // When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
 // When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
 // waiting for a response, it automatically panics.
 // waiting for a response, it automatically panics.
 //
 //
-// It is not necessary to prefix message length to your response bytes, the server does that automatically as a convenience.
+// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
+// automatically as a convenience.
 func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (int32, error) {
 func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (int32, error) {
 	ln, err := net.Listen("tcp", "localhost:0")
 	ln, err := net.Listen("tcp", "localhost:0")
 	if err != nil {
 	if err != nil {
@@ -44,16 +45,23 @@ func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (i
 			ln.Close()
 			ln.Close()
 			return
 			return
 		}
 		}
+		reqHeader := make([]byte, 4)
+		resHeader := make([]byte, 8)
 		for response := range responses {
 		for response := range responses {
-			header := make([]byte, 4)
-			_, err := io.ReadFull(conn, header)
+			_, err := io.ReadFull(conn, reqHeader)
 			if err != nil {
 			if err != nil {
 				t.Error(err)
 				t.Error(err)
 				conn.Close()
 				conn.Close()
 				ln.Close()
 				ln.Close()
 				return
 				return
 			}
 			}
-			body := make([]byte, binary.BigEndian.Uint32(header))
+			body := make([]byte, binary.BigEndian.Uint32(reqHeader))
+			if len(body) < 10 {
+				t.Error("Kafka request too short.")
+				conn.Close()
+				ln.Close()
+				return
+			}
 			_, err = io.ReadFull(conn, body)
 			_, err = io.ReadFull(conn, body)
 			if err != nil {
 			if err != nil {
 				t.Error(err)
 				t.Error(err)
@@ -64,8 +72,9 @@ func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (i
 			if response == nil {
 			if response == nil {
 				continue
 				continue
 			}
 			}
-			binary.BigEndian.PutUint32(header, uint32(len(response)))
-			_, err = conn.Write(header)
+			binary.BigEndian.PutUint32(resHeader, uint32(len(response)))
+			binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+			_, err = conn.Write(resHeader)
 			if err != nil {
 			if err != nil {
 				t.Error(err)
 				t.Error(err)
 				conn.Close()
 				conn.Close()