broker_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package protocol
  2. import (
  3. "fmt"
  4. "sarama/mock"
  5. "sarama/types"
  6. "testing"
  7. )
  8. func ExampleBroker() error {
  9. broker := NewBroker("localhost", 9092)
  10. err := broker.Connect()
  11. if err != nil {
  12. return err
  13. }
  14. request := MetadataRequest{Topics: []string{"myTopic"}}
  15. response, err := broker.GetMetadata("myClient", &request)
  16. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  17. broker.Close()
  18. return nil
  19. }
  20. func TestBrokerEquals(t *testing.T) {
  21. var b1, b2 *Broker
  22. b1 = nil
  23. b2 = nil
  24. if !b1.Equals(b2) {
  25. t.Error("Two nil brokers didn't compare equal.")
  26. }
  27. b1 = NewBroker("abc", 123)
  28. if b1.Equals(b2) {
  29. t.Error("Non-nil and nil brokers compared equal.")
  30. }
  31. if b2.Equals(b1) {
  32. t.Error("Nil and non-nil brokers compared equal.")
  33. }
  34. b2 = NewBroker("abc", 1234)
  35. if b1.Equals(b2) || b2.Equals(b1) {
  36. t.Error("Brokers with different ports compared equal.")
  37. }
  38. b2 = NewBroker("abcd", 123)
  39. if b1.Equals(b2) || b2.Equals(b1) {
  40. t.Error("Brokers with different hosts compared equal.")
  41. }
  42. b2 = NewBroker("abc", 123)
  43. b2.id = -2
  44. if b1.Equals(b2) || b2.Equals(b1) {
  45. t.Error("Brokers with different ids compared equal.")
  46. }
  47. b2.id = -1
  48. if !b1.Equals(b2) || !b2.Equals(b1) {
  49. t.Error("Similar brokers did not compare equal.")
  50. }
  51. }
  52. func TestBrokerID(t *testing.T) {
  53. broker := NewBroker("abc", 123)
  54. if broker.ID() != -1 {
  55. t.Error("New broker didn't have an ID of -1.")
  56. }
  57. broker.id = 34
  58. if broker.ID() != 34 {
  59. t.Error("Manually setting broker ID did not take effect.")
  60. }
  61. }
  62. func TestSimpleBrokerCommunication(t *testing.T) {
  63. responses := make(chan []byte)
  64. mockBroker := mock.NewBroker(t, responses)
  65. defer mockBroker.Close()
  66. broker := NewBroker("localhost", mockBroker.Port())
  67. err := broker.Connect()
  68. if err != nil {
  69. t.Fatal(err)
  70. }
  71. go func() {
  72. for _, tt := range brokerTestTable {
  73. responses <- tt.response
  74. }
  75. }()
  76. for _, tt := range brokerTestTable {
  77. tt.runner(t, broker)
  78. }
  79. err = broker.Close()
  80. if err != nil {
  81. t.Error(err)
  82. }
  83. }
  84. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  85. var brokerTestTable = []struct {
  86. response []byte
  87. runner func(*testing.T, *Broker)
  88. }{
  89. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  90. func(t *testing.T, broker *Broker) {
  91. request := MetadataRequest{}
  92. response, err := broker.GetMetadata("clientID", &request)
  93. if err != nil {
  94. t.Error(err)
  95. }
  96. if response == nil {
  97. t.Error("Metadata request got no response!")
  98. }
  99. }},
  100. {[]byte{},
  101. func(t *testing.T, broker *Broker) {
  102. request := ProduceRequest{}
  103. request.RequiredAcks = types.NO_RESPONSE
  104. response, err := broker.Produce("clientID", &request)
  105. if err != nil {
  106. t.Error(err)
  107. }
  108. if response != nil {
  109. t.Error("Produce request with NO_RESPONSE got a response!")
  110. }
  111. }},
  112. {[]byte{0x00, 0x00, 0x00, 0x00},
  113. func(t *testing.T, broker *Broker) {
  114. request := ProduceRequest{}
  115. request.RequiredAcks = types.WAIT_FOR_LOCAL
  116. response, err := broker.Produce("clientID", &request)
  117. if err != nil {
  118. t.Error(err)
  119. }
  120. if response == nil {
  121. t.Error("Produce request without NO_RESPONSE got no response!")
  122. }
  123. }},
  124. {[]byte{0x00, 0x00, 0x00, 0x00},
  125. func(t *testing.T, broker *Broker) {
  126. request := FetchRequest{}
  127. response, err := broker.Fetch("clientID", &request)
  128. if err != nil {
  129. t.Error(err)
  130. }
  131. if response == nil {
  132. t.Error("Fetch request got no response!")
  133. }
  134. }},
  135. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  136. func(t *testing.T, broker *Broker) {
  137. request := OffsetFetchRequest{}
  138. response, err := broker.FetchOffset("clientID", &request)
  139. if err != nil {
  140. t.Error(err)
  141. }
  142. if response == nil {
  143. t.Error("OffsetFetch request got no response!")
  144. }
  145. }},
  146. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  147. func(t *testing.T, broker *Broker) {
  148. request := OffsetCommitRequest{}
  149. response, err := broker.CommitOffset("clientID", &request)
  150. if err != nil {
  151. t.Error(err)
  152. }
  153. if response == nil {
  154. t.Error("OffsetCommit request got no response!")
  155. }
  156. }},
  157. {[]byte{0x00, 0x00, 0x00, 0x00},
  158. func(t *testing.T, broker *Broker) {
  159. request := OffsetRequest{}
  160. response, err := broker.GetAvailableOffsets("clientID", &request)
  161. if err != nil {
  162. t.Error(err)
  163. }
  164. if response == nil {
  165. t.Error("Offset request got no response!")
  166. }
  167. }},
  168. }