mockresponses_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package sarama
  2. import (
  3. "testing"
  4. )
  5. // MockResponse is a response builder interface it defines one method that
  6. // allows generating a response based on a request body.
  7. type MockResponse interface {
  8. For(reqBody decoder) (res encoder)
  9. }
  10. type mockWrapper struct {
  11. res encoder
  12. }
  13. func (mw *mockWrapper) For(reqBody decoder) (res encoder) {
  14. return mw.res
  15. }
  16. func newMockWrapper(res encoder) *mockWrapper {
  17. return &mockWrapper{res: res}
  18. }
  19. // mockMetadataResponse is a `MetadataResponse` builder.
  20. type mockMetadataResponse struct {
  21. leaders map[string]map[int32]int32
  22. brokers map[string]int32
  23. t *testing.T
  24. }
  25. func newMockMetadataResponse(t *testing.T) *mockMetadataResponse {
  26. return &mockMetadataResponse{
  27. leaders: make(map[string]map[int32]int32),
  28. brokers: make(map[string]int32),
  29. t: t,
  30. }
  31. }
  32. func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse {
  33. partitions := mmr.leaders[topic]
  34. if partitions == nil {
  35. partitions = make(map[int32]int32)
  36. mmr.leaders[topic] = partitions
  37. }
  38. partitions[partition] = brokerID
  39. return mmr
  40. }
  41. func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse {
  42. mmr.brokers[addr] = brokerID
  43. return mmr
  44. }
  45. func (mor *mockMetadataResponse) For(reqBody decoder) encoder {
  46. metadataRequest := reqBody.(*MetadataRequest)
  47. metadataResponse := &MetadataResponse{}
  48. for addr, brokerID := range mor.brokers {
  49. metadataResponse.AddBroker(addr, brokerID)
  50. }
  51. if len(metadataRequest.Topics) == 0 {
  52. for topic, partitions := range mor.leaders {
  53. for partition, brokerID := range partitions {
  54. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  55. }
  56. }
  57. return metadataResponse
  58. }
  59. for _, topic := range metadataRequest.Topics {
  60. for partition, brokerID := range mor.leaders[topic] {
  61. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  62. }
  63. }
  64. return metadataResponse
  65. }
  66. // mockOffsetResponse is an `OffsetResponse` builder.
  67. type mockOffsetResponse struct {
  68. offsets map[string]map[int32]map[int64]int64
  69. t *testing.T
  70. }
  71. func newMockOffsetResponse(t *testing.T) *mockOffsetResponse {
  72. return &mockOffsetResponse{
  73. offsets: make(map[string]map[int32]map[int64]int64),
  74. t: t,
  75. }
  76. }
  77. func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse {
  78. partitions := mor.offsets[topic]
  79. if partitions == nil {
  80. partitions = make(map[int32]map[int64]int64)
  81. mor.offsets[topic] = partitions
  82. }
  83. times := partitions[partition]
  84. if times == nil {
  85. times = make(map[int64]int64)
  86. partitions[partition] = times
  87. }
  88. times[time] = offset
  89. return mor
  90. }
  91. func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
  92. offsetRequest := reqBody.(*OffsetRequest)
  93. offsetResponse := &OffsetResponse{}
  94. for topic, partitions := range offsetRequest.Blocks {
  95. for partition, block := range partitions {
  96. offset := mor.getOffset(topic, partition, block.Time)
  97. offsetResponse.AddTopicPartition(topic, partition, offset)
  98. }
  99. }
  100. return offsetResponse
  101. }
  102. func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  103. partitions := mor.offsets[topic]
  104. if partitions == nil {
  105. mor.t.Errorf("missing topic: %s", topic)
  106. }
  107. times := partitions[partition]
  108. if times == nil {
  109. mor.t.Errorf("missing partition: %d", partition)
  110. }
  111. offset, ok := times[time]
  112. if !ok {
  113. mor.t.Errorf("missing time: %d", time)
  114. }
  115. return offset
  116. }
  117. // mockFetchResponse is a `FetchResponse` builder.
  118. type mockFetchResponse struct {
  119. messages map[string]map[int32]map[int64]Encoder
  120. highWaterMarks map[string]map[int32]int64
  121. t *testing.T
  122. batchSize int
  123. }
  124. func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse {
  125. return &mockFetchResponse{
  126. messages: make(map[string]map[int32]map[int64]Encoder),
  127. highWaterMarks: make(map[string]map[int32]int64),
  128. t: t,
  129. batchSize: batchSize,
  130. }
  131. }
  132. func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse {
  133. partitions := mfr.messages[topic]
  134. if partitions == nil {
  135. partitions = make(map[int32]map[int64]Encoder)
  136. mfr.messages[topic] = partitions
  137. }
  138. messages := partitions[partition]
  139. if messages == nil {
  140. messages = make(map[int64]Encoder)
  141. partitions[partition] = messages
  142. }
  143. messages[offset] = msg
  144. return mfr
  145. }
  146. func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse {
  147. partitions := mfr.highWaterMarks[topic]
  148. if partitions == nil {
  149. partitions = make(map[int32]int64)
  150. mfr.highWaterMarks[topic] = partitions
  151. }
  152. partitions[partition] = offset
  153. return mfr
  154. }
  155. func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
  156. fetchRequest := reqBody.(*FetchRequest)
  157. res := &FetchResponse{}
  158. for topic, partitions := range fetchRequest.Blocks {
  159. for partition, block := range partitions {
  160. initialOffset := block.FetchOffset
  161. offset := initialOffset
  162. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  163. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  164. msg := mfr.getMessage(topic, partition, offset)
  165. if msg != nil {
  166. res.AddMessage(topic, partition, nil, msg, offset)
  167. i++
  168. }
  169. offset++
  170. }
  171. fb := res.GetBlock(topic, partition)
  172. if fb == nil {
  173. res.AddError(topic, partition, ErrNoError)
  174. fb = res.GetBlock(topic, partition)
  175. }
  176. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  177. }
  178. }
  179. return res
  180. }
  181. func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  182. partitions := mfr.messages[topic]
  183. if partitions == nil {
  184. return nil
  185. }
  186. messages := partitions[partition]
  187. if messages == nil {
  188. return nil
  189. }
  190. return messages[offset]
  191. }
  192. func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int {
  193. partitions := mfr.messages[topic]
  194. if partitions == nil {
  195. return 0
  196. }
  197. messages := partitions[partition]
  198. if messages == nil {
  199. return 0
  200. }
  201. return len(messages)
  202. }
  203. func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  204. partitions := mfr.highWaterMarks[topic]
  205. if partitions == nil {
  206. return 0
  207. }
  208. return partitions[partition]
  209. }
  210. // mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  211. type mockConsumerMetadataResponse struct {
  212. coordinators map[string]interface{}
  213. t *testing.T
  214. }
  215. func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
  216. return &mockConsumerMetadataResponse{
  217. coordinators: make(map[string]interface{}),
  218. t: t,
  219. }
  220. }
  221. func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
  222. mr.coordinators[group] = broker
  223. return mr
  224. }
  225. func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
  226. mr.coordinators[group] = kerror
  227. return mr
  228. }
  229. func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
  230. req := reqBody.(*ConsumerMetadataRequest)
  231. group := req.ConsumerGroup
  232. res := &ConsumerMetadataResponse{}
  233. v := mr.coordinators[group]
  234. switch v := v.(type) {
  235. case *mockBroker:
  236. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  237. case KError:
  238. res.Err = v
  239. }
  240. return res
  241. }
  242. // mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  243. type mockOffsetCommitResponse struct {
  244. errors map[string]map[string]map[int32]KError
  245. t *testing.T
  246. }
  247. func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
  248. return &mockOffsetCommitResponse{t: t}
  249. }
  250. func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
  251. if mr.errors == nil {
  252. mr.errors = make(map[string]map[string]map[int32]KError)
  253. }
  254. topics := mr.errors[group]
  255. if topics == nil {
  256. topics = make(map[string]map[int32]KError)
  257. mr.errors[group] = topics
  258. }
  259. partitions := topics[topic]
  260. if partitions == nil {
  261. partitions = make(map[int32]KError)
  262. topics[topic] = partitions
  263. }
  264. partitions[partition] = kerror
  265. return mr
  266. }
  267. func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
  268. req := reqBody.(*OffsetCommitRequest)
  269. group := req.ConsumerGroup
  270. res := &OffsetCommitResponse{}
  271. for topic, partitions := range req.blocks {
  272. for partition := range partitions {
  273. res.AddError(topic, partition, mr.getError(group, topic, partition))
  274. }
  275. }
  276. return res
  277. }
  278. func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  279. topics := mr.errors[group]
  280. if topics == nil {
  281. return ErrNoError
  282. }
  283. partitions := topics[topic]
  284. if partitions == nil {
  285. return ErrNoError
  286. }
  287. kerror, ok := partitions[partition]
  288. if !ok {
  289. return ErrNoError
  290. }
  291. return kerror
  292. }
  293. // mockProduceResponse is a `ProduceResponse` builder.
  294. type mockProduceResponse struct {
  295. errors map[string]map[int32]KError
  296. t *testing.T
  297. }
  298. func newMockProduceResponse(t *testing.T) *mockProduceResponse {
  299. return &mockProduceResponse{t: t}
  300. }
  301. func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse {
  302. if mr.errors == nil {
  303. mr.errors = make(map[string]map[int32]KError)
  304. }
  305. partitions := mr.errors[topic]
  306. if partitions == nil {
  307. partitions = make(map[int32]KError)
  308. mr.errors[topic] = partitions
  309. }
  310. partitions[partition] = kerror
  311. return mr
  312. }
  313. func (mr *mockProduceResponse) For(reqBody decoder) encoder {
  314. req := reqBody.(*ProduceRequest)
  315. res := &ProduceResponse{}
  316. for topic, partitions := range req.MsgSets {
  317. for partition := range partitions {
  318. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  319. }
  320. }
  321. return res
  322. }
  323. func (mr *mockProduceResponse) getError(topic string, partition int32) KError {
  324. partitions := mr.errors[topic]
  325. if partitions == nil {
  326. return ErrNoError
  327. }
  328. kerror, ok := partitions[partition]
  329. if !ok {
  330. return ErrNoError
  331. }
  332. return kerror
  333. }
  334. // mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  335. type mockOffsetFetchResponse struct {
  336. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  337. t *testing.T
  338. }
  339. func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
  340. return &mockOffsetFetchResponse{t: t}
  341. }
  342. func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
  343. if mr.offsets == nil {
  344. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  345. }
  346. topics := mr.offsets[group]
  347. if topics == nil {
  348. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  349. mr.offsets[group] = topics
  350. }
  351. partitions := topics[topic]
  352. if partitions == nil {
  353. partitions = make(map[int32]*OffsetFetchResponseBlock)
  354. topics[topic] = partitions
  355. }
  356. partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
  357. return mr
  358. }
  359. func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
  360. req := reqBody.(*OffsetFetchRequest)
  361. group := req.ConsumerGroup
  362. res := &OffsetFetchResponse{}
  363. for topic, partitions := range mr.offsets[group] {
  364. for partition, block := range partitions {
  365. res.AddBlock(topic, partition, block)
  366. }
  367. }
  368. return res
  369. }