broker_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func ExampleBroker() {
  8. broker := NewBroker("localhost:9092")
  9. err := broker.Open(nil)
  10. if err != nil {
  11. panic(err)
  12. }
  13. request := MetadataRequest{Topics: []string{"myTopic"}}
  14. response, err := broker.GetMetadata(&request)
  15. if err != nil {
  16. _ = broker.Close()
  17. panic(err)
  18. }
  19. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  20. if err = broker.Close(); err != nil {
  21. panic(err)
  22. }
  23. }
  24. type mockEncoder struct {
  25. bytes []byte
  26. }
  27. func (m mockEncoder) encode(pe packetEncoder) error {
  28. return pe.putRawBytes(m.bytes)
  29. }
  30. func TestBrokerAccessors(t *testing.T) {
  31. broker := NewBroker("abc:123")
  32. if broker.ID() != -1 {
  33. t.Error("New broker didn't have an ID of -1.")
  34. }
  35. if broker.Addr() != "abc:123" {
  36. t.Error("New broker didn't have the correct address")
  37. }
  38. broker.id = 34
  39. if broker.ID() != 34 {
  40. t.Error("Manually setting broker ID did not take effect.")
  41. }
  42. }
  43. func TestSimpleBrokerCommunication(t *testing.T) {
  44. for _, tt := range brokerTestTable {
  45. t.Log("Testing broker communication for", tt.name)
  46. mb := NewMockBroker(t, 0)
  47. // Do not add expectation for ProduceRequest (No Response)
  48. if len(tt.response) != 0 {
  49. mb.Returns(&mockEncoder{tt.response})
  50. }
  51. broker := NewBroker(mb.Addr())
  52. // Set the broker id in order to validate local broker metrics
  53. broker.id = 0
  54. conf := NewConfig()
  55. conf.Version = V0_10_0_0
  56. // Use a new registry every time to prevent side effect caused by the global one
  57. conf.MetricRegistry = metrics.NewRegistry()
  58. err := broker.Open(conf)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. tt.runner(t, broker)
  63. err = broker.Close()
  64. if err != nil {
  65. t.Error(err)
  66. }
  67. mb.Close()
  68. validateBrokerMetrics(t, broker, mb)
  69. }
  70. }
  71. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  72. var brokerTestTable = []struct {
  73. name string
  74. response []byte
  75. runner func(*testing.T, *Broker)
  76. }{
  77. {"MetadataRequest",
  78. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  79. func(t *testing.T, broker *Broker) {
  80. request := MetadataRequest{}
  81. response, err := broker.GetMetadata(&request)
  82. if err != nil {
  83. t.Error(err)
  84. }
  85. if response == nil {
  86. t.Error("Metadata request got no response!")
  87. }
  88. }},
  89. {"ConsumerMetadataRequest",
  90. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  91. func(t *testing.T, broker *Broker) {
  92. request := ConsumerMetadataRequest{}
  93. response, err := broker.GetConsumerMetadata(&request)
  94. if err != nil {
  95. t.Error(err)
  96. }
  97. if response == nil {
  98. t.Error("Consumer Metadata request got no response!")
  99. }
  100. }},
  101. {"ProduceRequest (NoResponse)",
  102. []byte{},
  103. func(t *testing.T, broker *Broker) {
  104. request := ProduceRequest{}
  105. request.RequiredAcks = NoResponse
  106. response, err := broker.Produce(&request)
  107. if err != nil {
  108. t.Error(err)
  109. }
  110. if response != nil {
  111. t.Error("Produce request with NoResponse got a response!")
  112. }
  113. }},
  114. {"ProduceRequest (WaitForLocal)",
  115. []byte{0x00, 0x00, 0x00, 0x00},
  116. func(t *testing.T, broker *Broker) {
  117. request := ProduceRequest{}
  118. request.RequiredAcks = WaitForLocal
  119. response, err := broker.Produce(&request)
  120. if err != nil {
  121. t.Error(err)
  122. }
  123. if response == nil {
  124. t.Error("Produce request without NoResponse got no response!")
  125. }
  126. }},
  127. {"FetchRequest",
  128. []byte{0x00, 0x00, 0x00, 0x00},
  129. func(t *testing.T, broker *Broker) {
  130. request := FetchRequest{}
  131. response, err := broker.Fetch(&request)
  132. if err != nil {
  133. t.Error(err)
  134. }
  135. if response == nil {
  136. t.Error("Fetch request got no response!")
  137. }
  138. }},
  139. {"OffsetFetchRequest",
  140. []byte{0x00, 0x00, 0x00, 0x00},
  141. func(t *testing.T, broker *Broker) {
  142. request := OffsetFetchRequest{}
  143. response, err := broker.FetchOffset(&request)
  144. if err != nil {
  145. t.Error(err)
  146. }
  147. if response == nil {
  148. t.Error("OffsetFetch request got no response!")
  149. }
  150. }},
  151. {"OffsetCommitRequest",
  152. []byte{0x00, 0x00, 0x00, 0x00},
  153. func(t *testing.T, broker *Broker) {
  154. request := OffsetCommitRequest{}
  155. response, err := broker.CommitOffset(&request)
  156. if err != nil {
  157. t.Error(err)
  158. }
  159. if response == nil {
  160. t.Error("OffsetCommit request got no response!")
  161. }
  162. }},
  163. {"OffsetRequest",
  164. []byte{0x00, 0x00, 0x00, 0x00},
  165. func(t *testing.T, broker *Broker) {
  166. request := OffsetRequest{}
  167. response, err := broker.GetAvailableOffsets(&request)
  168. if err != nil {
  169. t.Error(err)
  170. }
  171. if response == nil {
  172. t.Error("Offset request got no response!")
  173. }
  174. }},
  175. {"JoinGroupRequest",
  176. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  177. func(t *testing.T, broker *Broker) {
  178. request := JoinGroupRequest{}
  179. response, err := broker.JoinGroup(&request)
  180. if err != nil {
  181. t.Error(err)
  182. }
  183. if response == nil {
  184. t.Error("JoinGroup request got no response!")
  185. }
  186. }},
  187. {"SyncGroupRequest",
  188. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  189. func(t *testing.T, broker *Broker) {
  190. request := SyncGroupRequest{}
  191. response, err := broker.SyncGroup(&request)
  192. if err != nil {
  193. t.Error(err)
  194. }
  195. if response == nil {
  196. t.Error("SyncGroup request got no response!")
  197. }
  198. }},
  199. {"LeaveGroupRequest",
  200. []byte{0x00, 0x00},
  201. func(t *testing.T, broker *Broker) {
  202. request := LeaveGroupRequest{}
  203. response, err := broker.LeaveGroup(&request)
  204. if err != nil {
  205. t.Error(err)
  206. }
  207. if response == nil {
  208. t.Error("LeaveGroup request got no response!")
  209. }
  210. }},
  211. {"HeartbeatRequest",
  212. []byte{0x00, 0x00},
  213. func(t *testing.T, broker *Broker) {
  214. request := HeartbeatRequest{}
  215. response, err := broker.Heartbeat(&request)
  216. if err != nil {
  217. t.Error(err)
  218. }
  219. if response == nil {
  220. t.Error("Heartbeat request got no response!")
  221. }
  222. }},
  223. {"ListGroupsRequest",
  224. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  225. func(t *testing.T, broker *Broker) {
  226. request := ListGroupsRequest{}
  227. response, err := broker.ListGroups(&request)
  228. if err != nil {
  229. t.Error(err)
  230. }
  231. if response == nil {
  232. t.Error("ListGroups request got no response!")
  233. }
  234. }},
  235. {"DescribeGroupsRequest",
  236. []byte{0x00, 0x00, 0x00, 0x00},
  237. func(t *testing.T, broker *Broker) {
  238. request := DescribeGroupsRequest{}
  239. response, err := broker.DescribeGroups(&request)
  240. if err != nil {
  241. t.Error(err)
  242. }
  243. if response == nil {
  244. t.Error("DescribeGroups request got no response!")
  245. }
  246. }},
  247. }
  248. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) {
  249. metricValidators := newMetricValidators()
  250. mockBrokerBytesRead := 0
  251. mockBrokerBytesWritten := 0
  252. // Compute socket bytes
  253. for _, requestResponse := range mockBroker.History() {
  254. mockBrokerBytesRead += requestResponse.RequestSize
  255. mockBrokerBytesWritten += requestResponse.ResponseSize
  256. }
  257. // Check that the number of bytes sent corresponds to what the mock broker received
  258. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  259. if mockBrokerBytesWritten == 0 {
  260. // This a ProduceRequest with NoResponse
  261. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  262. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  263. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  264. } else {
  265. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  266. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  267. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  268. }
  269. // Check that the number of bytes received corresponds to what the mock broker sent
  270. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  271. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  272. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  273. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  274. // Run the validators
  275. metricValidators.run(t, broker.conf.MetricRegistry)
  276. }