// mockresponses.go (24 KB)
  1. package sarama
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. // TestReporter has methods matching go's testing.T to avoid importing
  7. // `testing` in the main part of the library.
  8. type TestReporter interface {
  9. Error(...interface{})
  10. Errorf(string, ...interface{})
  11. Fatal(...interface{})
  12. Fatalf(string, ...interface{})
  13. }
  14. // MockResponse is a response builder interface it defines one method that
  15. // allows generating a response based on a request body. MockResponses are used
  16. // to program behavior of MockBroker in tests.
  17. type MockResponse interface {
  18. For(reqBody versionedDecoder) (res encoder)
  19. }
  20. // MockWrapper is a mock response builder that returns a particular concrete
  21. // response regardless of the actual request passed to the `For` method.
  22. type MockWrapper struct {
  23. res encoder
  24. }
  25. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
  26. return mw.res
  27. }
  28. func NewMockWrapper(res encoder) *MockWrapper {
  29. return &MockWrapper{res: res}
  30. }
  31. // MockSequence is a mock response builder that is created from a sequence of
  32. // concrete responses. Every time when a `MockBroker` calls its `For` method
  33. // the next response from the sequence is returned. When the end of the
  34. // sequence is reached the last element from the sequence is returned.
  35. type MockSequence struct {
  36. responses []MockResponse
  37. }
  38. func NewMockSequence(responses ...interface{}) *MockSequence {
  39. ms := &MockSequence{}
  40. ms.responses = make([]MockResponse, len(responses))
  41. for i, res := range responses {
  42. switch res := res.(type) {
  43. case MockResponse:
  44. ms.responses[i] = res
  45. case encoder:
  46. ms.responses[i] = NewMockWrapper(res)
  47. default:
  48. panic(fmt.Sprintf("Unexpected response type: %T", res))
  49. }
  50. }
  51. return ms
  52. }
  53. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
  54. res = mc.responses[0].For(reqBody)
  55. if len(mc.responses) > 1 {
  56. mc.responses = mc.responses[1:]
  57. }
  58. return res
  59. }
  60. type MockListGroupsResponse struct {
  61. groups map[string]string
  62. t TestReporter
  63. }
  64. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  65. return &MockListGroupsResponse{
  66. groups: make(map[string]string),
  67. t: t,
  68. }
  69. }
  70. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
  71. request := reqBody.(*ListGroupsRequest)
  72. _ = request
  73. response := &ListGroupsResponse{
  74. Groups: m.groups,
  75. }
  76. return response
  77. }
  78. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  79. m.groups[groupID] = protocolType
  80. return m
  81. }
  82. type MockDescribeGroupsResponse struct {
  83. groups map[string]*GroupDescription
  84. t TestReporter
  85. }
  86. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  87. return &MockDescribeGroupsResponse{
  88. t: t,
  89. groups: make(map[string]*GroupDescription),
  90. }
  91. }
  92. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  93. m.groups[groupID] = description
  94. return m
  95. }
  96. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
  97. request := reqBody.(*DescribeGroupsRequest)
  98. response := &DescribeGroupsResponse{}
  99. for _, requestedGroup := range request.Groups {
  100. if group, ok := m.groups[requestedGroup]; ok {
  101. response.Groups = append(response.Groups, group)
  102. } else {
  103. // Mimic real kafka - if a group doesn't exist, return
  104. // an entry with state "Dead"
  105. response.Groups = append(response.Groups, &GroupDescription{
  106. GroupId: requestedGroup,
  107. State: "Dead",
  108. })
  109. }
  110. }
  111. return response
  112. }
  113. // MockMetadataResponse is a `MetadataResponse` builder.
  114. type MockMetadataResponse struct {
  115. controllerID int32
  116. leaders map[string]map[int32]int32
  117. brokers map[string]int32
  118. t TestReporter
  119. }
  120. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  121. return &MockMetadataResponse{
  122. leaders: make(map[string]map[int32]int32),
  123. brokers: make(map[string]int32),
  124. t: t,
  125. }
  126. }
  127. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  128. partitions := mmr.leaders[topic]
  129. if partitions == nil {
  130. partitions = make(map[int32]int32)
  131. mmr.leaders[topic] = partitions
  132. }
  133. partitions[partition] = brokerID
  134. return mmr
  135. }
  136. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  137. mmr.brokers[addr] = brokerID
  138. return mmr
  139. }
  140. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  141. mmr.controllerID = brokerID
  142. return mmr
  143. }
  144. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
  145. metadataRequest := reqBody.(*MetadataRequest)
  146. metadataResponse := &MetadataResponse{
  147. Version: metadataRequest.version(),
  148. ControllerID: mmr.controllerID,
  149. }
  150. for addr, brokerID := range mmr.brokers {
  151. metadataResponse.AddBroker(addr, brokerID)
  152. }
  153. // Generate set of replicas
  154. replicas := []int32{}
  155. for _, brokerID := range mmr.brokers {
  156. replicas = append(replicas, brokerID)
  157. }
  158. if len(metadataRequest.Topics) == 0 {
  159. for topic, partitions := range mmr.leaders {
  160. for partition, brokerID := range partitions {
  161. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
  162. }
  163. }
  164. return metadataResponse
  165. }
  166. for _, topic := range metadataRequest.Topics {
  167. for partition, brokerID := range mmr.leaders[topic] {
  168. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
  169. }
  170. }
  171. return metadataResponse
  172. }
  173. // MockOffsetResponse is an `OffsetResponse` builder.
  174. type MockOffsetResponse struct {
  175. offsets map[string]map[int32]map[int64]int64
  176. t TestReporter
  177. version int16
  178. }
  179. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  180. return &MockOffsetResponse{
  181. offsets: make(map[string]map[int32]map[int64]int64),
  182. t: t,
  183. }
  184. }
  185. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  186. mor.version = version
  187. return mor
  188. }
  189. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  190. partitions := mor.offsets[topic]
  191. if partitions == nil {
  192. partitions = make(map[int32]map[int64]int64)
  193. mor.offsets[topic] = partitions
  194. }
  195. times := partitions[partition]
  196. if times == nil {
  197. times = make(map[int64]int64)
  198. partitions[partition] = times
  199. }
  200. times[time] = offset
  201. return mor
  202. }
  203. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
  204. offsetRequest := reqBody.(*OffsetRequest)
  205. offsetResponse := &OffsetResponse{Version: mor.version}
  206. for topic, partitions := range offsetRequest.blocks {
  207. for partition, block := range partitions {
  208. offset := mor.getOffset(topic, partition, block.time)
  209. offsetResponse.AddTopicPartition(topic, partition, offset)
  210. }
  211. }
  212. return offsetResponse
  213. }
  214. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  215. partitions := mor.offsets[topic]
  216. if partitions == nil {
  217. mor.t.Errorf("missing topic: %s", topic)
  218. }
  219. times := partitions[partition]
  220. if times == nil {
  221. mor.t.Errorf("missing partition: %d", partition)
  222. }
  223. offset, ok := times[time]
  224. if !ok {
  225. mor.t.Errorf("missing time: %d", time)
  226. }
  227. return offset
  228. }
  229. // MockFetchResponse is a `FetchResponse` builder.
  230. type MockFetchResponse struct {
  231. messages map[string]map[int32]map[int64]Encoder
  232. highWaterMarks map[string]map[int32]int64
  233. t TestReporter
  234. batchSize int
  235. version int16
  236. }
  237. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  238. return &MockFetchResponse{
  239. messages: make(map[string]map[int32]map[int64]Encoder),
  240. highWaterMarks: make(map[string]map[int32]int64),
  241. t: t,
  242. batchSize: batchSize,
  243. }
  244. }
  245. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  246. mfr.version = version
  247. return mfr
  248. }
  249. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  250. partitions := mfr.messages[topic]
  251. if partitions == nil {
  252. partitions = make(map[int32]map[int64]Encoder)
  253. mfr.messages[topic] = partitions
  254. }
  255. messages := partitions[partition]
  256. if messages == nil {
  257. messages = make(map[int64]Encoder)
  258. partitions[partition] = messages
  259. }
  260. messages[offset] = msg
  261. return mfr
  262. }
  263. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  264. partitions := mfr.highWaterMarks[topic]
  265. if partitions == nil {
  266. partitions = make(map[int32]int64)
  267. mfr.highWaterMarks[topic] = partitions
  268. }
  269. partitions[partition] = offset
  270. return mfr
  271. }
  272. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
  273. fetchRequest := reqBody.(*FetchRequest)
  274. res := &FetchResponse{
  275. Version: mfr.version,
  276. }
  277. for topic, partitions := range fetchRequest.blocks {
  278. for partition, block := range partitions {
  279. initialOffset := block.fetchOffset
  280. offset := initialOffset
  281. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  282. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  283. msg := mfr.getMessage(topic, partition, offset)
  284. if msg != nil {
  285. res.AddMessage(topic, partition, nil, msg, offset)
  286. i++
  287. }
  288. offset++
  289. }
  290. fb := res.GetBlock(topic, partition)
  291. if fb == nil {
  292. res.AddError(topic, partition, ErrNoError)
  293. fb = res.GetBlock(topic, partition)
  294. }
  295. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  296. }
  297. }
  298. return res
  299. }
  300. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  301. partitions := mfr.messages[topic]
  302. if partitions == nil {
  303. return nil
  304. }
  305. messages := partitions[partition]
  306. if messages == nil {
  307. return nil
  308. }
  309. return messages[offset]
  310. }
  311. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  312. partitions := mfr.messages[topic]
  313. if partitions == nil {
  314. return 0
  315. }
  316. messages := partitions[partition]
  317. if messages == nil {
  318. return 0
  319. }
  320. return len(messages)
  321. }
  322. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  323. partitions := mfr.highWaterMarks[topic]
  324. if partitions == nil {
  325. return 0
  326. }
  327. return partitions[partition]
  328. }
  329. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  330. type MockConsumerMetadataResponse struct {
  331. coordinators map[string]interface{}
  332. t TestReporter
  333. }
  334. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  335. return &MockConsumerMetadataResponse{
  336. coordinators: make(map[string]interface{}),
  337. t: t,
  338. }
  339. }
  340. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  341. mr.coordinators[group] = broker
  342. return mr
  343. }
  344. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  345. mr.coordinators[group] = kerror
  346. return mr
  347. }
  348. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
  349. req := reqBody.(*ConsumerMetadataRequest)
  350. group := req.ConsumerGroup
  351. res := &ConsumerMetadataResponse{}
  352. v := mr.coordinators[group]
  353. switch v := v.(type) {
  354. case *MockBroker:
  355. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  356. case KError:
  357. res.Err = v
  358. }
  359. return res
  360. }
  361. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  362. type MockFindCoordinatorResponse struct {
  363. groupCoordinators map[string]interface{}
  364. transCoordinators map[string]interface{}
  365. t TestReporter
  366. }
  367. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  368. return &MockFindCoordinatorResponse{
  369. groupCoordinators: make(map[string]interface{}),
  370. transCoordinators: make(map[string]interface{}),
  371. t: t,
  372. }
  373. }
  374. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  375. switch coordinatorType {
  376. case CoordinatorGroup:
  377. mr.groupCoordinators[group] = broker
  378. case CoordinatorTransaction:
  379. mr.transCoordinators[group] = broker
  380. }
  381. return mr
  382. }
  383. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  384. switch coordinatorType {
  385. case CoordinatorGroup:
  386. mr.groupCoordinators[group] = kerror
  387. case CoordinatorTransaction:
  388. mr.transCoordinators[group] = kerror
  389. }
  390. return mr
  391. }
  392. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
  393. req := reqBody.(*FindCoordinatorRequest)
  394. res := &FindCoordinatorResponse{}
  395. var v interface{}
  396. switch req.CoordinatorType {
  397. case CoordinatorGroup:
  398. v = mr.groupCoordinators[req.CoordinatorKey]
  399. case CoordinatorTransaction:
  400. v = mr.transCoordinators[req.CoordinatorKey]
  401. }
  402. switch v := v.(type) {
  403. case *MockBroker:
  404. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  405. case KError:
  406. res.Err = v
  407. }
  408. return res
  409. }
  410. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  411. type MockOffsetCommitResponse struct {
  412. errors map[string]map[string]map[int32]KError
  413. t TestReporter
  414. }
  415. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  416. return &MockOffsetCommitResponse{t: t}
  417. }
  418. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  419. if mr.errors == nil {
  420. mr.errors = make(map[string]map[string]map[int32]KError)
  421. }
  422. topics := mr.errors[group]
  423. if topics == nil {
  424. topics = make(map[string]map[int32]KError)
  425. mr.errors[group] = topics
  426. }
  427. partitions := topics[topic]
  428. if partitions == nil {
  429. partitions = make(map[int32]KError)
  430. topics[topic] = partitions
  431. }
  432. partitions[partition] = kerror
  433. return mr
  434. }
  435. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
  436. req := reqBody.(*OffsetCommitRequest)
  437. group := req.ConsumerGroup
  438. res := &OffsetCommitResponse{}
  439. for topic, partitions := range req.blocks {
  440. for partition := range partitions {
  441. res.AddError(topic, partition, mr.getError(group, topic, partition))
  442. }
  443. }
  444. return res
  445. }
  446. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  447. topics := mr.errors[group]
  448. if topics == nil {
  449. return ErrNoError
  450. }
  451. partitions := topics[topic]
  452. if partitions == nil {
  453. return ErrNoError
  454. }
  455. kerror, ok := partitions[partition]
  456. if !ok {
  457. return ErrNoError
  458. }
  459. return kerror
  460. }
  461. // MockProduceResponse is a `ProduceResponse` builder.
  462. type MockProduceResponse struct {
  463. version int16
  464. errors map[string]map[int32]KError
  465. t TestReporter
  466. }
  467. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  468. return &MockProduceResponse{t: t}
  469. }
  470. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  471. mr.version = version
  472. return mr
  473. }
  474. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  475. if mr.errors == nil {
  476. mr.errors = make(map[string]map[int32]KError)
  477. }
  478. partitions := mr.errors[topic]
  479. if partitions == nil {
  480. partitions = make(map[int32]KError)
  481. mr.errors[topic] = partitions
  482. }
  483. partitions[partition] = kerror
  484. return mr
  485. }
  486. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
  487. req := reqBody.(*ProduceRequest)
  488. res := &ProduceResponse{
  489. Version: mr.version,
  490. }
  491. for topic, partitions := range req.records {
  492. for partition := range partitions {
  493. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  494. }
  495. }
  496. return res
  497. }
  498. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  499. partitions := mr.errors[topic]
  500. if partitions == nil {
  501. return ErrNoError
  502. }
  503. kerror, ok := partitions[partition]
  504. if !ok {
  505. return ErrNoError
  506. }
  507. return kerror
  508. }
  509. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  510. type MockOffsetFetchResponse struct {
  511. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  512. t TestReporter
  513. }
  514. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  515. return &MockOffsetFetchResponse{t: t}
  516. }
  517. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  518. if mr.offsets == nil {
  519. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  520. }
  521. topics := mr.offsets[group]
  522. if topics == nil {
  523. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  524. mr.offsets[group] = topics
  525. }
  526. partitions := topics[topic]
  527. if partitions == nil {
  528. partitions = make(map[int32]*OffsetFetchResponseBlock)
  529. topics[topic] = partitions
  530. }
  531. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  532. return mr
  533. }
  534. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
  535. req := reqBody.(*OffsetFetchRequest)
  536. group := req.ConsumerGroup
  537. res := &OffsetFetchResponse{}
  538. for topic, partitions := range mr.offsets[group] {
  539. for partition, block := range partitions {
  540. res.AddBlock(topic, partition, block)
  541. }
  542. }
  543. return res
  544. }
  545. type MockCreateTopicsResponse struct {
  546. t TestReporter
  547. }
  548. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  549. return &MockCreateTopicsResponse{t: t}
  550. }
  551. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
  552. req := reqBody.(*CreateTopicsRequest)
  553. res := &CreateTopicsResponse{
  554. Version: req.Version,
  555. }
  556. res.TopicErrors = make(map[string]*TopicError)
  557. for topic, _ := range req.TopicDetails {
  558. if res.Version >= 1 && strings.HasPrefix(topic, "_") {
  559. msg := "insufficient permissions to create topic with reserved prefix"
  560. res.TopicErrors[topic] = &TopicError{
  561. Err: ErrTopicAuthorizationFailed,
  562. ErrMsg: &msg,
  563. }
  564. continue
  565. }
  566. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  567. }
  568. return res
  569. }
  570. type MockDeleteTopicsResponse struct {
  571. t TestReporter
  572. }
  573. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  574. return &MockDeleteTopicsResponse{t: t}
  575. }
  576. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
  577. req := reqBody.(*DeleteTopicsRequest)
  578. res := &DeleteTopicsResponse{}
  579. res.TopicErrorCodes = make(map[string]KError)
  580. for _, topic := range req.Topics {
  581. res.TopicErrorCodes[topic] = ErrNoError
  582. }
  583. return res
  584. }
  585. type MockCreatePartitionsResponse struct {
  586. t TestReporter
  587. }
  588. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  589. return &MockCreatePartitionsResponse{t: t}
  590. }
  591. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
  592. req := reqBody.(*CreatePartitionsRequest)
  593. res := &CreatePartitionsResponse{}
  594. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  595. for topic, _ := range req.TopicPartitions {
  596. if strings.HasPrefix(topic, "_") {
  597. msg := "insufficient permissions to create partition on topic with reserved prefix"
  598. res.TopicPartitionErrors[topic] = &TopicPartitionError{
  599. Err: ErrTopicAuthorizationFailed,
  600. ErrMsg: &msg,
  601. }
  602. continue
  603. }
  604. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  605. }
  606. return res
  607. }
  608. type MockDeleteRecordsResponse struct {
  609. t TestReporter
  610. }
  611. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  612. return &MockDeleteRecordsResponse{t: t}
  613. }
  614. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
  615. req := reqBody.(*DeleteRecordsRequest)
  616. res := &DeleteRecordsResponse{}
  617. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  618. for topic, deleteRecordRequestTopic := range req.Topics {
  619. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  620. for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
  621. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  622. }
  623. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  624. }
  625. return res
  626. }
  627. type MockDescribeConfigsResponse struct {
  628. t TestReporter
  629. }
  630. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  631. return &MockDescribeConfigsResponse{t: t}
  632. }
  633. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
  634. req := reqBody.(*DescribeConfigsRequest)
  635. res := &DescribeConfigsResponse{}
  636. for _, r := range req.Resources {
  637. var configEntries []*ConfigEntry
  638. switch r.Type {
  639. case TopicResource:
  640. configEntries = append(configEntries,
  641. &ConfigEntry{Name: "max.message.bytes",
  642. Value: "1000000",
  643. ReadOnly: false,
  644. Default: true,
  645. Sensitive: false,
  646. }, &ConfigEntry{Name: "retention.ms",
  647. Value: "5000",
  648. ReadOnly: false,
  649. Default: false,
  650. Sensitive: false,
  651. }, &ConfigEntry{Name: "password",
  652. Value: "12345",
  653. ReadOnly: false,
  654. Default: false,
  655. Sensitive: true,
  656. })
  657. res.Resources = append(res.Resources, &ResourceResponse{
  658. Name: r.Name,
  659. Configs: configEntries,
  660. })
  661. }
  662. }
  663. return res
  664. }
  665. type MockAlterConfigsResponse struct {
  666. t TestReporter
  667. }
  668. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  669. return &MockAlterConfigsResponse{t: t}
  670. }
  671. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
  672. req := reqBody.(*AlterConfigsRequest)
  673. res := &AlterConfigsResponse{}
  674. for _, r := range req.Resources {
  675. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
  676. Type: TopicResource,
  677. ErrorMsg: "",
  678. })
  679. }
  680. return res
  681. }
  682. type MockCreateAclsResponse struct {
  683. t TestReporter
  684. }
  685. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  686. return &MockCreateAclsResponse{t: t}
  687. }
  688. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
  689. req := reqBody.(*CreateAclsRequest)
  690. res := &CreateAclsResponse{}
  691. for range req.AclCreations {
  692. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  693. }
  694. return res
  695. }
  696. type MockListAclsResponse struct {
  697. t TestReporter
  698. }
  699. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  700. return &MockListAclsResponse{t: t}
  701. }
  702. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
  703. req := reqBody.(*DescribeAclsRequest)
  704. res := &DescribeAclsResponse{}
  705. res.Err = ErrNoError
  706. acl := &ResourceAcls{}
  707. acl.Resource.ResourceName = *req.ResourceName
  708. acl.Resource.ResourceType = req.ResourceType
  709. acl.Acls = append(acl.Acls, &Acl{})
  710. res.ResourceAcls = append(res.ResourceAcls, acl)
  711. return res
  712. }
  713. type MockSaslAuthenticateResponse struct {
  714. t TestReporter
  715. kerror KError
  716. saslAuthBytes []byte
  717. }
  718. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  719. return &MockSaslAuthenticateResponse{t: t}
  720. }
  721. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
  722. res := &SaslAuthenticateResponse{}
  723. res.Err = msar.kerror
  724. res.SaslAuthBytes = msar.saslAuthBytes
  725. return res
  726. }
  727. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  728. msar.kerror = kerror
  729. return msar
  730. }
  731. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  732. msar.saslAuthBytes = saslAuthBytes
  733. return msar
  734. }
  735. type MockDeleteAclsResponse struct {
  736. t TestReporter
  737. }
  738. type MockSaslHandshakeResponse struct {
  739. enabledMechanisms []string
  740. kerror KError
  741. t TestReporter
  742. }
  743. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  744. return &MockSaslHandshakeResponse{t: t}
  745. }
  746. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
  747. res := &SaslHandshakeResponse{}
  748. res.Err = mshr.kerror
  749. res.EnabledMechanisms = mshr.enabledMechanisms
  750. return res
  751. }
  752. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  753. mshr.kerror = kerror
  754. return mshr
  755. }
  756. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  757. mshr.enabledMechanisms = enabledMechanisms
  758. return mshr
  759. }
  760. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  761. return &MockDeleteAclsResponse{t: t}
  762. }
  763. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
  764. req := reqBody.(*DeleteAclsRequest)
  765. res := &DeleteAclsResponse{}
  766. for range req.Filters {
  767. response := &FilterResponse{Err: ErrNoError}
  768. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  769. res.FilterResponses = append(res.FilterResponses, response)
  770. }
  771. return res
  772. }