mockresponses.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994
  1. package sarama
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. // TestReporter has methods matching go's testing.T to avoid importing
  7. // `testing` in the main part of the library.
  8. type TestReporter interface {
  9. Error(...interface{})
  10. Errorf(string, ...interface{})
  11. Fatal(...interface{})
  12. Fatalf(string, ...interface{})
  13. }
  14. // MockResponse is a response builder interface it defines one method that
  15. // allows generating a response based on a request body. MockResponses are used
  16. // to program behavior of MockBroker in tests.
  17. type MockResponse interface {
  18. For(reqBody versionedDecoder) (res encoder)
  19. }
  20. // MockWrapper is a mock response builder that returns a particular concrete
  21. // response regardless of the actual request passed to the `For` method.
  22. type MockWrapper struct {
  23. res encoder
  24. }
  25. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
  26. return mw.res
  27. }
  28. func NewMockWrapper(res encoder) *MockWrapper {
  29. return &MockWrapper{res: res}
  30. }
  31. // MockSequence is a mock response builder that is created from a sequence of
  32. // concrete responses. Every time when a `MockBroker` calls its `For` method
  33. // the next response from the sequence is returned. When the end of the
  34. // sequence is reached the last element from the sequence is returned.
  35. type MockSequence struct {
  36. responses []MockResponse
  37. }
  38. func NewMockSequence(responses ...interface{}) *MockSequence {
  39. ms := &MockSequence{}
  40. ms.responses = make([]MockResponse, len(responses))
  41. for i, res := range responses {
  42. switch res := res.(type) {
  43. case MockResponse:
  44. ms.responses[i] = res
  45. case encoder:
  46. ms.responses[i] = NewMockWrapper(res)
  47. default:
  48. panic(fmt.Sprintf("Unexpected response type: %T", res))
  49. }
  50. }
  51. return ms
  52. }
  53. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
  54. res = mc.responses[0].For(reqBody)
  55. if len(mc.responses) > 1 {
  56. mc.responses = mc.responses[1:]
  57. }
  58. return res
  59. }
  60. type MockListGroupsResponse struct {
  61. groups map[string]string
  62. t TestReporter
  63. }
  64. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  65. return &MockListGroupsResponse{
  66. groups: make(map[string]string),
  67. t: t,
  68. }
  69. }
  70. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
  71. request := reqBody.(*ListGroupsRequest)
  72. _ = request
  73. response := &ListGroupsResponse{
  74. Groups: m.groups,
  75. }
  76. return response
  77. }
  78. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  79. m.groups[groupID] = protocolType
  80. return m
  81. }
  82. type MockDescribeGroupsResponse struct {
  83. groups map[string]*GroupDescription
  84. t TestReporter
  85. }
  86. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  87. return &MockDescribeGroupsResponse{
  88. t: t,
  89. groups: make(map[string]*GroupDescription),
  90. }
  91. }
  92. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  93. m.groups[groupID] = description
  94. return m
  95. }
  96. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
  97. request := reqBody.(*DescribeGroupsRequest)
  98. response := &DescribeGroupsResponse{}
  99. for _, requestedGroup := range request.Groups {
  100. if group, ok := m.groups[requestedGroup]; ok {
  101. response.Groups = append(response.Groups, group)
  102. } else {
  103. // Mimic real kafka - if a group doesn't exist, return
  104. // an entry with state "Dead"
  105. response.Groups = append(response.Groups, &GroupDescription{
  106. GroupId: requestedGroup,
  107. State: "Dead",
  108. })
  109. }
  110. }
  111. return response
  112. }
  113. // MockMetadataResponse is a `MetadataResponse` builder.
  114. type MockMetadataResponse struct {
  115. controllerID int32
  116. leaders map[string]map[int32]int32
  117. brokers map[string]int32
  118. t TestReporter
  119. }
  120. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  121. return &MockMetadataResponse{
  122. leaders: make(map[string]map[int32]int32),
  123. brokers: make(map[string]int32),
  124. t: t,
  125. }
  126. }
  127. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  128. partitions := mmr.leaders[topic]
  129. if partitions == nil {
  130. partitions = make(map[int32]int32)
  131. mmr.leaders[topic] = partitions
  132. }
  133. partitions[partition] = brokerID
  134. return mmr
  135. }
  136. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  137. mmr.brokers[addr] = brokerID
  138. return mmr
  139. }
  140. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  141. mmr.controllerID = brokerID
  142. return mmr
  143. }
  144. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
  145. metadataRequest := reqBody.(*MetadataRequest)
  146. metadataResponse := &MetadataResponse{
  147. Version: metadataRequest.version(),
  148. ControllerID: mmr.controllerID,
  149. }
  150. for addr, brokerID := range mmr.brokers {
  151. metadataResponse.AddBroker(addr, brokerID)
  152. }
  153. // Generate set of replicas
  154. replicas := []int32{}
  155. offlineReplicas := []int32{}
  156. for _, brokerID := range mmr.brokers {
  157. replicas = append(replicas, brokerID)
  158. }
  159. if len(metadataRequest.Topics) == 0 {
  160. for topic, partitions := range mmr.leaders {
  161. for partition, brokerID := range partitions {
  162. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  163. }
  164. }
  165. return metadataResponse
  166. }
  167. for _, topic := range metadataRequest.Topics {
  168. for partition, brokerID := range mmr.leaders[topic] {
  169. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  170. }
  171. }
  172. return metadataResponse
  173. }
  174. // MockOffsetResponse is an `OffsetResponse` builder.
  175. type MockOffsetResponse struct {
  176. offsets map[string]map[int32]map[int64]int64
  177. t TestReporter
  178. version int16
  179. }
  180. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  181. return &MockOffsetResponse{
  182. offsets: make(map[string]map[int32]map[int64]int64),
  183. t: t,
  184. }
  185. }
  186. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  187. mor.version = version
  188. return mor
  189. }
  190. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  191. partitions := mor.offsets[topic]
  192. if partitions == nil {
  193. partitions = make(map[int32]map[int64]int64)
  194. mor.offsets[topic] = partitions
  195. }
  196. times := partitions[partition]
  197. if times == nil {
  198. times = make(map[int64]int64)
  199. partitions[partition] = times
  200. }
  201. times[time] = offset
  202. return mor
  203. }
  204. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
  205. offsetRequest := reqBody.(*OffsetRequest)
  206. offsetResponse := &OffsetResponse{Version: mor.version}
  207. for topic, partitions := range offsetRequest.blocks {
  208. for partition, block := range partitions {
  209. offset := mor.getOffset(topic, partition, block.time)
  210. offsetResponse.AddTopicPartition(topic, partition, offset)
  211. }
  212. }
  213. return offsetResponse
  214. }
  215. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  216. partitions := mor.offsets[topic]
  217. if partitions == nil {
  218. mor.t.Errorf("missing topic: %s", topic)
  219. }
  220. times := partitions[partition]
  221. if times == nil {
  222. mor.t.Errorf("missing partition: %d", partition)
  223. }
  224. offset, ok := times[time]
  225. if !ok {
  226. mor.t.Errorf("missing time: %d", time)
  227. }
  228. return offset
  229. }
  230. // MockFetchResponse is a `FetchResponse` builder.
  231. type MockFetchResponse struct {
  232. messages map[string]map[int32]map[int64]Encoder
  233. highWaterMarks map[string]map[int32]int64
  234. t TestReporter
  235. batchSize int
  236. version int16
  237. }
  238. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  239. return &MockFetchResponse{
  240. messages: make(map[string]map[int32]map[int64]Encoder),
  241. highWaterMarks: make(map[string]map[int32]int64),
  242. t: t,
  243. batchSize: batchSize,
  244. }
  245. }
  246. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  247. mfr.version = version
  248. return mfr
  249. }
  250. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  251. partitions := mfr.messages[topic]
  252. if partitions == nil {
  253. partitions = make(map[int32]map[int64]Encoder)
  254. mfr.messages[topic] = partitions
  255. }
  256. messages := partitions[partition]
  257. if messages == nil {
  258. messages = make(map[int64]Encoder)
  259. partitions[partition] = messages
  260. }
  261. messages[offset] = msg
  262. return mfr
  263. }
  264. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  265. partitions := mfr.highWaterMarks[topic]
  266. if partitions == nil {
  267. partitions = make(map[int32]int64)
  268. mfr.highWaterMarks[topic] = partitions
  269. }
  270. partitions[partition] = offset
  271. return mfr
  272. }
  273. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
  274. fetchRequest := reqBody.(*FetchRequest)
  275. res := &FetchResponse{
  276. Version: mfr.version,
  277. }
  278. for topic, partitions := range fetchRequest.blocks {
  279. for partition, block := range partitions {
  280. initialOffset := block.fetchOffset
  281. offset := initialOffset
  282. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  283. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  284. msg := mfr.getMessage(topic, partition, offset)
  285. if msg != nil {
  286. res.AddMessage(topic, partition, nil, msg, offset)
  287. i++
  288. }
  289. offset++
  290. }
  291. fb := res.GetBlock(topic, partition)
  292. if fb == nil {
  293. res.AddError(topic, partition, ErrNoError)
  294. fb = res.GetBlock(topic, partition)
  295. }
  296. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  297. }
  298. }
  299. return res
  300. }
  301. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  302. partitions := mfr.messages[topic]
  303. if partitions == nil {
  304. return nil
  305. }
  306. messages := partitions[partition]
  307. if messages == nil {
  308. return nil
  309. }
  310. return messages[offset]
  311. }
  312. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  313. partitions := mfr.messages[topic]
  314. if partitions == nil {
  315. return 0
  316. }
  317. messages := partitions[partition]
  318. if messages == nil {
  319. return 0
  320. }
  321. return len(messages)
  322. }
  323. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  324. partitions := mfr.highWaterMarks[topic]
  325. if partitions == nil {
  326. return 0
  327. }
  328. return partitions[partition]
  329. }
  330. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  331. type MockConsumerMetadataResponse struct {
  332. coordinators map[string]interface{}
  333. t TestReporter
  334. }
  335. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  336. return &MockConsumerMetadataResponse{
  337. coordinators: make(map[string]interface{}),
  338. t: t,
  339. }
  340. }
  341. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  342. mr.coordinators[group] = broker
  343. return mr
  344. }
  345. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  346. mr.coordinators[group] = kerror
  347. return mr
  348. }
  349. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
  350. req := reqBody.(*ConsumerMetadataRequest)
  351. group := req.ConsumerGroup
  352. res := &ConsumerMetadataResponse{}
  353. v := mr.coordinators[group]
  354. switch v := v.(type) {
  355. case *MockBroker:
  356. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  357. case KError:
  358. res.Err = v
  359. }
  360. return res
  361. }
  362. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  363. type MockFindCoordinatorResponse struct {
  364. groupCoordinators map[string]interface{}
  365. transCoordinators map[string]interface{}
  366. t TestReporter
  367. }
  368. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  369. return &MockFindCoordinatorResponse{
  370. groupCoordinators: make(map[string]interface{}),
  371. transCoordinators: make(map[string]interface{}),
  372. t: t,
  373. }
  374. }
  375. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  376. switch coordinatorType {
  377. case CoordinatorGroup:
  378. mr.groupCoordinators[group] = broker
  379. case CoordinatorTransaction:
  380. mr.transCoordinators[group] = broker
  381. }
  382. return mr
  383. }
  384. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  385. switch coordinatorType {
  386. case CoordinatorGroup:
  387. mr.groupCoordinators[group] = kerror
  388. case CoordinatorTransaction:
  389. mr.transCoordinators[group] = kerror
  390. }
  391. return mr
  392. }
  393. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
  394. req := reqBody.(*FindCoordinatorRequest)
  395. res := &FindCoordinatorResponse{}
  396. var v interface{}
  397. switch req.CoordinatorType {
  398. case CoordinatorGroup:
  399. v = mr.groupCoordinators[req.CoordinatorKey]
  400. case CoordinatorTransaction:
  401. v = mr.transCoordinators[req.CoordinatorKey]
  402. }
  403. switch v := v.(type) {
  404. case *MockBroker:
  405. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  406. case KError:
  407. res.Err = v
  408. }
  409. return res
  410. }
  411. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  412. type MockOffsetCommitResponse struct {
  413. errors map[string]map[string]map[int32]KError
  414. t TestReporter
  415. }
  416. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  417. return &MockOffsetCommitResponse{t: t}
  418. }
  419. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  420. if mr.errors == nil {
  421. mr.errors = make(map[string]map[string]map[int32]KError)
  422. }
  423. topics := mr.errors[group]
  424. if topics == nil {
  425. topics = make(map[string]map[int32]KError)
  426. mr.errors[group] = topics
  427. }
  428. partitions := topics[topic]
  429. if partitions == nil {
  430. partitions = make(map[int32]KError)
  431. topics[topic] = partitions
  432. }
  433. partitions[partition] = kerror
  434. return mr
  435. }
  436. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
  437. req := reqBody.(*OffsetCommitRequest)
  438. group := req.ConsumerGroup
  439. res := &OffsetCommitResponse{}
  440. for topic, partitions := range req.blocks {
  441. for partition := range partitions {
  442. res.AddError(topic, partition, mr.getError(group, topic, partition))
  443. }
  444. }
  445. return res
  446. }
  447. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  448. topics := mr.errors[group]
  449. if topics == nil {
  450. return ErrNoError
  451. }
  452. partitions := topics[topic]
  453. if partitions == nil {
  454. return ErrNoError
  455. }
  456. kerror, ok := partitions[partition]
  457. if !ok {
  458. return ErrNoError
  459. }
  460. return kerror
  461. }
  462. // MockProduceResponse is a `ProduceResponse` builder.
  463. type MockProduceResponse struct {
  464. version int16
  465. errors map[string]map[int32]KError
  466. t TestReporter
  467. }
  468. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  469. return &MockProduceResponse{t: t}
  470. }
  471. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  472. mr.version = version
  473. return mr
  474. }
  475. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  476. if mr.errors == nil {
  477. mr.errors = make(map[string]map[int32]KError)
  478. }
  479. partitions := mr.errors[topic]
  480. if partitions == nil {
  481. partitions = make(map[int32]KError)
  482. mr.errors[topic] = partitions
  483. }
  484. partitions[partition] = kerror
  485. return mr
  486. }
  487. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
  488. req := reqBody.(*ProduceRequest)
  489. res := &ProduceResponse{
  490. Version: mr.version,
  491. }
  492. for topic, partitions := range req.records {
  493. for partition := range partitions {
  494. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  495. }
  496. }
  497. return res
  498. }
  499. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  500. partitions := mr.errors[topic]
  501. if partitions == nil {
  502. return ErrNoError
  503. }
  504. kerror, ok := partitions[partition]
  505. if !ok {
  506. return ErrNoError
  507. }
  508. return kerror
  509. }
  510. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  511. type MockOffsetFetchResponse struct {
  512. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  513. error KError
  514. t TestReporter
  515. }
  516. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  517. return &MockOffsetFetchResponse{t: t}
  518. }
  519. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  520. if mr.offsets == nil {
  521. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  522. }
  523. topics := mr.offsets[group]
  524. if topics == nil {
  525. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  526. mr.offsets[group] = topics
  527. }
  528. partitions := topics[topic]
  529. if partitions == nil {
  530. partitions = make(map[int32]*OffsetFetchResponseBlock)
  531. topics[topic] = partitions
  532. }
  533. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  534. return mr
  535. }
  536. func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
  537. mr.error = kerror
  538. return mr
  539. }
  540. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
  541. req := reqBody.(*OffsetFetchRequest)
  542. group := req.ConsumerGroup
  543. res := &OffsetFetchResponse{Version: req.Version}
  544. for topic, partitions := range mr.offsets[group] {
  545. for partition, block := range partitions {
  546. res.AddBlock(topic, partition, block)
  547. }
  548. }
  549. if res.Version >= 2 {
  550. res.Err = mr.error
  551. }
  552. return res
  553. }
  554. type MockCreateTopicsResponse struct {
  555. t TestReporter
  556. }
  557. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  558. return &MockCreateTopicsResponse{t: t}
  559. }
  560. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
  561. req := reqBody.(*CreateTopicsRequest)
  562. res := &CreateTopicsResponse{
  563. Version: req.Version,
  564. }
  565. res.TopicErrors = make(map[string]*TopicError)
  566. for topic := range req.TopicDetails {
  567. if res.Version >= 1 && strings.HasPrefix(topic, "_") {
  568. msg := "insufficient permissions to create topic with reserved prefix"
  569. res.TopicErrors[topic] = &TopicError{
  570. Err: ErrTopicAuthorizationFailed,
  571. ErrMsg: &msg,
  572. }
  573. continue
  574. }
  575. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  576. }
  577. return res
  578. }
  579. type MockDeleteTopicsResponse struct {
  580. t TestReporter
  581. }
  582. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  583. return &MockDeleteTopicsResponse{t: t}
  584. }
  585. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
  586. req := reqBody.(*DeleteTopicsRequest)
  587. res := &DeleteTopicsResponse{}
  588. res.TopicErrorCodes = make(map[string]KError)
  589. for _, topic := range req.Topics {
  590. res.TopicErrorCodes[topic] = ErrNoError
  591. }
  592. res.Version = req.Version
  593. return res
  594. }
  595. type MockCreatePartitionsResponse struct {
  596. t TestReporter
  597. }
  598. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  599. return &MockCreatePartitionsResponse{t: t}
  600. }
  601. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
  602. req := reqBody.(*CreatePartitionsRequest)
  603. res := &CreatePartitionsResponse{}
  604. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  605. for topic := range req.TopicPartitions {
  606. if strings.HasPrefix(topic, "_") {
  607. msg := "insufficient permissions to create partition on topic with reserved prefix"
  608. res.TopicPartitionErrors[topic] = &TopicPartitionError{
  609. Err: ErrTopicAuthorizationFailed,
  610. ErrMsg: &msg,
  611. }
  612. continue
  613. }
  614. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  615. }
  616. return res
  617. }
  618. type MockDeleteRecordsResponse struct {
  619. t TestReporter
  620. }
  621. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  622. return &MockDeleteRecordsResponse{t: t}
  623. }
  624. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
  625. req := reqBody.(*DeleteRecordsRequest)
  626. res := &DeleteRecordsResponse{}
  627. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  628. for topic, deleteRecordRequestTopic := range req.Topics {
  629. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  630. for partition := range deleteRecordRequestTopic.PartitionOffsets {
  631. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  632. }
  633. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  634. }
  635. return res
  636. }
  637. type MockDescribeConfigsResponse struct {
  638. t TestReporter
  639. }
  640. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  641. return &MockDescribeConfigsResponse{t: t}
  642. }
  643. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
  644. req := reqBody.(*DescribeConfigsRequest)
  645. res := &DescribeConfigsResponse{
  646. Version: req.Version,
  647. }
  648. includeSynonyms := (req.Version > 0)
  649. includeSource := (req.Version > 0)
  650. for _, r := range req.Resources {
  651. var configEntries []*ConfigEntry
  652. switch r.Type {
  653. case BrokerResource:
  654. configEntries = append(configEntries,
  655. &ConfigEntry{
  656. Name: "min.insync.replicas",
  657. Value: "2",
  658. ReadOnly: false,
  659. Default: false,
  660. },
  661. )
  662. res.Resources = append(res.Resources, &ResourceResponse{
  663. Name: r.Name,
  664. Configs: configEntries,
  665. })
  666. case BrokerLoggerResource:
  667. configEntries = append(configEntries,
  668. &ConfigEntry{
  669. Name: "kafka.controller.KafkaController",
  670. Value: "DEBUG",
  671. ReadOnly: false,
  672. Default: false,
  673. },
  674. )
  675. res.Resources = append(res.Resources, &ResourceResponse{
  676. Name: r.Name,
  677. Configs: configEntries,
  678. })
  679. case TopicResource:
  680. maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
  681. Value: "1000000",
  682. ReadOnly: false,
  683. Default: !includeSource,
  684. Sensitive: false,
  685. }
  686. if includeSource {
  687. maxMessageBytes.Source = SourceDefault
  688. }
  689. if includeSynonyms {
  690. maxMessageBytes.Synonyms = []*ConfigSynonym{
  691. {
  692. ConfigName: "max.message.bytes",
  693. ConfigValue: "500000",
  694. },
  695. }
  696. }
  697. retentionMs := &ConfigEntry{Name: "retention.ms",
  698. Value: "5000",
  699. ReadOnly: false,
  700. Default: false,
  701. Sensitive: false,
  702. }
  703. if includeSynonyms {
  704. retentionMs.Synonyms = []*ConfigSynonym{
  705. {
  706. ConfigName: "log.retention.ms",
  707. ConfigValue: "2500",
  708. },
  709. }
  710. }
  711. password := &ConfigEntry{Name: "password",
  712. Value: "12345",
  713. ReadOnly: false,
  714. Default: false,
  715. Sensitive: true,
  716. }
  717. configEntries = append(
  718. configEntries, maxMessageBytes, retentionMs, password)
  719. res.Resources = append(res.Resources, &ResourceResponse{
  720. Name: r.Name,
  721. Configs: configEntries,
  722. })
  723. }
  724. }
  725. return res
  726. }
  727. type MockAlterConfigsResponse struct {
  728. t TestReporter
  729. }
  730. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  731. return &MockAlterConfigsResponse{t: t}
  732. }
  733. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
  734. req := reqBody.(*AlterConfigsRequest)
  735. res := &AlterConfigsResponse{}
  736. for _, r := range req.Resources {
  737. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
  738. Type: r.Type,
  739. ErrorMsg: "",
  740. })
  741. }
  742. return res
  743. }
  744. type MockCreateAclsResponse struct {
  745. t TestReporter
  746. }
  747. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  748. return &MockCreateAclsResponse{t: t}
  749. }
  750. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
  751. req := reqBody.(*CreateAclsRequest)
  752. res := &CreateAclsResponse{}
  753. for range req.AclCreations {
  754. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  755. }
  756. return res
  757. }
  758. type MockListAclsResponse struct {
  759. t TestReporter
  760. }
  761. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  762. return &MockListAclsResponse{t: t}
  763. }
  764. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
  765. req := reqBody.(*DescribeAclsRequest)
  766. res := &DescribeAclsResponse{}
  767. res.Err = ErrNoError
  768. acl := &ResourceAcls{}
  769. if req.ResourceName != nil {
  770. acl.Resource.ResourceName = *req.ResourceName
  771. }
  772. acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
  773. acl.Resource.ResourceType = req.ResourceType
  774. host := "*"
  775. if req.Host != nil {
  776. host = *req.Host
  777. }
  778. principal := "User:test"
  779. if req.Principal != nil {
  780. principal = *req.Principal
  781. }
  782. permissionType := req.PermissionType
  783. if permissionType == AclPermissionAny {
  784. permissionType = AclPermissionAllow
  785. }
  786. acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
  787. res.ResourceAcls = append(res.ResourceAcls, acl)
  788. res.Version = int16(req.Version)
  789. return res
  790. }
  791. type MockSaslAuthenticateResponse struct {
  792. t TestReporter
  793. kerror KError
  794. saslAuthBytes []byte
  795. }
  796. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  797. return &MockSaslAuthenticateResponse{t: t}
  798. }
  799. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
  800. res := &SaslAuthenticateResponse{}
  801. res.Err = msar.kerror
  802. res.SaslAuthBytes = msar.saslAuthBytes
  803. return res
  804. }
  805. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  806. msar.kerror = kerror
  807. return msar
  808. }
  809. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  810. msar.saslAuthBytes = saslAuthBytes
  811. return msar
  812. }
  813. type MockDeleteAclsResponse struct {
  814. t TestReporter
  815. }
  816. type MockSaslHandshakeResponse struct {
  817. enabledMechanisms []string
  818. kerror KError
  819. t TestReporter
  820. }
  821. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  822. return &MockSaslHandshakeResponse{t: t}
  823. }
  824. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
  825. res := &SaslHandshakeResponse{}
  826. res.Err = mshr.kerror
  827. res.EnabledMechanisms = mshr.enabledMechanisms
  828. return res
  829. }
  830. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  831. mshr.kerror = kerror
  832. return mshr
  833. }
  834. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  835. mshr.enabledMechanisms = enabledMechanisms
  836. return mshr
  837. }
  838. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  839. return &MockDeleteAclsResponse{t: t}
  840. }
  841. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
  842. req := reqBody.(*DeleteAclsRequest)
  843. res := &DeleteAclsResponse{}
  844. for range req.Filters {
  845. response := &FilterResponse{Err: ErrNoError}
  846. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  847. res.FilterResponses = append(res.FilterResponses, response)
  848. }
  849. res.Version = int16(req.Version)
  850. return res
  851. }
  852. type MockDeleteGroupsResponse struct {
  853. deletedGroups []string
  854. }
  855. func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
  856. return &MockDeleteGroupsResponse{}
  857. }
  858. func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
  859. m.deletedGroups = groups
  860. return m
  861. }
  862. func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
  863. resp := &DeleteGroupsResponse{
  864. GroupErrorCodes: map[string]KError{},
  865. }
  866. for _, group := range m.deletedGroups {
  867. resp.GroupErrorCodes[group] = ErrNoError
  868. }
  869. return resp
  870. }