mockresponses.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868
  1. package sarama
  2. import (
  3. "fmt"
  4. )
  5. // TestReporter has methods matching go's testing.T to avoid importing
  6. // `testing` in the main part of the library.
  7. type TestReporter interface {
  8. Error(...interface{})
  9. Errorf(string, ...interface{})
  10. Fatal(...interface{})
  11. Fatalf(string, ...interface{})
  12. }
  13. // MockResponse is a response builder interface it defines one method that
  14. // allows generating a response based on a request body. MockResponses are used
  15. // to program behavior of MockBroker in tests.
  16. type MockResponse interface {
  17. For(reqBody versionedDecoder) (res encoder)
  18. }
  19. // MockWrapper is a mock response builder that returns a particular concrete
  20. // response regardless of the actual request passed to the `For` method.
  21. type MockWrapper struct {
  22. res encoder
  23. }
  24. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
  25. return mw.res
  26. }
  27. func NewMockWrapper(res encoder) *MockWrapper {
  28. return &MockWrapper{res: res}
  29. }
  30. // MockSequence is a mock response builder that is created from a sequence of
  31. // concrete responses. Every time when a `MockBroker` calls its `For` method
  32. // the next response from the sequence is returned. When the end of the
  33. // sequence is reached the last element from the sequence is returned.
  34. type MockSequence struct {
  35. responses []MockResponse
  36. }
  37. func NewMockSequence(responses ...interface{}) *MockSequence {
  38. ms := &MockSequence{}
  39. ms.responses = make([]MockResponse, len(responses))
  40. for i, res := range responses {
  41. switch res := res.(type) {
  42. case MockResponse:
  43. ms.responses[i] = res
  44. case encoder:
  45. ms.responses[i] = NewMockWrapper(res)
  46. default:
  47. panic(fmt.Sprintf("Unexpected response type: %T", res))
  48. }
  49. }
  50. return ms
  51. }
  52. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
  53. res = mc.responses[0].For(reqBody)
  54. if len(mc.responses) > 1 {
  55. mc.responses = mc.responses[1:]
  56. }
  57. return res
  58. }
  59. type MockListGroupsResponse struct {
  60. groups map[string]string
  61. t TestReporter
  62. }
  63. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  64. return &MockListGroupsResponse{
  65. groups: make(map[string]string),
  66. t: t,
  67. }
  68. }
  69. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
  70. request := reqBody.(*ListGroupsRequest)
  71. _ = request
  72. response := &ListGroupsResponse{
  73. Groups: m.groups,
  74. }
  75. return response
  76. }
  77. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  78. m.groups[groupID] = protocolType
  79. return m
  80. }
  81. type MockDescribeGroupsResponse struct {
  82. groups map[string]*GroupDescription
  83. t TestReporter
  84. }
  85. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  86. return &MockDescribeGroupsResponse{
  87. t: t,
  88. groups: make(map[string]*GroupDescription),
  89. }
  90. }
  91. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  92. m.groups[groupID] = description
  93. return m
  94. }
  95. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
  96. request := reqBody.(*DescribeGroupsRequest)
  97. response := &DescribeGroupsResponse{}
  98. for _, requestedGroup := range request.Groups {
  99. if group, ok := m.groups[requestedGroup]; ok {
  100. response.Groups = append(response.Groups, group)
  101. } else {
  102. // Mimic real kafka - if a group doesn't exist, return
  103. // an entry with state "Dead"
  104. response.Groups = append(response.Groups, &GroupDescription{
  105. GroupId: requestedGroup,
  106. State: "Dead",
  107. })
  108. }
  109. }
  110. return response
  111. }
  112. // MockMetadataResponse is a `MetadataResponse` builder.
  113. type MockMetadataResponse struct {
  114. controllerID int32
  115. leaders map[string]map[int32]int32
  116. brokers map[string]int32
  117. t TestReporter
  118. }
  119. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  120. return &MockMetadataResponse{
  121. leaders: make(map[string]map[int32]int32),
  122. brokers: make(map[string]int32),
  123. t: t,
  124. }
  125. }
  126. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  127. partitions := mmr.leaders[topic]
  128. if partitions == nil {
  129. partitions = make(map[int32]int32)
  130. mmr.leaders[topic] = partitions
  131. }
  132. partitions[partition] = brokerID
  133. return mmr
  134. }
  135. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  136. mmr.brokers[addr] = brokerID
  137. return mmr
  138. }
  139. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  140. mmr.controllerID = brokerID
  141. return mmr
  142. }
  143. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
  144. metadataRequest := reqBody.(*MetadataRequest)
  145. metadataResponse := &MetadataResponse{
  146. Version: metadataRequest.version(),
  147. ControllerID: mmr.controllerID,
  148. }
  149. for addr, brokerID := range mmr.brokers {
  150. metadataResponse.AddBroker(addr, brokerID)
  151. }
  152. // Generate set of replicas
  153. replicas := []int32{}
  154. for _, brokerID := range mmr.brokers {
  155. replicas = append(replicas, brokerID)
  156. }
  157. if len(metadataRequest.Topics) == 0 {
  158. for topic, partitions := range mmr.leaders {
  159. for partition, brokerID := range partitions {
  160. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
  161. }
  162. }
  163. return metadataResponse
  164. }
  165. for _, topic := range metadataRequest.Topics {
  166. for partition, brokerID := range mmr.leaders[topic] {
  167. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
  168. }
  169. }
  170. return metadataResponse
  171. }
  172. // MockOffsetResponse is an `OffsetResponse` builder.
  173. type MockOffsetResponse struct {
  174. offsets map[string]map[int32]map[int64]int64
  175. t TestReporter
  176. version int16
  177. }
  178. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  179. return &MockOffsetResponse{
  180. offsets: make(map[string]map[int32]map[int64]int64),
  181. t: t,
  182. }
  183. }
  184. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  185. mor.version = version
  186. return mor
  187. }
  188. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  189. partitions := mor.offsets[topic]
  190. if partitions == nil {
  191. partitions = make(map[int32]map[int64]int64)
  192. mor.offsets[topic] = partitions
  193. }
  194. times := partitions[partition]
  195. if times == nil {
  196. times = make(map[int64]int64)
  197. partitions[partition] = times
  198. }
  199. times[time] = offset
  200. return mor
  201. }
  202. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
  203. offsetRequest := reqBody.(*OffsetRequest)
  204. offsetResponse := &OffsetResponse{Version: mor.version}
  205. for topic, partitions := range offsetRequest.blocks {
  206. for partition, block := range partitions {
  207. offset := mor.getOffset(topic, partition, block.time)
  208. offsetResponse.AddTopicPartition(topic, partition, offset)
  209. }
  210. }
  211. return offsetResponse
  212. }
  213. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  214. partitions := mor.offsets[topic]
  215. if partitions == nil {
  216. mor.t.Errorf("missing topic: %s", topic)
  217. }
  218. times := partitions[partition]
  219. if times == nil {
  220. mor.t.Errorf("missing partition: %d", partition)
  221. }
  222. offset, ok := times[time]
  223. if !ok {
  224. mor.t.Errorf("missing time: %d", time)
  225. }
  226. return offset
  227. }
  228. // MockFetchResponse is a `FetchResponse` builder.
  229. type MockFetchResponse struct {
  230. messages map[string]map[int32]map[int64]Encoder
  231. highWaterMarks map[string]map[int32]int64
  232. t TestReporter
  233. batchSize int
  234. version int16
  235. }
  236. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  237. return &MockFetchResponse{
  238. messages: make(map[string]map[int32]map[int64]Encoder),
  239. highWaterMarks: make(map[string]map[int32]int64),
  240. t: t,
  241. batchSize: batchSize,
  242. }
  243. }
  244. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  245. mfr.version = version
  246. return mfr
  247. }
  248. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  249. partitions := mfr.messages[topic]
  250. if partitions == nil {
  251. partitions = make(map[int32]map[int64]Encoder)
  252. mfr.messages[topic] = partitions
  253. }
  254. messages := partitions[partition]
  255. if messages == nil {
  256. messages = make(map[int64]Encoder)
  257. partitions[partition] = messages
  258. }
  259. messages[offset] = msg
  260. return mfr
  261. }
  262. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  263. partitions := mfr.highWaterMarks[topic]
  264. if partitions == nil {
  265. partitions = make(map[int32]int64)
  266. mfr.highWaterMarks[topic] = partitions
  267. }
  268. partitions[partition] = offset
  269. return mfr
  270. }
  271. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
  272. fetchRequest := reqBody.(*FetchRequest)
  273. res := &FetchResponse{
  274. Version: mfr.version,
  275. }
  276. for topic, partitions := range fetchRequest.blocks {
  277. for partition, block := range partitions {
  278. initialOffset := block.fetchOffset
  279. offset := initialOffset
  280. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  281. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  282. msg := mfr.getMessage(topic, partition, offset)
  283. if msg != nil {
  284. res.AddMessage(topic, partition, nil, msg, offset)
  285. i++
  286. }
  287. offset++
  288. }
  289. fb := res.GetBlock(topic, partition)
  290. if fb == nil {
  291. res.AddError(topic, partition, ErrNoError)
  292. fb = res.GetBlock(topic, partition)
  293. }
  294. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  295. }
  296. }
  297. return res
  298. }
  299. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  300. partitions := mfr.messages[topic]
  301. if partitions == nil {
  302. return nil
  303. }
  304. messages := partitions[partition]
  305. if messages == nil {
  306. return nil
  307. }
  308. return messages[offset]
  309. }
  310. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  311. partitions := mfr.messages[topic]
  312. if partitions == nil {
  313. return 0
  314. }
  315. messages := partitions[partition]
  316. if messages == nil {
  317. return 0
  318. }
  319. return len(messages)
  320. }
  321. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  322. partitions := mfr.highWaterMarks[topic]
  323. if partitions == nil {
  324. return 0
  325. }
  326. return partitions[partition]
  327. }
  328. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  329. type MockConsumerMetadataResponse struct {
  330. coordinators map[string]interface{}
  331. t TestReporter
  332. }
  333. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  334. return &MockConsumerMetadataResponse{
  335. coordinators: make(map[string]interface{}),
  336. t: t,
  337. }
  338. }
  339. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  340. mr.coordinators[group] = broker
  341. return mr
  342. }
  343. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  344. mr.coordinators[group] = kerror
  345. return mr
  346. }
  347. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
  348. req := reqBody.(*ConsumerMetadataRequest)
  349. group := req.ConsumerGroup
  350. res := &ConsumerMetadataResponse{}
  351. v := mr.coordinators[group]
  352. switch v := v.(type) {
  353. case *MockBroker:
  354. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  355. case KError:
  356. res.Err = v
  357. }
  358. return res
  359. }
  360. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  361. type MockFindCoordinatorResponse struct {
  362. groupCoordinators map[string]interface{}
  363. transCoordinators map[string]interface{}
  364. t TestReporter
  365. }
  366. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  367. return &MockFindCoordinatorResponse{
  368. groupCoordinators: make(map[string]interface{}),
  369. transCoordinators: make(map[string]interface{}),
  370. t: t,
  371. }
  372. }
  373. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  374. switch coordinatorType {
  375. case CoordinatorGroup:
  376. mr.groupCoordinators[group] = broker
  377. case CoordinatorTransaction:
  378. mr.transCoordinators[group] = broker
  379. }
  380. return mr
  381. }
  382. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  383. switch coordinatorType {
  384. case CoordinatorGroup:
  385. mr.groupCoordinators[group] = kerror
  386. case CoordinatorTransaction:
  387. mr.transCoordinators[group] = kerror
  388. }
  389. return mr
  390. }
  391. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
  392. req := reqBody.(*FindCoordinatorRequest)
  393. res := &FindCoordinatorResponse{}
  394. var v interface{}
  395. switch req.CoordinatorType {
  396. case CoordinatorGroup:
  397. v = mr.groupCoordinators[req.CoordinatorKey]
  398. case CoordinatorTransaction:
  399. v = mr.transCoordinators[req.CoordinatorKey]
  400. }
  401. switch v := v.(type) {
  402. case *MockBroker:
  403. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  404. case KError:
  405. res.Err = v
  406. }
  407. return res
  408. }
  409. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  410. type MockOffsetCommitResponse struct {
  411. errors map[string]map[string]map[int32]KError
  412. t TestReporter
  413. }
  414. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  415. return &MockOffsetCommitResponse{t: t}
  416. }
  417. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  418. if mr.errors == nil {
  419. mr.errors = make(map[string]map[string]map[int32]KError)
  420. }
  421. topics := mr.errors[group]
  422. if topics == nil {
  423. topics = make(map[string]map[int32]KError)
  424. mr.errors[group] = topics
  425. }
  426. partitions := topics[topic]
  427. if partitions == nil {
  428. partitions = make(map[int32]KError)
  429. topics[topic] = partitions
  430. }
  431. partitions[partition] = kerror
  432. return mr
  433. }
  434. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
  435. req := reqBody.(*OffsetCommitRequest)
  436. group := req.ConsumerGroup
  437. res := &OffsetCommitResponse{}
  438. for topic, partitions := range req.blocks {
  439. for partition := range partitions {
  440. res.AddError(topic, partition, mr.getError(group, topic, partition))
  441. }
  442. }
  443. return res
  444. }
  445. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  446. topics := mr.errors[group]
  447. if topics == nil {
  448. return ErrNoError
  449. }
  450. partitions := topics[topic]
  451. if partitions == nil {
  452. return ErrNoError
  453. }
  454. kerror, ok := partitions[partition]
  455. if !ok {
  456. return ErrNoError
  457. }
  458. return kerror
  459. }
  460. // MockProduceResponse is a `ProduceResponse` builder.
  461. type MockProduceResponse struct {
  462. version int16
  463. errors map[string]map[int32]KError
  464. t TestReporter
  465. }
  466. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  467. return &MockProduceResponse{t: t}
  468. }
  469. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  470. mr.version = version
  471. return mr
  472. }
  473. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  474. if mr.errors == nil {
  475. mr.errors = make(map[string]map[int32]KError)
  476. }
  477. partitions := mr.errors[topic]
  478. if partitions == nil {
  479. partitions = make(map[int32]KError)
  480. mr.errors[topic] = partitions
  481. }
  482. partitions[partition] = kerror
  483. return mr
  484. }
  485. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
  486. req := reqBody.(*ProduceRequest)
  487. res := &ProduceResponse{
  488. Version: mr.version,
  489. }
  490. for topic, partitions := range req.records {
  491. for partition := range partitions {
  492. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  493. }
  494. }
  495. return res
  496. }
  497. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  498. partitions := mr.errors[topic]
  499. if partitions == nil {
  500. return ErrNoError
  501. }
  502. kerror, ok := partitions[partition]
  503. if !ok {
  504. return ErrNoError
  505. }
  506. return kerror
  507. }
  508. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  509. type MockOffsetFetchResponse struct {
  510. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  511. t TestReporter
  512. }
  513. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  514. return &MockOffsetFetchResponse{t: t}
  515. }
  516. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  517. if mr.offsets == nil {
  518. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  519. }
  520. topics := mr.offsets[group]
  521. if topics == nil {
  522. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  523. mr.offsets[group] = topics
  524. }
  525. partitions := topics[topic]
  526. if partitions == nil {
  527. partitions = make(map[int32]*OffsetFetchResponseBlock)
  528. topics[topic] = partitions
  529. }
  530. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  531. return mr
  532. }
  533. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
  534. req := reqBody.(*OffsetFetchRequest)
  535. group := req.ConsumerGroup
  536. res := &OffsetFetchResponse{}
  537. for topic, partitions := range mr.offsets[group] {
  538. for partition, block := range partitions {
  539. res.AddBlock(topic, partition, block)
  540. }
  541. }
  542. return res
  543. }
  544. type MockCreateTopicsResponse struct {
  545. t TestReporter
  546. }
  547. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  548. return &MockCreateTopicsResponse{t: t}
  549. }
  550. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
  551. req := reqBody.(*CreateTopicsRequest)
  552. res := &CreateTopicsResponse{}
  553. res.TopicErrors = make(map[string]*TopicError)
  554. for topic, _ := range req.TopicDetails {
  555. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  556. }
  557. return res
  558. }
  559. type MockDeleteTopicsResponse struct {
  560. t TestReporter
  561. }
  562. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  563. return &MockDeleteTopicsResponse{t: t}
  564. }
  565. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
  566. req := reqBody.(*DeleteTopicsRequest)
  567. res := &DeleteTopicsResponse{}
  568. res.TopicErrorCodes = make(map[string]KError)
  569. for _, topic := range req.Topics {
  570. res.TopicErrorCodes[topic] = ErrNoError
  571. }
  572. return res
  573. }
  574. type MockCreatePartitionsResponse struct {
  575. t TestReporter
  576. }
  577. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  578. return &MockCreatePartitionsResponse{t: t}
  579. }
  580. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
  581. req := reqBody.(*CreatePartitionsRequest)
  582. res := &CreatePartitionsResponse{}
  583. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  584. for topic, _ := range req.TopicPartitions {
  585. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  586. }
  587. return res
  588. }
  589. type MockDeleteRecordsResponse struct {
  590. t TestReporter
  591. }
  592. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  593. return &MockDeleteRecordsResponse{t: t}
  594. }
  595. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
  596. req := reqBody.(*DeleteRecordsRequest)
  597. res := &DeleteRecordsResponse{}
  598. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  599. for topic, deleteRecordRequestTopic := range req.Topics {
  600. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  601. for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
  602. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  603. }
  604. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  605. }
  606. return res
  607. }
  608. type MockDescribeConfigsResponse struct {
  609. t TestReporter
  610. }
  611. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  612. return &MockDescribeConfigsResponse{t: t}
  613. }
  614. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
  615. req := reqBody.(*DescribeConfigsRequest)
  616. res := &DescribeConfigsResponse{}
  617. for _, r := range req.Resources {
  618. var configEntries []*ConfigEntry
  619. switch r.Type {
  620. case TopicResource:
  621. configEntries = append(configEntries,
  622. &ConfigEntry{Name: "max.message.bytes",
  623. Value: "1000000",
  624. ReadOnly: false,
  625. Default: true,
  626. Sensitive: false,
  627. }, &ConfigEntry{Name: "retention.ms",
  628. Value: "5000",
  629. ReadOnly: false,
  630. Default: false,
  631. Sensitive: false,
  632. }, &ConfigEntry{Name: "password",
  633. Value: "12345",
  634. ReadOnly: false,
  635. Default: false,
  636. Sensitive: true,
  637. })
  638. res.Resources = append(res.Resources, &ResourceResponse{
  639. Name: r.Name,
  640. Configs: configEntries,
  641. })
  642. }
  643. }
  644. return res
  645. }
  646. type MockAlterConfigsResponse struct {
  647. t TestReporter
  648. }
  649. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  650. return &MockAlterConfigsResponse{t: t}
  651. }
  652. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
  653. req := reqBody.(*AlterConfigsRequest)
  654. res := &AlterConfigsResponse{}
  655. for _, r := range req.Resources {
  656. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
  657. Type: TopicResource,
  658. ErrorMsg: "",
  659. })
  660. }
  661. return res
  662. }
  663. type MockCreateAclsResponse struct {
  664. t TestReporter
  665. }
  666. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  667. return &MockCreateAclsResponse{t: t}
  668. }
  669. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
  670. req := reqBody.(*CreateAclsRequest)
  671. res := &CreateAclsResponse{}
  672. for range req.AclCreations {
  673. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  674. }
  675. return res
  676. }
  677. type MockListAclsResponse struct {
  678. t TestReporter
  679. }
  680. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  681. return &MockListAclsResponse{t: t}
  682. }
  683. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
  684. req := reqBody.(*DescribeAclsRequest)
  685. res := &DescribeAclsResponse{}
  686. res.Err = ErrNoError
  687. acl := &ResourceAcls{}
  688. acl.Resource.ResourceName = *req.ResourceName
  689. acl.Resource.ResourceType = req.ResourceType
  690. acl.Acls = append(acl.Acls, &Acl{})
  691. res.ResourceAcls = append(res.ResourceAcls, acl)
  692. return res
  693. }
  694. type MockSaslAuthenticateResponse struct {
  695. t TestReporter
  696. kerror KError
  697. saslAuthBytes []byte
  698. }
  699. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  700. return &MockSaslAuthenticateResponse{t: t}
  701. }
  702. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
  703. res := &SaslAuthenticateResponse{}
  704. res.Err = msar.kerror
  705. res.SaslAuthBytes = msar.saslAuthBytes
  706. return res
  707. }
  708. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  709. msar.kerror = kerror
  710. return msar
  711. }
  712. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  713. msar.saslAuthBytes = saslAuthBytes
  714. return msar
  715. }
  716. type MockDeleteAclsResponse struct {
  717. t TestReporter
  718. }
  719. type MockSaslHandshakeResponse struct {
  720. enabledMechanisms []string
  721. kerror KError
  722. t TestReporter
  723. }
  724. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  725. return &MockSaslHandshakeResponse{t: t}
  726. }
  727. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
  728. res := &SaslHandshakeResponse{}
  729. res.Err = mshr.kerror
  730. res.EnabledMechanisms = mshr.enabledMechanisms
  731. return res
  732. }
  733. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  734. mshr.kerror = kerror
  735. return mshr
  736. }
  737. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  738. mshr.enabledMechanisms = enabledMechanisms
  739. return mshr
  740. }
  741. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  742. return &MockDeleteAclsResponse{t: t}
  743. }
  744. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
  745. req := reqBody.(*DeleteAclsRequest)
  746. res := &DeleteAclsResponse{}
  747. for range req.Filters {
  748. response := &FilterResponse{Err: ErrNoError}
  749. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  750. res.FilterResponses = append(res.FilterResponses, response)
  751. }
  752. return res
  753. }