broker_test.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package sarama
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama/mockbroker"
  5. "testing"
  6. )
  7. func ExampleBroker() error {
  8. broker := NewBroker("localhost:9092")
  9. err := broker.Open(4)
  10. if err != nil {
  11. return err
  12. }
  13. defer broker.Close()
  14. request := MetadataRequest{Topics: []string{"myTopic"}}
  15. response, err := broker.GetMetadata("myClient", &request)
  16. if err != nil {
  17. return err
  18. }
  19. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  20. return nil
  21. }
  22. func TestBrokerAccessors(t *testing.T) {
  23. broker := NewBroker("abc:123")
  24. if broker.ID() != -1 {
  25. t.Error("New broker didn't have an ID of -1.")
  26. }
  27. if broker.Addr() != "abc:123" {
  28. t.Error("New broker didn't have the correct address")
  29. }
  30. broker.id = 34
  31. if broker.ID() != 34 {
  32. t.Error("Manually setting broker ID did not take effect.")
  33. }
  34. }
  35. func TestSimpleBrokerCommunication(t *testing.T) {
  36. responses := make(chan []byte)
  37. mockBroker := mockbroker.New(t, responses)
  38. defer mockBroker.Close()
  39. broker := NewBroker(mockBroker.Addr())
  40. err := broker.Open(4)
  41. if err != nil {
  42. t.Fatal(err)
  43. }
  44. go func() {
  45. for _, tt := range brokerTestTable {
  46. responses <- tt.response
  47. }
  48. }()
  49. for _, tt := range brokerTestTable {
  50. tt.runner(t, broker)
  51. }
  52. err = broker.Close()
  53. if err != nil {
  54. t.Error(err)
  55. }
  56. }
  57. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  58. var brokerTestTable = []struct {
  59. response []byte
  60. runner func(*testing.T, *Broker)
  61. }{
  62. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  63. func(t *testing.T, broker *Broker) {
  64. request := MetadataRequest{}
  65. response, err := broker.GetMetadata("clientID", &request)
  66. if err != nil {
  67. t.Error(err)
  68. }
  69. if response == nil {
  70. t.Error("Metadata request got no response!")
  71. }
  72. }},
  73. {[]byte{},
  74. func(t *testing.T, broker *Broker) {
  75. request := ProduceRequest{}
  76. request.RequiredAcks = NoResponse
  77. response, err := broker.Produce("clientID", &request)
  78. if err != nil {
  79. t.Error(err)
  80. }
  81. if response != nil {
  82. t.Error("Produce request with NoResponse got a response!")
  83. }
  84. }},
  85. {[]byte{0x00, 0x00, 0x00, 0x00},
  86. func(t *testing.T, broker *Broker) {
  87. request := ProduceRequest{}
  88. request.RequiredAcks = WaitForLocal
  89. response, err := broker.Produce("clientID", &request)
  90. if err != nil {
  91. t.Error(err)
  92. }
  93. if response == nil {
  94. t.Error("Produce request without NoResponse got no response!")
  95. }
  96. }},
  97. {[]byte{0x00, 0x00, 0x00, 0x00},
  98. func(t *testing.T, broker *Broker) {
  99. request := FetchRequest{}
  100. response, err := broker.Fetch("clientID", &request)
  101. if err != nil {
  102. t.Error(err)
  103. }
  104. if response == nil {
  105. t.Error("Fetch request got no response!")
  106. }
  107. }},
  108. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  109. func(t *testing.T, broker *Broker) {
  110. request := OffsetFetchRequest{}
  111. response, err := broker.FetchOffset("clientID", &request)
  112. if err != nil {
  113. t.Error(err)
  114. }
  115. if response == nil {
  116. t.Error("OffsetFetch request got no response!")
  117. }
  118. }},
  119. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  120. func(t *testing.T, broker *Broker) {
  121. request := OffsetCommitRequest{}
  122. response, err := broker.CommitOffset("clientID", &request)
  123. if err != nil {
  124. t.Error(err)
  125. }
  126. if response == nil {
  127. t.Error("OffsetCommit request got no response!")
  128. }
  129. }},
  130. {[]byte{0x00, 0x00, 0x00, 0x00},
  131. func(t *testing.T, broker *Broker) {
  132. request := OffsetRequest{}
  133. response, err := broker.GetAvailableOffsets("clientID", &request)
  134. if err != nil {
  135. t.Error(err)
  136. }
  137. if response == nil {
  138. t.Error("Offset request got no response!")
  139. }
  140. }},
  141. }