mockresponses.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. package sarama
  2. import (
  3. "fmt"
  4. )
  5. // TestReporter has methods matching go's testing.T to avoid importing
  6. // `testing` in the main part of the library.
  7. type TestReporter interface {
  8. Error(...interface{})
  9. Errorf(string, ...interface{})
  10. Fatal(...interface{})
  11. Fatalf(string, ...interface{})
  12. }
  13. // MockResponse is a response builder interface it defines one method that
  14. // allows generating a response based on a request body. MockResponses are used
  15. // to program behavior of MockBroker in tests.
  16. type MockResponse interface {
  17. For(reqBody versionedDecoder) (res encoder)
  18. }
  19. // MockWrapper is a mock response builder that returns a particular concrete
  20. // response regardless of the actual request passed to the `For` method.
  21. type MockWrapper struct {
  22. res encoder
  23. }
  24. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
  25. return mw.res
  26. }
  27. func NewMockWrapper(res encoder) *MockWrapper {
  28. return &MockWrapper{res: res}
  29. }
  30. // MockSequence is a mock response builder that is created from a sequence of
  31. // concrete responses. Every time when a `MockBroker` calls its `For` method
  32. // the next response from the sequence is returned. When the end of the
  33. // sequence is reached the last element from the sequence is returned.
  34. type MockSequence struct {
  35. responses []MockResponse
  36. }
  37. func NewMockSequence(responses ...interface{}) *MockSequence {
  38. ms := &MockSequence{}
  39. ms.responses = make([]MockResponse, len(responses))
  40. for i, res := range responses {
  41. switch res := res.(type) {
  42. case MockResponse:
  43. ms.responses[i] = res
  44. case encoder:
  45. ms.responses[i] = NewMockWrapper(res)
  46. default:
  47. panic(fmt.Sprintf("Unexpected response type: %T", res))
  48. }
  49. }
  50. return ms
  51. }
  52. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
  53. res = mc.responses[0].For(reqBody)
  54. if len(mc.responses) > 1 {
  55. mc.responses = mc.responses[1:]
  56. }
  57. return res
  58. }
  59. // MockMetadataResponse is a `MetadataResponse` builder.
  60. type MockMetadataResponse struct {
  61. leaders map[string]map[int32]int32
  62. brokers map[string]int32
  63. t TestReporter
  64. }
  65. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  66. return &MockMetadataResponse{
  67. leaders: make(map[string]map[int32]int32),
  68. brokers: make(map[string]int32),
  69. t: t,
  70. }
  71. }
  72. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  73. partitions := mmr.leaders[topic]
  74. if partitions == nil {
  75. partitions = make(map[int32]int32)
  76. mmr.leaders[topic] = partitions
  77. }
  78. partitions[partition] = brokerID
  79. return mmr
  80. }
  81. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  82. mmr.brokers[addr] = brokerID
  83. return mmr
  84. }
  85. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
  86. metadataRequest := reqBody.(*MetadataRequest)
  87. metadataResponse := &MetadataResponse{}
  88. for addr, brokerID := range mmr.brokers {
  89. metadataResponse.AddBroker(addr, brokerID)
  90. }
  91. if len(metadataRequest.Topics) == 0 {
  92. for topic, partitions := range mmr.leaders {
  93. for partition, brokerID := range partitions {
  94. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  95. }
  96. }
  97. return metadataResponse
  98. }
  99. for _, topic := range metadataRequest.Topics {
  100. for partition, brokerID := range mmr.leaders[topic] {
  101. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  102. }
  103. }
  104. return metadataResponse
  105. }
  106. // MockOffsetResponse is an `OffsetResponse` builder.
  107. type MockOffsetResponse struct {
  108. offsets map[string]map[int32]map[int64]int64
  109. t TestReporter
  110. version int16
  111. }
  112. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  113. return &MockOffsetResponse{
  114. offsets: make(map[string]map[int32]map[int64]int64),
  115. t: t,
  116. }
  117. }
  118. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  119. mor.version = version
  120. return mor
  121. }
  122. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  123. partitions := mor.offsets[topic]
  124. if partitions == nil {
  125. partitions = make(map[int32]map[int64]int64)
  126. mor.offsets[topic] = partitions
  127. }
  128. times := partitions[partition]
  129. if times == nil {
  130. times = make(map[int64]int64)
  131. partitions[partition] = times
  132. }
  133. times[time] = offset
  134. return mor
  135. }
  136. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
  137. offsetRequest := reqBody.(*OffsetRequest)
  138. offsetResponse := &OffsetResponse{Version: mor.version}
  139. for topic, partitions := range offsetRequest.blocks {
  140. for partition, block := range partitions {
  141. offset := mor.getOffset(topic, partition, block.time)
  142. offsetResponse.AddTopicPartition(topic, partition, offset)
  143. }
  144. }
  145. return offsetResponse
  146. }
  147. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  148. partitions := mor.offsets[topic]
  149. if partitions == nil {
  150. mor.t.Errorf("missing topic: %s", topic)
  151. }
  152. times := partitions[partition]
  153. if times == nil {
  154. mor.t.Errorf("missing partition: %d", partition)
  155. }
  156. offset, ok := times[time]
  157. if !ok {
  158. mor.t.Errorf("missing time: %d", time)
  159. }
  160. return offset
  161. }
  162. // MockFetchResponse is a `FetchResponse` builder.
  163. type MockFetchResponse struct {
  164. messages map[string]map[int32]map[int64]Encoder
  165. highWaterMarks map[string]map[int32]int64
  166. t TestReporter
  167. batchSize int
  168. version int16
  169. }
  170. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  171. return &MockFetchResponse{
  172. messages: make(map[string]map[int32]map[int64]Encoder),
  173. highWaterMarks: make(map[string]map[int32]int64),
  174. t: t,
  175. batchSize: batchSize,
  176. }
  177. }
  178. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  179. mfr.version = version
  180. return mfr
  181. }
  182. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  183. partitions := mfr.messages[topic]
  184. if partitions == nil {
  185. partitions = make(map[int32]map[int64]Encoder)
  186. mfr.messages[topic] = partitions
  187. }
  188. messages := partitions[partition]
  189. if messages == nil {
  190. messages = make(map[int64]Encoder)
  191. partitions[partition] = messages
  192. }
  193. messages[offset] = msg
  194. return mfr
  195. }
  196. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  197. partitions := mfr.highWaterMarks[topic]
  198. if partitions == nil {
  199. partitions = make(map[int32]int64)
  200. mfr.highWaterMarks[topic] = partitions
  201. }
  202. partitions[partition] = offset
  203. return mfr
  204. }
  205. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
  206. fetchRequest := reqBody.(*FetchRequest)
  207. res := &FetchResponse{
  208. Version: mfr.version,
  209. }
  210. for topic, partitions := range fetchRequest.blocks {
  211. for partition, block := range partitions {
  212. initialOffset := block.fetchOffset
  213. offset := initialOffset
  214. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  215. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  216. msg := mfr.getMessage(topic, partition, offset)
  217. if msg != nil {
  218. res.AddMessage(topic, partition, nil, msg, offset)
  219. i++
  220. }
  221. offset++
  222. }
  223. fb := res.GetBlock(topic, partition)
  224. if fb == nil {
  225. res.AddError(topic, partition, ErrNoError)
  226. fb = res.GetBlock(topic, partition)
  227. }
  228. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  229. }
  230. }
  231. return res
  232. }
  233. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  234. partitions := mfr.messages[topic]
  235. if partitions == nil {
  236. return nil
  237. }
  238. messages := partitions[partition]
  239. if messages == nil {
  240. return nil
  241. }
  242. return messages[offset]
  243. }
  244. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  245. partitions := mfr.messages[topic]
  246. if partitions == nil {
  247. return 0
  248. }
  249. messages := partitions[partition]
  250. if messages == nil {
  251. return 0
  252. }
  253. return len(messages)
  254. }
  255. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  256. partitions := mfr.highWaterMarks[topic]
  257. if partitions == nil {
  258. return 0
  259. }
  260. return partitions[partition]
  261. }
  262. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  263. type MockConsumerMetadataResponse struct {
  264. coordinators map[string]interface{}
  265. t TestReporter
  266. }
  267. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  268. return &MockConsumerMetadataResponse{
  269. coordinators: make(map[string]interface{}),
  270. t: t,
  271. }
  272. }
  273. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  274. mr.coordinators[group] = broker
  275. return mr
  276. }
  277. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  278. mr.coordinators[group] = kerror
  279. return mr
  280. }
  281. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
  282. req := reqBody.(*ConsumerMetadataRequest)
  283. group := req.ConsumerGroup
  284. res := &ConsumerMetadataResponse{}
  285. v := mr.coordinators[group]
  286. switch v := v.(type) {
  287. case *MockBroker:
  288. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  289. case KError:
  290. res.Err = v
  291. }
  292. return res
  293. }
  294. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  295. type MockFindCoordinatorResponse struct {
  296. groupCoordinators map[string]interface{}
  297. transCoordinators map[string]interface{}
  298. t TestReporter
  299. }
  300. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  301. return &MockFindCoordinatorResponse{
  302. groupCoordinators: make(map[string]interface{}),
  303. transCoordinators: make(map[string]interface{}),
  304. t: t,
  305. }
  306. }
  307. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  308. switch coordinatorType {
  309. case CoordinatorGroup:
  310. mr.groupCoordinators[group] = broker
  311. case CoordinatorTransaction:
  312. mr.transCoordinators[group] = broker
  313. }
  314. return mr
  315. }
  316. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  317. switch coordinatorType {
  318. case CoordinatorGroup:
  319. mr.groupCoordinators[group] = kerror
  320. case CoordinatorTransaction:
  321. mr.transCoordinators[group] = kerror
  322. }
  323. return mr
  324. }
  325. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
  326. req := reqBody.(*FindCoordinatorRequest)
  327. res := &FindCoordinatorResponse{}
  328. var v interface{}
  329. switch req.CoordinatorType {
  330. case CoordinatorGroup:
  331. v = mr.groupCoordinators[req.CoordinatorKey]
  332. case CoordinatorTransaction:
  333. v = mr.transCoordinators[req.CoordinatorKey]
  334. }
  335. switch v := v.(type) {
  336. case *MockBroker:
  337. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  338. case KError:
  339. res.Err = v
  340. }
  341. return res
  342. }
  343. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  344. type MockOffsetCommitResponse struct {
  345. errors map[string]map[string]map[int32]KError
  346. t TestReporter
  347. }
  348. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  349. return &MockOffsetCommitResponse{t: t}
  350. }
  351. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  352. if mr.errors == nil {
  353. mr.errors = make(map[string]map[string]map[int32]KError)
  354. }
  355. topics := mr.errors[group]
  356. if topics == nil {
  357. topics = make(map[string]map[int32]KError)
  358. mr.errors[group] = topics
  359. }
  360. partitions := topics[topic]
  361. if partitions == nil {
  362. partitions = make(map[int32]KError)
  363. topics[topic] = partitions
  364. }
  365. partitions[partition] = kerror
  366. return mr
  367. }
  368. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
  369. req := reqBody.(*OffsetCommitRequest)
  370. group := req.ConsumerGroup
  371. res := &OffsetCommitResponse{}
  372. for topic, partitions := range req.blocks {
  373. for partition := range partitions {
  374. res.AddError(topic, partition, mr.getError(group, topic, partition))
  375. }
  376. }
  377. return res
  378. }
  379. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  380. topics := mr.errors[group]
  381. if topics == nil {
  382. return ErrNoError
  383. }
  384. partitions := topics[topic]
  385. if partitions == nil {
  386. return ErrNoError
  387. }
  388. kerror, ok := partitions[partition]
  389. if !ok {
  390. return ErrNoError
  391. }
  392. return kerror
  393. }
  394. // MockProduceResponse is a `ProduceResponse` builder.
  395. type MockProduceResponse struct {
  396. version int16
  397. errors map[string]map[int32]KError
  398. t TestReporter
  399. }
  400. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  401. return &MockProduceResponse{t: t}
  402. }
  403. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  404. mr.version = version
  405. return mr
  406. }
  407. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  408. if mr.errors == nil {
  409. mr.errors = make(map[string]map[int32]KError)
  410. }
  411. partitions := mr.errors[topic]
  412. if partitions == nil {
  413. partitions = make(map[int32]KError)
  414. mr.errors[topic] = partitions
  415. }
  416. partitions[partition] = kerror
  417. return mr
  418. }
  419. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
  420. req := reqBody.(*ProduceRequest)
  421. res := &ProduceResponse{
  422. Version: mr.version,
  423. }
  424. for topic, partitions := range req.records {
  425. for partition := range partitions {
  426. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  427. }
  428. }
  429. return res
  430. }
  431. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  432. partitions := mr.errors[topic]
  433. if partitions == nil {
  434. return ErrNoError
  435. }
  436. kerror, ok := partitions[partition]
  437. if !ok {
  438. return ErrNoError
  439. }
  440. return kerror
  441. }
  442. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  443. type MockOffsetFetchResponse struct {
  444. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  445. t TestReporter
  446. }
  447. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  448. return &MockOffsetFetchResponse{t: t}
  449. }
  450. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  451. if mr.offsets == nil {
  452. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  453. }
  454. topics := mr.offsets[group]
  455. if topics == nil {
  456. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  457. mr.offsets[group] = topics
  458. }
  459. partitions := topics[topic]
  460. if partitions == nil {
  461. partitions = make(map[int32]*OffsetFetchResponseBlock)
  462. topics[topic] = partitions
  463. }
  464. partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
  465. return mr
  466. }
  467. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
  468. req := reqBody.(*OffsetFetchRequest)
  469. group := req.ConsumerGroup
  470. res := &OffsetFetchResponse{}
  471. for topic, partitions := range mr.offsets[group] {
  472. for partition, block := range partitions {
  473. res.AddBlock(topic, partition, block)
  474. }
  475. }
  476. return res
  477. }