mockresponses.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121
  1. package sarama
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. // TestReporter has methods matching go's testing.T to avoid importing
  7. // `testing` in the main part of the library.
  8. type TestReporter interface {
  9. Error(...interface{})
  10. Errorf(string, ...interface{})
  11. Fatal(...interface{})
  12. Fatalf(string, ...interface{})
  13. }
  14. // MockResponse is a response builder interface it defines one method that
  15. // allows generating a response based on a request body. MockResponses are used
  16. // to program behavior of MockBroker in tests.
  17. type MockResponse interface {
  18. For(reqBody versionedDecoder) (res encoderWithHeader)
  19. }
  20. // MockWrapper is a mock response builder that returns a particular concrete
  21. // response regardless of the actual request passed to the `For` method.
  22. type MockWrapper struct {
  23. res encoderWithHeader
  24. }
  25. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
  26. return mw.res
  27. }
  28. func NewMockWrapper(res encoderWithHeader) *MockWrapper {
  29. return &MockWrapper{res: res}
  30. }
  31. // MockSequence is a mock response builder that is created from a sequence of
  32. // concrete responses. Every time when a `MockBroker` calls its `For` method
  33. // the next response from the sequence is returned. When the end of the
  34. // sequence is reached the last element from the sequence is returned.
  35. type MockSequence struct {
  36. responses []MockResponse
  37. }
  38. func NewMockSequence(responses ...interface{}) *MockSequence {
  39. ms := &MockSequence{}
  40. ms.responses = make([]MockResponse, len(responses))
  41. for i, res := range responses {
  42. switch res := res.(type) {
  43. case MockResponse:
  44. ms.responses[i] = res
  45. case encoderWithHeader:
  46. ms.responses[i] = NewMockWrapper(res)
  47. default:
  48. panic(fmt.Sprintf("Unexpected response type: %T", res))
  49. }
  50. }
  51. return ms
  52. }
  53. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
  54. res = mc.responses[0].For(reqBody)
  55. if len(mc.responses) > 1 {
  56. mc.responses = mc.responses[1:]
  57. }
  58. return res
  59. }
  60. type MockListGroupsResponse struct {
  61. groups map[string]string
  62. t TestReporter
  63. }
  64. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  65. return &MockListGroupsResponse{
  66. groups: make(map[string]string),
  67. t: t,
  68. }
  69. }
  70. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  71. request := reqBody.(*ListGroupsRequest)
  72. _ = request
  73. response := &ListGroupsResponse{
  74. Groups: m.groups,
  75. }
  76. return response
  77. }
  78. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  79. m.groups[groupID] = protocolType
  80. return m
  81. }
  82. type MockDescribeGroupsResponse struct {
  83. groups map[string]*GroupDescription
  84. t TestReporter
  85. }
  86. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  87. return &MockDescribeGroupsResponse{
  88. t: t,
  89. groups: make(map[string]*GroupDescription),
  90. }
  91. }
  92. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  93. m.groups[groupID] = description
  94. return m
  95. }
  96. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  97. request := reqBody.(*DescribeGroupsRequest)
  98. response := &DescribeGroupsResponse{}
  99. for _, requestedGroup := range request.Groups {
  100. if group, ok := m.groups[requestedGroup]; ok {
  101. response.Groups = append(response.Groups, group)
  102. } else {
  103. // Mimic real kafka - if a group doesn't exist, return
  104. // an entry with state "Dead"
  105. response.Groups = append(response.Groups, &GroupDescription{
  106. GroupId: requestedGroup,
  107. State: "Dead",
  108. })
  109. }
  110. }
  111. return response
  112. }
  113. // MockMetadataResponse is a `MetadataResponse` builder.
  114. type MockMetadataResponse struct {
  115. controllerID int32
  116. leaders map[string]map[int32]int32
  117. brokers map[string]int32
  118. t TestReporter
  119. }
  120. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  121. return &MockMetadataResponse{
  122. leaders: make(map[string]map[int32]int32),
  123. brokers: make(map[string]int32),
  124. t: t,
  125. }
  126. }
  127. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  128. partitions := mmr.leaders[topic]
  129. if partitions == nil {
  130. partitions = make(map[int32]int32)
  131. mmr.leaders[topic] = partitions
  132. }
  133. partitions[partition] = brokerID
  134. return mmr
  135. }
  136. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  137. mmr.brokers[addr] = brokerID
  138. return mmr
  139. }
  140. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  141. mmr.controllerID = brokerID
  142. return mmr
  143. }
  144. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  145. metadataRequest := reqBody.(*MetadataRequest)
  146. metadataResponse := &MetadataResponse{
  147. Version: metadataRequest.version(),
  148. ControllerID: mmr.controllerID,
  149. }
  150. for addr, brokerID := range mmr.brokers {
  151. metadataResponse.AddBroker(addr, brokerID)
  152. }
  153. // Generate set of replicas
  154. var replicas []int32
  155. var offlineReplicas []int32
  156. for _, brokerID := range mmr.brokers {
  157. replicas = append(replicas, brokerID)
  158. }
  159. if len(metadataRequest.Topics) == 0 {
  160. for topic, partitions := range mmr.leaders {
  161. for partition, brokerID := range partitions {
  162. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  163. }
  164. }
  165. return metadataResponse
  166. }
  167. for _, topic := range metadataRequest.Topics {
  168. for partition, brokerID := range mmr.leaders[topic] {
  169. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  170. }
  171. }
  172. return metadataResponse
  173. }
  174. // MockOffsetResponse is an `OffsetResponse` builder.
  175. type MockOffsetResponse struct {
  176. offsets map[string]map[int32]map[int64]int64
  177. t TestReporter
  178. version int16
  179. }
  180. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  181. return &MockOffsetResponse{
  182. offsets: make(map[string]map[int32]map[int64]int64),
  183. t: t,
  184. }
  185. }
  186. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  187. mor.version = version
  188. return mor
  189. }
  190. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  191. partitions := mor.offsets[topic]
  192. if partitions == nil {
  193. partitions = make(map[int32]map[int64]int64)
  194. mor.offsets[topic] = partitions
  195. }
  196. times := partitions[partition]
  197. if times == nil {
  198. times = make(map[int64]int64)
  199. partitions[partition] = times
  200. }
  201. times[time] = offset
  202. return mor
  203. }
  204. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
  205. offsetRequest := reqBody.(*OffsetRequest)
  206. offsetResponse := &OffsetResponse{Version: mor.version}
  207. for topic, partitions := range offsetRequest.blocks {
  208. for partition, block := range partitions {
  209. offset := mor.getOffset(topic, partition, block.time)
  210. offsetResponse.AddTopicPartition(topic, partition, offset)
  211. }
  212. }
  213. return offsetResponse
  214. }
  215. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  216. partitions := mor.offsets[topic]
  217. if partitions == nil {
  218. mor.t.Errorf("missing topic: %s", topic)
  219. }
  220. times := partitions[partition]
  221. if times == nil {
  222. mor.t.Errorf("missing partition: %d", partition)
  223. }
  224. offset, ok := times[time]
  225. if !ok {
  226. mor.t.Errorf("missing time: %d", time)
  227. }
  228. return offset
  229. }
  230. // MockFetchResponse is a `FetchResponse` builder.
  231. type MockFetchResponse struct {
  232. messages map[string]map[int32]map[int64]Encoder
  233. highWaterMarks map[string]map[int32]int64
  234. t TestReporter
  235. batchSize int
  236. version int16
  237. }
  238. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  239. return &MockFetchResponse{
  240. messages: make(map[string]map[int32]map[int64]Encoder),
  241. highWaterMarks: make(map[string]map[int32]int64),
  242. t: t,
  243. batchSize: batchSize,
  244. }
  245. }
  246. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  247. mfr.version = version
  248. return mfr
  249. }
  250. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  251. partitions := mfr.messages[topic]
  252. if partitions == nil {
  253. partitions = make(map[int32]map[int64]Encoder)
  254. mfr.messages[topic] = partitions
  255. }
  256. messages := partitions[partition]
  257. if messages == nil {
  258. messages = make(map[int64]Encoder)
  259. partitions[partition] = messages
  260. }
  261. messages[offset] = msg
  262. return mfr
  263. }
  264. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  265. partitions := mfr.highWaterMarks[topic]
  266. if partitions == nil {
  267. partitions = make(map[int32]int64)
  268. mfr.highWaterMarks[topic] = partitions
  269. }
  270. partitions[partition] = offset
  271. return mfr
  272. }
  273. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  274. fetchRequest := reqBody.(*FetchRequest)
  275. res := &FetchResponse{
  276. Version: mfr.version,
  277. }
  278. for topic, partitions := range fetchRequest.blocks {
  279. for partition, block := range partitions {
  280. initialOffset := block.fetchOffset
  281. offset := initialOffset
  282. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  283. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  284. msg := mfr.getMessage(topic, partition, offset)
  285. if msg != nil {
  286. res.AddMessage(topic, partition, nil, msg, offset)
  287. i++
  288. }
  289. offset++
  290. }
  291. fb := res.GetBlock(topic, partition)
  292. if fb == nil {
  293. res.AddError(topic, partition, ErrNoError)
  294. fb = res.GetBlock(topic, partition)
  295. }
  296. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  297. }
  298. }
  299. return res
  300. }
  301. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  302. partitions := mfr.messages[topic]
  303. if partitions == nil {
  304. return nil
  305. }
  306. messages := partitions[partition]
  307. if messages == nil {
  308. return nil
  309. }
  310. return messages[offset]
  311. }
  312. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  313. partitions := mfr.messages[topic]
  314. if partitions == nil {
  315. return 0
  316. }
  317. messages := partitions[partition]
  318. if messages == nil {
  319. return 0
  320. }
  321. return len(messages)
  322. }
  323. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  324. partitions := mfr.highWaterMarks[topic]
  325. if partitions == nil {
  326. return 0
  327. }
  328. return partitions[partition]
  329. }
  330. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  331. type MockConsumerMetadataResponse struct {
  332. coordinators map[string]interface{}
  333. t TestReporter
  334. }
  335. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  336. return &MockConsumerMetadataResponse{
  337. coordinators: make(map[string]interface{}),
  338. t: t,
  339. }
  340. }
  341. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  342. mr.coordinators[group] = broker
  343. return mr
  344. }
  345. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  346. mr.coordinators[group] = kerror
  347. return mr
  348. }
  349. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  350. req := reqBody.(*ConsumerMetadataRequest)
  351. group := req.ConsumerGroup
  352. res := &ConsumerMetadataResponse{}
  353. v := mr.coordinators[group]
  354. switch v := v.(type) {
  355. case *MockBroker:
  356. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  357. case KError:
  358. res.Err = v
  359. }
  360. return res
  361. }
  362. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  363. type MockFindCoordinatorResponse struct {
  364. groupCoordinators map[string]interface{}
  365. transCoordinators map[string]interface{}
  366. t TestReporter
  367. }
  368. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  369. return &MockFindCoordinatorResponse{
  370. groupCoordinators: make(map[string]interface{}),
  371. transCoordinators: make(map[string]interface{}),
  372. t: t,
  373. }
  374. }
  375. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  376. switch coordinatorType {
  377. case CoordinatorGroup:
  378. mr.groupCoordinators[group] = broker
  379. case CoordinatorTransaction:
  380. mr.transCoordinators[group] = broker
  381. }
  382. return mr
  383. }
  384. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  385. switch coordinatorType {
  386. case CoordinatorGroup:
  387. mr.groupCoordinators[group] = kerror
  388. case CoordinatorTransaction:
  389. mr.transCoordinators[group] = kerror
  390. }
  391. return mr
  392. }
  393. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
  394. req := reqBody.(*FindCoordinatorRequest)
  395. res := &FindCoordinatorResponse{}
  396. var v interface{}
  397. switch req.CoordinatorType {
  398. case CoordinatorGroup:
  399. v = mr.groupCoordinators[req.CoordinatorKey]
  400. case CoordinatorTransaction:
  401. v = mr.transCoordinators[req.CoordinatorKey]
  402. }
  403. switch v := v.(type) {
  404. case *MockBroker:
  405. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  406. case KError:
  407. res.Err = v
  408. }
  409. return res
  410. }
  411. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  412. type MockOffsetCommitResponse struct {
  413. errors map[string]map[string]map[int32]KError
  414. t TestReporter
  415. }
  416. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  417. return &MockOffsetCommitResponse{t: t}
  418. }
  419. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  420. if mr.errors == nil {
  421. mr.errors = make(map[string]map[string]map[int32]KError)
  422. }
  423. topics := mr.errors[group]
  424. if topics == nil {
  425. topics = make(map[string]map[int32]KError)
  426. mr.errors[group] = topics
  427. }
  428. partitions := topics[topic]
  429. if partitions == nil {
  430. partitions = make(map[int32]KError)
  431. topics[topic] = partitions
  432. }
  433. partitions[partition] = kerror
  434. return mr
  435. }
  436. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
  437. req := reqBody.(*OffsetCommitRequest)
  438. group := req.ConsumerGroup
  439. res := &OffsetCommitResponse{}
  440. for topic, partitions := range req.blocks {
  441. for partition := range partitions {
  442. res.AddError(topic, partition, mr.getError(group, topic, partition))
  443. }
  444. }
  445. return res
  446. }
  447. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  448. topics := mr.errors[group]
  449. if topics == nil {
  450. return ErrNoError
  451. }
  452. partitions := topics[topic]
  453. if partitions == nil {
  454. return ErrNoError
  455. }
  456. kerror, ok := partitions[partition]
  457. if !ok {
  458. return ErrNoError
  459. }
  460. return kerror
  461. }
  462. // MockProduceResponse is a `ProduceResponse` builder.
  463. type MockProduceResponse struct {
  464. version int16
  465. errors map[string]map[int32]KError
  466. t TestReporter
  467. }
  468. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  469. return &MockProduceResponse{t: t}
  470. }
  471. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  472. mr.version = version
  473. return mr
  474. }
  475. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  476. if mr.errors == nil {
  477. mr.errors = make(map[string]map[int32]KError)
  478. }
  479. partitions := mr.errors[topic]
  480. if partitions == nil {
  481. partitions = make(map[int32]KError)
  482. mr.errors[topic] = partitions
  483. }
  484. partitions[partition] = kerror
  485. return mr
  486. }
  487. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
  488. req := reqBody.(*ProduceRequest)
  489. res := &ProduceResponse{
  490. Version: mr.version,
  491. }
  492. for topic, partitions := range req.records {
  493. for partition := range partitions {
  494. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  495. }
  496. }
  497. return res
  498. }
  499. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  500. partitions := mr.errors[topic]
  501. if partitions == nil {
  502. return ErrNoError
  503. }
  504. kerror, ok := partitions[partition]
  505. if !ok {
  506. return ErrNoError
  507. }
  508. return kerror
  509. }
  510. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  511. type MockOffsetFetchResponse struct {
  512. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  513. error KError
  514. t TestReporter
  515. }
  516. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  517. return &MockOffsetFetchResponse{t: t}
  518. }
  519. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  520. if mr.offsets == nil {
  521. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  522. }
  523. topics := mr.offsets[group]
  524. if topics == nil {
  525. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  526. mr.offsets[group] = topics
  527. }
  528. partitions := topics[topic]
  529. if partitions == nil {
  530. partitions = make(map[int32]*OffsetFetchResponseBlock)
  531. topics[topic] = partitions
  532. }
  533. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  534. return mr
  535. }
  536. func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
  537. mr.error = kerror
  538. return mr
  539. }
  540. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  541. req := reqBody.(*OffsetFetchRequest)
  542. group := req.ConsumerGroup
  543. res := &OffsetFetchResponse{Version: req.Version}
  544. for topic, partitions := range mr.offsets[group] {
  545. for partition, block := range partitions {
  546. res.AddBlock(topic, partition, block)
  547. }
  548. }
  549. if res.Version >= 2 {
  550. res.Err = mr.error
  551. }
  552. return res
  553. }
  554. type MockCreateTopicsResponse struct {
  555. t TestReporter
  556. }
  557. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  558. return &MockCreateTopicsResponse{t: t}
  559. }
  560. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  561. req := reqBody.(*CreateTopicsRequest)
  562. res := &CreateTopicsResponse{
  563. Version: req.Version,
  564. }
  565. res.TopicErrors = make(map[string]*TopicError)
  566. for topic := range req.TopicDetails {
  567. if res.Version >= 1 && strings.HasPrefix(topic, "_") {
  568. msg := "insufficient permissions to create topic with reserved prefix"
  569. res.TopicErrors[topic] = &TopicError{
  570. Err: ErrTopicAuthorizationFailed,
  571. ErrMsg: &msg,
  572. }
  573. continue
  574. }
  575. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  576. }
  577. return res
  578. }
  579. type MockDeleteTopicsResponse struct {
  580. t TestReporter
  581. }
  582. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  583. return &MockDeleteTopicsResponse{t: t}
  584. }
  585. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  586. req := reqBody.(*DeleteTopicsRequest)
  587. res := &DeleteTopicsResponse{}
  588. res.TopicErrorCodes = make(map[string]KError)
  589. for _, topic := range req.Topics {
  590. res.TopicErrorCodes[topic] = ErrNoError
  591. }
  592. res.Version = req.Version
  593. return res
  594. }
  595. type MockCreatePartitionsResponse struct {
  596. t TestReporter
  597. }
  598. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  599. return &MockCreatePartitionsResponse{t: t}
  600. }
  601. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  602. req := reqBody.(*CreatePartitionsRequest)
  603. res := &CreatePartitionsResponse{}
  604. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  605. for topic := range req.TopicPartitions {
  606. if strings.HasPrefix(topic, "_") {
  607. msg := "insufficient permissions to create partition on topic with reserved prefix"
  608. res.TopicPartitionErrors[topic] = &TopicPartitionError{
  609. Err: ErrTopicAuthorizationFailed,
  610. ErrMsg: &msg,
  611. }
  612. continue
  613. }
  614. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  615. }
  616. return res
  617. }
  618. type MockAlterPartitionReassignmentsResponse struct {
  619. t TestReporter
  620. }
  621. func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
  622. return &MockAlterPartitionReassignmentsResponse{t: t}
  623. }
  624. func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  625. req := reqBody.(*AlterPartitionReassignmentsRequest)
  626. _ = req
  627. res := &AlterPartitionReassignmentsResponse{}
  628. return res
  629. }
  630. type MockListPartitionReassignmentsResponse struct {
  631. t TestReporter
  632. }
  633. func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
  634. return &MockListPartitionReassignmentsResponse{t: t}
  635. }
  636. func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  637. req := reqBody.(*ListPartitionReassignmentsRequest)
  638. _ = req
  639. res := &ListPartitionReassignmentsResponse{}
  640. for topic, partitions := range req.blocks {
  641. for _, partition := range partitions {
  642. res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
  643. }
  644. }
  645. return res
  646. }
  647. type MockDeleteRecordsResponse struct {
  648. t TestReporter
  649. }
  650. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  651. return &MockDeleteRecordsResponse{t: t}
  652. }
  653. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  654. req := reqBody.(*DeleteRecordsRequest)
  655. res := &DeleteRecordsResponse{}
  656. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  657. for topic, deleteRecordRequestTopic := range req.Topics {
  658. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  659. for partition := range deleteRecordRequestTopic.PartitionOffsets {
  660. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  661. }
  662. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  663. }
  664. return res
  665. }
  666. type MockDescribeConfigsResponse struct {
  667. t TestReporter
  668. }
  669. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  670. return &MockDescribeConfigsResponse{t: t}
  671. }
  672. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  673. req := reqBody.(*DescribeConfigsRequest)
  674. res := &DescribeConfigsResponse{
  675. Version: req.Version,
  676. }
  677. includeSynonyms := req.Version > 0
  678. includeSource := req.Version > 0
  679. for _, r := range req.Resources {
  680. var configEntries []*ConfigEntry
  681. switch r.Type {
  682. case BrokerResource:
  683. configEntries = append(configEntries,
  684. &ConfigEntry{
  685. Name: "min.insync.replicas",
  686. Value: "2",
  687. ReadOnly: false,
  688. Default: false,
  689. },
  690. )
  691. res.Resources = append(res.Resources, &ResourceResponse{
  692. Name: r.Name,
  693. Configs: configEntries,
  694. })
  695. case BrokerLoggerResource:
  696. configEntries = append(configEntries,
  697. &ConfigEntry{
  698. Name: "kafka.controller.KafkaController",
  699. Value: "DEBUG",
  700. ReadOnly: false,
  701. Default: false,
  702. },
  703. )
  704. res.Resources = append(res.Resources, &ResourceResponse{
  705. Name: r.Name,
  706. Configs: configEntries,
  707. })
  708. case TopicResource:
  709. maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
  710. Value: "1000000",
  711. ReadOnly: false,
  712. Default: !includeSource,
  713. Sensitive: false,
  714. }
  715. if includeSource {
  716. maxMessageBytes.Source = SourceDefault
  717. }
  718. if includeSynonyms {
  719. maxMessageBytes.Synonyms = []*ConfigSynonym{
  720. {
  721. ConfigName: "max.message.bytes",
  722. ConfigValue: "500000",
  723. },
  724. }
  725. }
  726. retentionMs := &ConfigEntry{Name: "retention.ms",
  727. Value: "5000",
  728. ReadOnly: false,
  729. Default: false,
  730. Sensitive: false,
  731. }
  732. if includeSynonyms {
  733. retentionMs.Synonyms = []*ConfigSynonym{
  734. {
  735. ConfigName: "log.retention.ms",
  736. ConfigValue: "2500",
  737. },
  738. }
  739. }
  740. password := &ConfigEntry{Name: "password",
  741. Value: "12345",
  742. ReadOnly: false,
  743. Default: false,
  744. Sensitive: true,
  745. }
  746. configEntries = append(
  747. configEntries, maxMessageBytes, retentionMs, password)
  748. res.Resources = append(res.Resources, &ResourceResponse{
  749. Name: r.Name,
  750. Configs: configEntries,
  751. })
  752. }
  753. }
  754. return res
  755. }
  756. type MockDescribeConfigsResponseWithErrorCode struct {
  757. t TestReporter
  758. }
  759. func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
  760. return &MockDescribeConfigsResponseWithErrorCode{t: t}
  761. }
  762. func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  763. req := reqBody.(*DescribeConfigsRequest)
  764. res := &DescribeConfigsResponse{
  765. Version: req.Version,
  766. }
  767. for _, r := range req.Resources {
  768. res.Resources = append(res.Resources, &ResourceResponse{
  769. Name: r.Name,
  770. Type: r.Type,
  771. ErrorCode: 83,
  772. ErrorMsg: "",
  773. })
  774. }
  775. return res
  776. }
  777. type MockAlterConfigsResponse struct {
  778. t TestReporter
  779. }
  780. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  781. return &MockAlterConfigsResponse{t: t}
  782. }
  783. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  784. req := reqBody.(*AlterConfigsRequest)
  785. res := &AlterConfigsResponse{}
  786. for _, r := range req.Resources {
  787. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
  788. Type: r.Type,
  789. ErrorMsg: "",
  790. })
  791. }
  792. return res
  793. }
  794. type MockAlterConfigsResponseWithErrorCode struct {
  795. t TestReporter
  796. }
  797. func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
  798. return &MockAlterConfigsResponseWithErrorCode{t: t}
  799. }
  800. func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  801. req := reqBody.(*AlterConfigsRequest)
  802. res := &AlterConfigsResponse{}
  803. for _, r := range req.Resources {
  804. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  805. Name: r.Name,
  806. Type: r.Type,
  807. ErrorCode: 83,
  808. ErrorMsg: "",
  809. })
  810. }
  811. return res
  812. }
  813. type MockCreateAclsResponse struct {
  814. t TestReporter
  815. }
  816. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  817. return &MockCreateAclsResponse{t: t}
  818. }
  819. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  820. req := reqBody.(*CreateAclsRequest)
  821. res := &CreateAclsResponse{}
  822. for range req.AclCreations {
  823. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  824. }
  825. return res
  826. }
  827. type MockListAclsResponse struct {
  828. t TestReporter
  829. }
  830. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  831. return &MockListAclsResponse{t: t}
  832. }
  833. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  834. req := reqBody.(*DescribeAclsRequest)
  835. res := &DescribeAclsResponse{}
  836. res.Err = ErrNoError
  837. acl := &ResourceAcls{}
  838. if req.ResourceName != nil {
  839. acl.Resource.ResourceName = *req.ResourceName
  840. }
  841. acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
  842. acl.Resource.ResourceType = req.ResourceType
  843. host := "*"
  844. if req.Host != nil {
  845. host = *req.Host
  846. }
  847. principal := "User:test"
  848. if req.Principal != nil {
  849. principal = *req.Principal
  850. }
  851. permissionType := req.PermissionType
  852. if permissionType == AclPermissionAny {
  853. permissionType = AclPermissionAllow
  854. }
  855. acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
  856. res.ResourceAcls = append(res.ResourceAcls, acl)
  857. res.Version = int16(req.Version)
  858. return res
  859. }
  860. type MockSaslAuthenticateResponse struct {
  861. t TestReporter
  862. kerror KError
  863. saslAuthBytes []byte
  864. }
  865. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  866. return &MockSaslAuthenticateResponse{t: t}
  867. }
  868. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
  869. res := &SaslAuthenticateResponse{}
  870. res.Err = msar.kerror
  871. res.SaslAuthBytes = msar.saslAuthBytes
  872. return res
  873. }
  874. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  875. msar.kerror = kerror
  876. return msar
  877. }
  878. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  879. msar.saslAuthBytes = saslAuthBytes
  880. return msar
  881. }
  882. type MockDeleteAclsResponse struct {
  883. t TestReporter
  884. }
  885. type MockSaslHandshakeResponse struct {
  886. enabledMechanisms []string
  887. kerror KError
  888. t TestReporter
  889. }
  890. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  891. return &MockSaslHandshakeResponse{t: t}
  892. }
  893. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
  894. res := &SaslHandshakeResponse{}
  895. res.Err = mshr.kerror
  896. res.EnabledMechanisms = mshr.enabledMechanisms
  897. return res
  898. }
  899. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  900. mshr.kerror = kerror
  901. return mshr
  902. }
  903. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  904. mshr.enabledMechanisms = enabledMechanisms
  905. return mshr
  906. }
  907. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  908. return &MockDeleteAclsResponse{t: t}
  909. }
  910. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  911. req := reqBody.(*DeleteAclsRequest)
  912. res := &DeleteAclsResponse{}
  913. for range req.Filters {
  914. response := &FilterResponse{Err: ErrNoError}
  915. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  916. res.FilterResponses = append(res.FilterResponses, response)
  917. }
  918. res.Version = int16(req.Version)
  919. return res
  920. }
  921. type MockDeleteGroupsResponse struct {
  922. deletedGroups []string
  923. }
  924. func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
  925. return &MockDeleteGroupsResponse{}
  926. }
  927. func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
  928. m.deletedGroups = groups
  929. return m
  930. }
  931. func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  932. resp := &DeleteGroupsResponse{
  933. GroupErrorCodes: map[string]KError{},
  934. }
  935. for _, group := range m.deletedGroups {
  936. resp.GroupErrorCodes[group] = ErrNoError
  937. }
  938. return resp
  939. }
  940. type MockDescribeLogDirsResponse struct {
  941. t TestReporter
  942. logDirs []DescribeLogDirsResponseDirMetadata
  943. }
  944. func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
  945. return &MockDescribeLogDirsResponse{t: t}
  946. }
  947. func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
  948. var topics []DescribeLogDirsResponseTopic
  949. for topic := range topicPartitions {
  950. var partitions []DescribeLogDirsResponsePartition
  951. for i := 0; i < topicPartitions[topic]; i++ {
  952. partitions = append(partitions, DescribeLogDirsResponsePartition{
  953. PartitionID: int32(i),
  954. IsTemporary: false,
  955. OffsetLag: int64(0),
  956. Size: int64(1234),
  957. })
  958. }
  959. topics = append(topics, DescribeLogDirsResponseTopic{
  960. Topic: topic,
  961. Partitions: partitions,
  962. })
  963. }
  964. logDir := DescribeLogDirsResponseDirMetadata{
  965. ErrorCode: ErrNoError,
  966. Path: logDirPath,
  967. Topics: topics,
  968. }
  969. m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
  970. return m
  971. }
  972. func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  973. resp := &DescribeLogDirsResponse{
  974. LogDirs: m.logDirs,
  975. }
  976. return resp
  977. }