mockresponses_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package sarama
  2. import (
  3. "testing"
  4. )
  5. // MockResponse is a response builder interface it defines one method that
  6. // allows generating a response based on a request body.
  7. type MockResponse interface {
  8. For(reqBody decoder) (res encoder)
  9. }
  10. type mockWrapper struct {
  11. res encoder
  12. }
  13. func (mw *mockWrapper) For(reqBody decoder) (res encoder) {
  14. return mw.res
  15. }
  16. func newMockWrapper(res encoder) *mockWrapper {
  17. return &mockWrapper{res: res}
  18. }
  // mockMetadataResponse is a `MetadataResponse` builder.
  type mockMetadataResponse struct {
  	// leaders maps topic -> partition -> ID of the broker leading it.
  	leaders map[string]map[int32]int32
  	// brokers maps broker address -> broker ID.
  	brokers map[string]int32
  	// t is the test this builder was created for.
  	t *testing.T
  }
  25. func newMockMetadataResponse(t *testing.T) *mockMetadataResponse {
  26. return &mockMetadataResponse{
  27. leaders: make(map[string]map[int32]int32),
  28. brokers: make(map[string]int32),
  29. t: t,
  30. }
  31. }
  32. func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse {
  33. partitions := mmr.leaders[topic]
  34. if partitions == nil {
  35. partitions = make(map[int32]int32)
  36. mmr.leaders[topic] = partitions
  37. }
  38. partitions[partition] = brokerID
  39. return mmr
  40. }
  41. func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse {
  42. mmr.brokers[addr] = brokerID
  43. return mmr
  44. }
  45. func (mor *mockMetadataResponse) For(reqBody decoder) encoder {
  46. metadataRequest := reqBody.(*MetadataRequest)
  47. metadataResponse := &MetadataResponse{}
  48. for addr, brokerID := range mor.brokers {
  49. metadataResponse.AddBroker(addr, brokerID)
  50. }
  51. if len(metadataRequest.Topics) == 0 {
  52. for topic, partitions := range mor.leaders {
  53. for partition, brokerID := range partitions {
  54. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  55. }
  56. }
  57. return metadataResponse
  58. }
  59. for _, topic := range metadataRequest.Topics {
  60. for partition, brokerID := range mor.leaders[topic] {
  61. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  62. }
  63. }
  64. return metadataResponse
  65. }
  // mockOffsetResponse is an `OffsetResponse` builder.
  type mockOffsetResponse struct {
  	// offsets maps topic -> partition -> requested time -> offset to report.
  	offsets map[string]map[int32]map[int64]int64
  	// t receives an error whenever a lookup finds no configured offset.
  	t *testing.T
  }
  71. func newMockOffsetResponse(t *testing.T) *mockOffsetResponse {
  72. return &mockOffsetResponse{
  73. offsets: make(map[string]map[int32]map[int64]int64),
  74. t: t,
  75. }
  76. }
  77. func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse {
  78. partitions := mor.offsets[topic]
  79. if partitions == nil {
  80. partitions = make(map[int32]map[int64]int64)
  81. mor.offsets[topic] = partitions
  82. }
  83. times := partitions[partition]
  84. if times == nil {
  85. times = make(map[int64]int64)
  86. partitions[partition] = times
  87. }
  88. times[time] = offset
  89. return mor
  90. }
  91. func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
  92. offsetRequest := reqBody.(*OffsetRequest)
  93. offsetResponse := &OffsetResponse{}
  94. for topic, partitions := range offsetRequest.blocks {
  95. for partition, block := range partitions {
  96. offset := mor.getOffset(topic, partition, block.time)
  97. offsetResponse.AddTopicPartition(topic, partition, offset)
  98. }
  99. }
  100. return offsetResponse
  101. }
  102. func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  103. partitions := mor.offsets[topic]
  104. if partitions == nil {
  105. mor.t.Errorf("missing topic: %s", topic)
  106. }
  107. times := partitions[partition]
  108. if times == nil {
  109. mor.t.Errorf("missing partition: %d", partition)
  110. }
  111. offset, ok := times[time]
  112. if !ok {
  113. mor.t.Errorf("missing time: %d", time)
  114. }
  115. return offset
  116. }
  117. // mockFetchResponse is a `FetchResponse` builder.
  118. type mockFetchResponse struct {
  119. messages map[string]map[int32]map[int64]Encoder
  120. highWaterMarks map[string]map[int32]int64
  121. t *testing.T
  122. batchSize int
  123. }
  124. func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse {
  125. return &mockFetchResponse{
  126. messages: make(map[string]map[int32]map[int64]Encoder),
  127. highWaterMarks: make(map[string]map[int32]int64),
  128. t: t,
  129. batchSize: batchSize,
  130. }
  131. }
  132. func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse {
  133. partitions := mfr.messages[topic]
  134. if partitions == nil {
  135. partitions = make(map[int32]map[int64]Encoder)
  136. mfr.messages[topic] = partitions
  137. }
  138. messages := partitions[partition]
  139. if messages == nil {
  140. messages = make(map[int64]Encoder)
  141. partitions[partition] = messages
  142. }
  143. messages[offset] = msg
  144. return mfr
  145. }
  146. func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse {
  147. partitions := mfr.highWaterMarks[topic]
  148. if partitions == nil {
  149. partitions = make(map[int32]int64)
  150. mfr.highWaterMarks[topic] = partitions
  151. }
  152. partitions[partition] = offset
  153. return mfr
  154. }
  155. func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
  156. fetchRequest := reqBody.(*FetchRequest)
  157. res := &FetchResponse{}
  158. for topic, partitions := range fetchRequest.blocks {
  159. for partition, block := range partitions {
  160. initialOffset := block.fetchOffset
  161. offset := initialOffset
  162. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  163. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  164. msg := mfr.getMessage(topic, partition, offset)
  165. if msg != nil {
  166. res.AddMessage(topic, partition, nil, msg, offset)
  167. i++
  168. }
  169. offset++
  170. }
  171. fb := res.GetBlock(topic, partition)
  172. if fb == nil {
  173. res.AddError(topic, partition, ErrNoError)
  174. fb = res.GetBlock(topic, partition)
  175. }
  176. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  177. }
  178. }
  179. return res
  180. }
  181. func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  182. partitions := mfr.messages[topic]
  183. if partitions == nil {
  184. return nil
  185. }
  186. messages := partitions[partition]
  187. if messages == nil {
  188. return nil
  189. }
  190. return messages[offset]
  191. }
  192. func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int {
  193. partitions := mfr.messages[topic]
  194. if partitions == nil {
  195. return 0
  196. }
  197. messages := partitions[partition]
  198. if messages == nil {
  199. return 0
  200. }
  201. return len(messages)
  202. }
  203. func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  204. partitions := mfr.highWaterMarks[topic]
  205. if partitions == nil {
  206. return 0
  207. }
  208. return partitions[partition]
  209. }