broker_test.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. func ExampleBroker() error {
  7. broker := NewBroker("localhost:9092")
  8. err := broker.Open(4)
  9. if err != nil {
  10. return err
  11. }
  12. defer broker.Close()
  13. request := MetadataRequest{Topics: []string{"myTopic"}}
  14. response, err := broker.GetMetadata("myClient", &request)
  15. if err != nil {
  16. return err
  17. }
  18. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  19. return nil
  20. }
  21. func TestBrokerAccessors(t *testing.T) {
  22. broker := NewBroker("abc:123")
  23. if broker.ID() != -1 {
  24. t.Error("New broker didn't have an ID of -1.")
  25. }
  26. if broker.Addr() != "abc:123" {
  27. t.Error("New broker didn't have the correct address")
  28. }
  29. broker.id = 34
  30. if broker.ID() != 34 {
  31. t.Error("Manually setting broker ID did not take effect.")
  32. }
  33. }
  34. func TestSimpleBrokerCommunication(t *testing.T) {
  35. mb := NewMockBroker(t, 0)
  36. defer mb.Close()
  37. broker := NewBroker(mb.Addr())
  38. err := broker.Open(4)
  39. if err != nil {
  40. t.Fatal(err)
  41. }
  42. go func() {
  43. for _, tt := range brokerTestTable {
  44. mb.ExpectBytes(tt.response)
  45. }
  46. }()
  47. for _, tt := range brokerTestTable {
  48. tt.runner(t, broker)
  49. }
  50. err = broker.Close()
  51. if err != nil {
  52. t.Error(err)
  53. }
  54. }
  55. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  56. var brokerTestTable = []struct {
  57. response []byte
  58. runner func(*testing.T, *Broker)
  59. }{
  60. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  61. func(t *testing.T, broker *Broker) {
  62. request := MetadataRequest{}
  63. response, err := broker.GetMetadata("clientID", &request)
  64. if err != nil {
  65. t.Error(err)
  66. }
  67. if response == nil {
  68. t.Error("Metadata request got no response!")
  69. }
  70. }},
  71. {[]byte{},
  72. func(t *testing.T, broker *Broker) {
  73. request := ProduceRequest{}
  74. request.RequiredAcks = NoResponse
  75. response, err := broker.Produce("clientID", &request)
  76. if err != nil {
  77. t.Error(err)
  78. }
  79. if response != nil {
  80. t.Error("Produce request with NoResponse got a response!")
  81. }
  82. }},
  83. {[]byte{0x00, 0x00, 0x00, 0x00},
  84. func(t *testing.T, broker *Broker) {
  85. request := ProduceRequest{}
  86. request.RequiredAcks = WaitForLocal
  87. response, err := broker.Produce("clientID", &request)
  88. if err != nil {
  89. t.Error(err)
  90. }
  91. if response == nil {
  92. t.Error("Produce request without NoResponse got no response!")
  93. }
  94. }},
  95. {[]byte{0x00, 0x00, 0x00, 0x00},
  96. func(t *testing.T, broker *Broker) {
  97. request := FetchRequest{}
  98. response, err := broker.Fetch("clientID", &request)
  99. if err != nil {
  100. t.Error(err)
  101. }
  102. if response == nil {
  103. t.Error("Fetch request got no response!")
  104. }
  105. }},
  106. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  107. func(t *testing.T, broker *Broker) {
  108. request := OffsetFetchRequest{}
  109. response, err := broker.FetchOffset("clientID", &request)
  110. if err != nil {
  111. t.Error(err)
  112. }
  113. if response == nil {
  114. t.Error("OffsetFetch request got no response!")
  115. }
  116. }},
  117. {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  118. func(t *testing.T, broker *Broker) {
  119. request := OffsetCommitRequest{}
  120. response, err := broker.CommitOffset("clientID", &request)
  121. if err != nil {
  122. t.Error(err)
  123. }
  124. if response == nil {
  125. t.Error("OffsetCommit request got no response!")
  126. }
  127. }},
  128. {[]byte{0x00, 0x00, 0x00, 0x00},
  129. func(t *testing.T, broker *Broker) {
  130. request := OffsetRequest{}
  131. response, err := broker.GetAvailableOffsets("clientID", &request)
  132. if err != nil {
  133. t.Error(err)
  134. }
  135. if response == nil {
  136. t.Error("Offset request got no response!")
  137. }
  138. }},
  139. }