|
@@ -1,123 +1,12 @@
|
|
|
package protocol
|
|
package protocol
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
- "encoding/binary"
|
|
|
|
|
"fmt"
|
|
"fmt"
|
|
|
- "io"
|
|
|
|
|
- "net"
|
|
|
|
|
|
|
+ "sarama/mock"
|
|
|
"sarama/types"
|
|
"sarama/types"
|
|
|
- "strconv"
|
|
|
|
|
"testing"
|
|
"testing"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-// FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
|
|
|
|
|
-// It takes a testing.T as provided by the test framework and a channel of responses to use.
|
|
|
|
|
-// It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
|
|
|
|
|
-// from that port and returns each provided response in order (if a response is nil, nothing is sent).
|
|
|
|
|
-// When the server is successfully created, it returns the port on which it is listening and a 'done' channel
|
|
|
|
|
-// which it will close when it exits. Otherwise it will return an error (if an error occurs *in* the goroutine it
|
|
|
|
|
-// is simply logged to the testing.T and the goroutine exits). There is also a StopFakeServer helper that leads to
|
|
|
|
|
-// this recommended pattern in tests:
|
|
|
|
|
-//
|
|
|
|
|
-// port, done, err := FakeKafkaServer(t, responses)
|
|
|
|
|
-// if err != nil {
|
|
|
|
|
-// t.Fatal(err)
|
|
|
|
|
-// }
|
|
|
|
|
-// defer StopFakeServer(responses, done)
|
|
|
|
|
-//
|
|
|
|
|
-// When running tests like this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
|
|
|
|
|
-// waiting for a response, it automatically panics.
|
|
|
|
|
-//
|
|
|
|
|
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
|
|
|
|
|
-// automatically as a convenience.
|
|
|
|
|
-func FakeKafkaServer(t *testing.T, responses <-chan []byte) (int32, <-chan bool, error) {
|
|
|
|
|
- ln, err := net.Listen("tcp", "localhost:0")
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return 0, nil, err
|
|
|
|
|
- }
|
|
|
|
|
- _, portStr, err := net.SplitHostPort(ln.Addr().String())
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return 0, nil, err
|
|
|
|
|
- }
|
|
|
|
|
- tmp, err := strconv.ParseInt(portStr, 10, 32)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return 0, nil, err
|
|
|
|
|
- }
|
|
|
|
|
- port := int32(tmp)
|
|
|
|
|
- done := make(chan bool)
|
|
|
|
|
- go func() {
|
|
|
|
|
- defer close(done)
|
|
|
|
|
- conn, err := ln.Accept()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- reqHeader := make([]byte, 4)
|
|
|
|
|
- resHeader := make([]byte, 8)
|
|
|
|
|
- for response := range responses {
|
|
|
|
|
- _, err := io.ReadFull(conn, reqHeader)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- body := make([]byte, binary.BigEndian.Uint32(reqHeader))
|
|
|
|
|
- if len(body) < 10 {
|
|
|
|
|
- t.Error("Kafka request too short.")
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- _, err = io.ReadFull(conn, body)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- if response == nil {
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
|
|
|
|
|
- binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
|
|
|
|
|
- _, err = conn.Write(resHeader)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- _, err = conn.Write(response)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- conn.Close()
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- err = conn.Close()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- ln.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- err = ln.Close()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- return port, done, nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func StopFakeServer(responses chan []byte, done <-chan bool) {
|
|
|
|
|
- close(responses)
|
|
|
|
|
- <-done
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func ExampleBroker() error {
|
|
func ExampleBroker() error {
|
|
|
broker := NewBroker("localhost", 9092)
|
|
broker := NewBroker("localhost", 9092)
|
|
|
err := broker.Connect()
|
|
err := broker.Connect()
|
|
@@ -192,13 +81,13 @@ func TestBrokerID(t *testing.T) {
|
|
|
|
|
|
|
|
func TestSimpleBrokerCommunication(t *testing.T) {
|
|
func TestSimpleBrokerCommunication(t *testing.T) {
|
|
|
responses := make(chan []byte)
|
|
responses := make(chan []byte)
|
|
|
- port, done, err := FakeKafkaServer(t, responses)
|
|
|
|
|
|
|
+ mockBroker, err := mock.NewBroker(t, responses)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
- defer StopFakeServer(responses, done)
|
|
|
|
|
|
|
+ defer mockBroker.Close()
|
|
|
|
|
|
|
|
- broker := NewBroker("localhost", port)
|
|
|
|
|
|
|
+ broker := NewBroker("localhost", mockBroker.Port())
|
|
|
err = broker.Connect()
|
|
err = broker.Connect()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|