mockresponses_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. // MockResponse is a response builder interface it defines one method that
  7. // allows generating a response based on a request body.
  8. type MockResponse interface {
  9. For(reqBody decoder) (res encoder)
  10. }
  11. // mockWrapper is a mock response builder that returns a particular concrete
  12. // response regardless of the actual request passed to the `For` method.
  13. type mockWrapper struct {
  14. res encoder
  15. }
  16. func (mw *mockWrapper) For(reqBody decoder) (res encoder) {
  17. return mw.res
  18. }
  19. func newMockWrapper(res encoder) *mockWrapper {
  20. return &mockWrapper{res: res}
  21. }
  22. // mockSequence is a mock response builder that is created from a sequence of
  23. // concrete responses. Every time when a `MockBroker` calls its `For` method
  24. // the next response from the sequence is returned. When the end of the
  25. // sequence is reached the last element from the sequence is returned.
  26. type mockSequence struct {
  27. responses []MockResponse
  28. }
  29. func newMockSequence(responses ...interface{}) *mockSequence {
  30. ms := &mockSequence{}
  31. ms.responses = make([]MockResponse, len(responses))
  32. for i, res := range responses {
  33. switch res := res.(type) {
  34. case MockResponse:
  35. ms.responses[i] = res
  36. case encoder:
  37. ms.responses[i] = newMockWrapper(res)
  38. default:
  39. panic(fmt.Sprintf("Unexpected response type: %T", res))
  40. }
  41. }
  42. return ms
  43. }
  44. func (mc *mockSequence) For(reqBody decoder) (res encoder) {
  45. res = mc.responses[0].For(reqBody)
  46. if len(mc.responses) > 1 {
  47. mc.responses = mc.responses[1:]
  48. }
  49. return res
  50. }
  51. // mockMetadataResponse is a `MetadataResponse` builder.
  52. type mockMetadataResponse struct {
  53. leaders map[string]map[int32]int32
  54. brokers map[string]int32
  55. t *testing.T
  56. }
  57. func newMockMetadataResponse(t *testing.T) *mockMetadataResponse {
  58. return &mockMetadataResponse{
  59. leaders: make(map[string]map[int32]int32),
  60. brokers: make(map[string]int32),
  61. t: t,
  62. }
  63. }
  64. func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse {
  65. partitions := mmr.leaders[topic]
  66. if partitions == nil {
  67. partitions = make(map[int32]int32)
  68. mmr.leaders[topic] = partitions
  69. }
  70. partitions[partition] = brokerID
  71. return mmr
  72. }
  73. func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse {
  74. mmr.brokers[addr] = brokerID
  75. return mmr
  76. }
  77. func (mmr *mockMetadataResponse) For(reqBody decoder) encoder {
  78. metadataRequest := reqBody.(*MetadataRequest)
  79. metadataResponse := &MetadataResponse{}
  80. for addr, brokerID := range mmr.brokers {
  81. metadataResponse.AddBroker(addr, brokerID)
  82. }
  83. if len(metadataRequest.Topics) == 0 {
  84. for topic, partitions := range mmr.leaders {
  85. for partition, brokerID := range partitions {
  86. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  87. }
  88. }
  89. return metadataResponse
  90. }
  91. for _, topic := range metadataRequest.Topics {
  92. for partition, brokerID := range mmr.leaders[topic] {
  93. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  94. }
  95. }
  96. return metadataResponse
  97. }
  98. // mockOffsetResponse is an `OffsetResponse` builder.
  99. type mockOffsetResponse struct {
  100. offsets map[string]map[int32]map[int64]int64
  101. t *testing.T
  102. }
  103. func newMockOffsetResponse(t *testing.T) *mockOffsetResponse {
  104. return &mockOffsetResponse{
  105. offsets: make(map[string]map[int32]map[int64]int64),
  106. t: t,
  107. }
  108. }
  109. func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse {
  110. partitions := mor.offsets[topic]
  111. if partitions == nil {
  112. partitions = make(map[int32]map[int64]int64)
  113. mor.offsets[topic] = partitions
  114. }
  115. times := partitions[partition]
  116. if times == nil {
  117. times = make(map[int64]int64)
  118. partitions[partition] = times
  119. }
  120. times[time] = offset
  121. return mor
  122. }
  123. func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
  124. offsetRequest := reqBody.(*OffsetRequest)
  125. offsetResponse := &OffsetResponse{}
  126. for topic, partitions := range offsetRequest.blocks {
  127. for partition, block := range partitions {
  128. offset := mor.getOffset(topic, partition, block.time)
  129. offsetResponse.AddTopicPartition(topic, partition, offset)
  130. }
  131. }
  132. return offsetResponse
  133. }
  134. func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  135. partitions := mor.offsets[topic]
  136. if partitions == nil {
  137. mor.t.Errorf("missing topic: %s", topic)
  138. }
  139. times := partitions[partition]
  140. if times == nil {
  141. mor.t.Errorf("missing partition: %d", partition)
  142. }
  143. offset, ok := times[time]
  144. if !ok {
  145. mor.t.Errorf("missing time: %d", time)
  146. }
  147. return offset
  148. }
  149. // mockFetchResponse is a `FetchResponse` builder.
  150. type mockFetchResponse struct {
  151. messages map[string]map[int32]map[int64]Encoder
  152. highWaterMarks map[string]map[int32]int64
  153. t *testing.T
  154. batchSize int
  155. }
  156. func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse {
  157. return &mockFetchResponse{
  158. messages: make(map[string]map[int32]map[int64]Encoder),
  159. highWaterMarks: make(map[string]map[int32]int64),
  160. t: t,
  161. batchSize: batchSize,
  162. }
  163. }
  164. func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse {
  165. partitions := mfr.messages[topic]
  166. if partitions == nil {
  167. partitions = make(map[int32]map[int64]Encoder)
  168. mfr.messages[topic] = partitions
  169. }
  170. messages := partitions[partition]
  171. if messages == nil {
  172. messages = make(map[int64]Encoder)
  173. partitions[partition] = messages
  174. }
  175. messages[offset] = msg
  176. return mfr
  177. }
  178. func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse {
  179. partitions := mfr.highWaterMarks[topic]
  180. if partitions == nil {
  181. partitions = make(map[int32]int64)
  182. mfr.highWaterMarks[topic] = partitions
  183. }
  184. partitions[partition] = offset
  185. return mfr
  186. }
  187. func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
  188. fetchRequest := reqBody.(*FetchRequest)
  189. res := &FetchResponse{}
  190. for topic, partitions := range fetchRequest.blocks {
  191. for partition, block := range partitions {
  192. initialOffset := block.fetchOffset
  193. offset := initialOffset
  194. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  195. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  196. msg := mfr.getMessage(topic, partition, offset)
  197. if msg != nil {
  198. res.AddMessage(topic, partition, nil, msg, offset)
  199. i++
  200. }
  201. offset++
  202. }
  203. fb := res.GetBlock(topic, partition)
  204. if fb == nil {
  205. res.AddError(topic, partition, ErrNoError)
  206. fb = res.GetBlock(topic, partition)
  207. }
  208. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  209. }
  210. }
  211. return res
  212. }
  213. func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  214. partitions := mfr.messages[topic]
  215. if partitions == nil {
  216. return nil
  217. }
  218. messages := partitions[partition]
  219. if messages == nil {
  220. return nil
  221. }
  222. return messages[offset]
  223. }
  224. func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int {
  225. partitions := mfr.messages[topic]
  226. if partitions == nil {
  227. return 0
  228. }
  229. messages := partitions[partition]
  230. if messages == nil {
  231. return 0
  232. }
  233. return len(messages)
  234. }
  235. func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  236. partitions := mfr.highWaterMarks[topic]
  237. if partitions == nil {
  238. return 0
  239. }
  240. return partitions[partition]
  241. }
  242. // mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  243. type mockConsumerMetadataResponse struct {
  244. coordinators map[string]interface{}
  245. t *testing.T
  246. }
  247. func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
  248. return &mockConsumerMetadataResponse{
  249. coordinators: make(map[string]interface{}),
  250. t: t,
  251. }
  252. }
  253. func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
  254. mr.coordinators[group] = broker
  255. return mr
  256. }
  257. func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
  258. mr.coordinators[group] = kerror
  259. return mr
  260. }
  261. func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
  262. req := reqBody.(*ConsumerMetadataRequest)
  263. group := req.ConsumerGroup
  264. res := &ConsumerMetadataResponse{}
  265. v := mr.coordinators[group]
  266. switch v := v.(type) {
  267. case *mockBroker:
  268. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  269. case KError:
  270. res.Err = v
  271. }
  272. return res
  273. }
  274. // mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  275. type mockOffsetCommitResponse struct {
  276. errors map[string]map[string]map[int32]KError
  277. t *testing.T
  278. }
  279. func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
  280. return &mockOffsetCommitResponse{t: t}
  281. }
  282. func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
  283. if mr.errors == nil {
  284. mr.errors = make(map[string]map[string]map[int32]KError)
  285. }
  286. topics := mr.errors[group]
  287. if topics == nil {
  288. topics = make(map[string]map[int32]KError)
  289. mr.errors[group] = topics
  290. }
  291. partitions := topics[topic]
  292. if partitions == nil {
  293. partitions = make(map[int32]KError)
  294. topics[topic] = partitions
  295. }
  296. partitions[partition] = kerror
  297. return mr
  298. }
  299. func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
  300. req := reqBody.(*OffsetCommitRequest)
  301. group := req.ConsumerGroup
  302. res := &OffsetCommitResponse{}
  303. for topic, partitions := range req.blocks {
  304. for partition := range partitions {
  305. res.AddError(topic, partition, mr.getError(group, topic, partition))
  306. }
  307. }
  308. return res
  309. }
  310. func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  311. topics := mr.errors[group]
  312. if topics == nil {
  313. return ErrNoError
  314. }
  315. partitions := topics[topic]
  316. if partitions == nil {
  317. return ErrNoError
  318. }
  319. kerror, ok := partitions[partition]
  320. if !ok {
  321. return ErrNoError
  322. }
  323. return kerror
  324. }
  325. // mockProduceResponse is a `ProduceResponse` builder.
  326. type mockProduceResponse struct {
  327. errors map[string]map[int32]KError
  328. t *testing.T
  329. }
  330. func newMockProduceResponse(t *testing.T) *mockProduceResponse {
  331. return &mockProduceResponse{t: t}
  332. }
  333. func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse {
  334. if mr.errors == nil {
  335. mr.errors = make(map[string]map[int32]KError)
  336. }
  337. partitions := mr.errors[topic]
  338. if partitions == nil {
  339. partitions = make(map[int32]KError)
  340. mr.errors[topic] = partitions
  341. }
  342. partitions[partition] = kerror
  343. return mr
  344. }
  345. func (mr *mockProduceResponse) For(reqBody decoder) encoder {
  346. req := reqBody.(*ProduceRequest)
  347. res := &ProduceResponse{}
  348. for topic, partitions := range req.msgSets {
  349. for partition := range partitions {
  350. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  351. }
  352. }
  353. return res
  354. }
  355. func (mr *mockProduceResponse) getError(topic string, partition int32) KError {
  356. partitions := mr.errors[topic]
  357. if partitions == nil {
  358. return ErrNoError
  359. }
  360. kerror, ok := partitions[partition]
  361. if !ok {
  362. return ErrNoError
  363. }
  364. return kerror
  365. }
  366. // mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  367. type mockOffsetFetchResponse struct {
  368. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  369. t *testing.T
  370. }
  371. func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
  372. return &mockOffsetFetchResponse{t: t}
  373. }
  374. func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
  375. if mr.offsets == nil {
  376. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  377. }
  378. topics := mr.offsets[group]
  379. if topics == nil {
  380. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  381. mr.offsets[group] = topics
  382. }
  383. partitions := topics[topic]
  384. if partitions == nil {
  385. partitions = make(map[int32]*OffsetFetchResponseBlock)
  386. topics[topic] = partitions
  387. }
  388. partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
  389. return mr
  390. }
  391. func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
  392. req := reqBody.(*OffsetFetchRequest)
  393. group := req.ConsumerGroup
  394. res := &OffsetFetchResponse{}
  395. for topic, partitions := range mr.offsets[group] {
  396. for partition, block := range partitions {
  397. res.AddBlock(topic, partition, block)
  398. }
  399. }
  400. return res
  401. }