mockresponses.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. package sarama
  2. import (
  3. "fmt"
  4. )
  5. // TestReporter has methods matching go's testing.T to avoid importing
  6. // `testing` in the main part of the library.
  7. type TestReporter interface {
  8. Error(...interface{})
  9. Errorf(string, ...interface{})
  10. Fatal(...interface{})
  11. Fatalf(string, ...interface{})
  12. }
  13. // MockResponse is a response builder interface it defines one method that
  14. // allows generating a response based on a request body. MockResponses are used
  15. // to program behavior of MockBroker in tests.
  16. type MockResponse interface {
  17. For(reqBody versionedDecoder) (res encoder)
  18. }
  19. // MockWrapper is a mock response builder that returns a particular concrete
  20. // response regardless of the actual request passed to the `For` method.
  21. type MockWrapper struct {
  22. res encoder
  23. }
  24. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
  25. return mw.res
  26. }
  27. func NewMockWrapper(res encoder) *MockWrapper {
  28. return &MockWrapper{res: res}
  29. }
  30. // MockSequence is a mock response builder that is created from a sequence of
  31. // concrete responses. Every time when a `MockBroker` calls its `For` method
  32. // the next response from the sequence is returned. When the end of the
  33. // sequence is reached the last element from the sequence is returned.
  34. type MockSequence struct {
  35. responses []MockResponse
  36. }
  37. func NewMockSequence(responses ...interface{}) *MockSequence {
  38. ms := &MockSequence{}
  39. ms.responses = make([]MockResponse, len(responses))
  40. for i, res := range responses {
  41. switch res := res.(type) {
  42. case MockResponse:
  43. ms.responses[i] = res
  44. case encoder:
  45. ms.responses[i] = NewMockWrapper(res)
  46. default:
  47. panic(fmt.Sprintf("Unexpected response type: %T", res))
  48. }
  49. }
  50. return ms
  51. }
  52. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
  53. res = mc.responses[0].For(reqBody)
  54. if len(mc.responses) > 1 {
  55. mc.responses = mc.responses[1:]
  56. }
  57. return res
  58. }
  59. // MockMetadataResponse is a `MetadataResponse` builder.
  60. type MockMetadataResponse struct {
  61. controllerID int32
  62. leaders map[string]map[int32]int32
  63. brokers map[string]int32
  64. t TestReporter
  65. }
  66. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  67. return &MockMetadataResponse{
  68. leaders: make(map[string]map[int32]int32),
  69. brokers: make(map[string]int32),
  70. t: t,
  71. }
  72. }
  73. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  74. partitions := mmr.leaders[topic]
  75. if partitions == nil {
  76. partitions = make(map[int32]int32)
  77. mmr.leaders[topic] = partitions
  78. }
  79. partitions[partition] = brokerID
  80. return mmr
  81. }
  82. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  83. mmr.brokers[addr] = brokerID
  84. return mmr
  85. }
  86. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  87. mmr.controllerID = brokerID
  88. return mmr
  89. }
  90. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
  91. metadataRequest := reqBody.(*MetadataRequest)
  92. metadataResponse := &MetadataResponse{
  93. Version: metadataRequest.version(),
  94. ControllerID: mmr.controllerID,
  95. }
  96. for addr, brokerID := range mmr.brokers {
  97. metadataResponse.AddBroker(addr, brokerID)
  98. }
  99. if len(metadataRequest.Topics) == 0 {
  100. for topic, partitions := range mmr.leaders {
  101. for partition, brokerID := range partitions {
  102. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  103. }
  104. }
  105. return metadataResponse
  106. }
  107. for _, topic := range metadataRequest.Topics {
  108. for partition, brokerID := range mmr.leaders[topic] {
  109. metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
  110. }
  111. }
  112. return metadataResponse
  113. }
  114. // MockOffsetResponse is an `OffsetResponse` builder.
  115. type MockOffsetResponse struct {
  116. offsets map[string]map[int32]map[int64]int64
  117. t TestReporter
  118. version int16
  119. }
  120. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  121. return &MockOffsetResponse{
  122. offsets: make(map[string]map[int32]map[int64]int64),
  123. t: t,
  124. }
  125. }
  126. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  127. mor.version = version
  128. return mor
  129. }
  130. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  131. partitions := mor.offsets[topic]
  132. if partitions == nil {
  133. partitions = make(map[int32]map[int64]int64)
  134. mor.offsets[topic] = partitions
  135. }
  136. times := partitions[partition]
  137. if times == nil {
  138. times = make(map[int64]int64)
  139. partitions[partition] = times
  140. }
  141. times[time] = offset
  142. return mor
  143. }
  144. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
  145. offsetRequest := reqBody.(*OffsetRequest)
  146. offsetResponse := &OffsetResponse{Version: mor.version}
  147. for topic, partitions := range offsetRequest.blocks {
  148. for partition, block := range partitions {
  149. offset := mor.getOffset(topic, partition, block.time)
  150. offsetResponse.AddTopicPartition(topic, partition, offset)
  151. }
  152. }
  153. return offsetResponse
  154. }
  155. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  156. partitions := mor.offsets[topic]
  157. if partitions == nil {
  158. mor.t.Errorf("missing topic: %s", topic)
  159. }
  160. times := partitions[partition]
  161. if times == nil {
  162. mor.t.Errorf("missing partition: %d", partition)
  163. }
  164. offset, ok := times[time]
  165. if !ok {
  166. mor.t.Errorf("missing time: %d", time)
  167. }
  168. return offset
  169. }
  170. // MockFetchResponse is a `FetchResponse` builder.
  171. type MockFetchResponse struct {
  172. messages map[string]map[int32]map[int64]Encoder
  173. highWaterMarks map[string]map[int32]int64
  174. t TestReporter
  175. batchSize int
  176. version int16
  177. }
  178. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  179. return &MockFetchResponse{
  180. messages: make(map[string]map[int32]map[int64]Encoder),
  181. highWaterMarks: make(map[string]map[int32]int64),
  182. t: t,
  183. batchSize: batchSize,
  184. }
  185. }
  186. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  187. mfr.version = version
  188. return mfr
  189. }
  190. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  191. partitions := mfr.messages[topic]
  192. if partitions == nil {
  193. partitions = make(map[int32]map[int64]Encoder)
  194. mfr.messages[topic] = partitions
  195. }
  196. messages := partitions[partition]
  197. if messages == nil {
  198. messages = make(map[int64]Encoder)
  199. partitions[partition] = messages
  200. }
  201. messages[offset] = msg
  202. return mfr
  203. }
  204. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  205. partitions := mfr.highWaterMarks[topic]
  206. if partitions == nil {
  207. partitions = make(map[int32]int64)
  208. mfr.highWaterMarks[topic] = partitions
  209. }
  210. partitions[partition] = offset
  211. return mfr
  212. }
  213. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
  214. fetchRequest := reqBody.(*FetchRequest)
  215. res := &FetchResponse{
  216. Version: mfr.version,
  217. }
  218. for topic, partitions := range fetchRequest.blocks {
  219. for partition, block := range partitions {
  220. initialOffset := block.fetchOffset
  221. offset := initialOffset
  222. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  223. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  224. msg := mfr.getMessage(topic, partition, offset)
  225. if msg != nil {
  226. res.AddMessage(topic, partition, nil, msg, offset)
  227. i++
  228. }
  229. offset++
  230. }
  231. fb := res.GetBlock(topic, partition)
  232. if fb == nil {
  233. res.AddError(topic, partition, ErrNoError)
  234. fb = res.GetBlock(topic, partition)
  235. }
  236. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  237. }
  238. }
  239. return res
  240. }
  241. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  242. partitions := mfr.messages[topic]
  243. if partitions == nil {
  244. return nil
  245. }
  246. messages := partitions[partition]
  247. if messages == nil {
  248. return nil
  249. }
  250. return messages[offset]
  251. }
  252. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  253. partitions := mfr.messages[topic]
  254. if partitions == nil {
  255. return 0
  256. }
  257. messages := partitions[partition]
  258. if messages == nil {
  259. return 0
  260. }
  261. return len(messages)
  262. }
  263. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  264. partitions := mfr.highWaterMarks[topic]
  265. if partitions == nil {
  266. return 0
  267. }
  268. return partitions[partition]
  269. }
  270. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  271. type MockConsumerMetadataResponse struct {
  272. coordinators map[string]interface{}
  273. t TestReporter
  274. }
  275. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  276. return &MockConsumerMetadataResponse{
  277. coordinators: make(map[string]interface{}),
  278. t: t,
  279. }
  280. }
  281. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  282. mr.coordinators[group] = broker
  283. return mr
  284. }
  285. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  286. mr.coordinators[group] = kerror
  287. return mr
  288. }
  289. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
  290. req := reqBody.(*ConsumerMetadataRequest)
  291. group := req.ConsumerGroup
  292. res := &ConsumerMetadataResponse{}
  293. v := mr.coordinators[group]
  294. switch v := v.(type) {
  295. case *MockBroker:
  296. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  297. case KError:
  298. res.Err = v
  299. }
  300. return res
  301. }
  302. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  303. type MockFindCoordinatorResponse struct {
  304. groupCoordinators map[string]interface{}
  305. transCoordinators map[string]interface{}
  306. t TestReporter
  307. }
  308. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  309. return &MockFindCoordinatorResponse{
  310. groupCoordinators: make(map[string]interface{}),
  311. transCoordinators: make(map[string]interface{}),
  312. t: t,
  313. }
  314. }
  315. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  316. switch coordinatorType {
  317. case CoordinatorGroup:
  318. mr.groupCoordinators[group] = broker
  319. case CoordinatorTransaction:
  320. mr.transCoordinators[group] = broker
  321. }
  322. return mr
  323. }
  324. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  325. switch coordinatorType {
  326. case CoordinatorGroup:
  327. mr.groupCoordinators[group] = kerror
  328. case CoordinatorTransaction:
  329. mr.transCoordinators[group] = kerror
  330. }
  331. return mr
  332. }
  333. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
  334. req := reqBody.(*FindCoordinatorRequest)
  335. res := &FindCoordinatorResponse{}
  336. var v interface{}
  337. switch req.CoordinatorType {
  338. case CoordinatorGroup:
  339. v = mr.groupCoordinators[req.CoordinatorKey]
  340. case CoordinatorTransaction:
  341. v = mr.transCoordinators[req.CoordinatorKey]
  342. }
  343. switch v := v.(type) {
  344. case *MockBroker:
  345. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  346. case KError:
  347. res.Err = v
  348. }
  349. return res
  350. }
  351. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  352. type MockOffsetCommitResponse struct {
  353. errors map[string]map[string]map[int32]KError
  354. t TestReporter
  355. }
  356. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  357. return &MockOffsetCommitResponse{t: t}
  358. }
  359. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  360. if mr.errors == nil {
  361. mr.errors = make(map[string]map[string]map[int32]KError)
  362. }
  363. topics := mr.errors[group]
  364. if topics == nil {
  365. topics = make(map[string]map[int32]KError)
  366. mr.errors[group] = topics
  367. }
  368. partitions := topics[topic]
  369. if partitions == nil {
  370. partitions = make(map[int32]KError)
  371. topics[topic] = partitions
  372. }
  373. partitions[partition] = kerror
  374. return mr
  375. }
  376. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
  377. req := reqBody.(*OffsetCommitRequest)
  378. group := req.ConsumerGroup
  379. res := &OffsetCommitResponse{}
  380. for topic, partitions := range req.blocks {
  381. for partition := range partitions {
  382. res.AddError(topic, partition, mr.getError(group, topic, partition))
  383. }
  384. }
  385. return res
  386. }
  387. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  388. topics := mr.errors[group]
  389. if topics == nil {
  390. return ErrNoError
  391. }
  392. partitions := topics[topic]
  393. if partitions == nil {
  394. return ErrNoError
  395. }
  396. kerror, ok := partitions[partition]
  397. if !ok {
  398. return ErrNoError
  399. }
  400. return kerror
  401. }
  402. // MockProduceResponse is a `ProduceResponse` builder.
  403. type MockProduceResponse struct {
  404. version int16
  405. errors map[string]map[int32]KError
  406. t TestReporter
  407. }
  408. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  409. return &MockProduceResponse{t: t}
  410. }
  411. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  412. mr.version = version
  413. return mr
  414. }
  415. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  416. if mr.errors == nil {
  417. mr.errors = make(map[string]map[int32]KError)
  418. }
  419. partitions := mr.errors[topic]
  420. if partitions == nil {
  421. partitions = make(map[int32]KError)
  422. mr.errors[topic] = partitions
  423. }
  424. partitions[partition] = kerror
  425. return mr
  426. }
  427. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
  428. req := reqBody.(*ProduceRequest)
  429. res := &ProduceResponse{
  430. Version: mr.version,
  431. }
  432. for topic, partitions := range req.records {
  433. for partition := range partitions {
  434. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  435. }
  436. }
  437. return res
  438. }
  439. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  440. partitions := mr.errors[topic]
  441. if partitions == nil {
  442. return ErrNoError
  443. }
  444. kerror, ok := partitions[partition]
  445. if !ok {
  446. return ErrNoError
  447. }
  448. return kerror
  449. }
  450. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  451. type MockOffsetFetchResponse struct {
  452. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  453. t TestReporter
  454. }
  455. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  456. return &MockOffsetFetchResponse{t: t}
  457. }
  458. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  459. if mr.offsets == nil {
  460. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  461. }
  462. topics := mr.offsets[group]
  463. if topics == nil {
  464. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  465. mr.offsets[group] = topics
  466. }
  467. partitions := topics[topic]
  468. if partitions == nil {
  469. partitions = make(map[int32]*OffsetFetchResponseBlock)
  470. topics[topic] = partitions
  471. }
  472. partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
  473. return mr
  474. }
  475. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
  476. req := reqBody.(*OffsetFetchRequest)
  477. group := req.ConsumerGroup
  478. res := &OffsetFetchResponse{}
  479. for topic, partitions := range mr.offsets[group] {
  480. for partition, block := range partitions {
  481. res.AddBlock(topic, partition, block)
  482. }
  483. }
  484. return res
  485. }