request.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. )
  7. type protocolBody interface {
  8. encoder
  9. versionedDecoder
  10. key() int16
  11. version() int16
  12. headerVersion() int16
  13. requiredVersion() KafkaVersion
  14. }
  15. type request struct {
  16. correlationID int32
  17. clientID string
  18. body protocolBody
  19. }
  20. func (r *request) encode(pe packetEncoder) error {
  21. pe.push(&lengthField{})
  22. pe.putInt16(r.body.key())
  23. pe.putInt16(r.body.version())
  24. pe.putInt32(r.correlationID)
  25. if r.body.headerVersion() >= 1 {
  26. err := pe.putString(r.clientID)
  27. if err != nil {
  28. return err
  29. }
  30. }
  31. if r.body.headerVersion() >= 2 {
  32. // we don't use tag headers at the moment so we just put an array length of 0
  33. pe.putUVarint(0)
  34. }
  35. err := r.body.encode(pe)
  36. if err != nil {
  37. return err
  38. }
  39. return pe.pop()
  40. }
  41. func (r *request) decode(pd packetDecoder) (err error) {
  42. key, err := pd.getInt16()
  43. if err != nil {
  44. return err
  45. }
  46. version, err := pd.getInt16()
  47. if err != nil {
  48. return err
  49. }
  50. r.correlationID, err = pd.getInt32()
  51. if err != nil {
  52. return err
  53. }
  54. r.clientID, err = pd.getString()
  55. if err != nil {
  56. return err
  57. }
  58. r.body = allocateBody(key, version)
  59. if r.body == nil {
  60. return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
  61. }
  62. if r.body.headerVersion() >= 2 {
  63. // tagged field
  64. _, err = pd.getUVarint()
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. return r.body.decode(pd, version)
  70. }
  71. func decodeRequest(r io.Reader) (*request, int, error) {
  72. var (
  73. bytesRead int
  74. lengthBytes = make([]byte, 4)
  75. )
  76. if _, err := io.ReadFull(r, lengthBytes); err != nil {
  77. return nil, bytesRead, err
  78. }
  79. bytesRead += len(lengthBytes)
  80. length := int32(binary.BigEndian.Uint32(lengthBytes))
  81. if length <= 4 || length > MaxRequestSize {
  82. return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
  83. }
  84. encodedReq := make([]byte, length)
  85. if _, err := io.ReadFull(r, encodedReq); err != nil {
  86. return nil, bytesRead, err
  87. }
  88. bytesRead += len(encodedReq)
  89. req := &request{}
  90. if err := decode(encodedReq, req); err != nil {
  91. return nil, bytesRead, err
  92. }
  93. return req, bytesRead, nil
  94. }
  95. func allocateBody(key, version int16) protocolBody {
  96. switch key {
  97. case 0:
  98. return &ProduceRequest{}
  99. case 1:
  100. return &FetchRequest{Version: version}
  101. case 2:
  102. return &OffsetRequest{Version: version}
  103. case 3:
  104. return &MetadataRequest{}
  105. case 8:
  106. return &OffsetCommitRequest{Version: version}
  107. case 9:
  108. return &OffsetFetchRequest{}
  109. case 10:
  110. return &FindCoordinatorRequest{}
  111. case 11:
  112. return &JoinGroupRequest{}
  113. case 12:
  114. return &HeartbeatRequest{}
  115. case 13:
  116. return &LeaveGroupRequest{}
  117. case 14:
  118. return &SyncGroupRequest{}
  119. case 15:
  120. return &DescribeGroupsRequest{}
  121. case 16:
  122. return &ListGroupsRequest{}
  123. case 17:
  124. return &SaslHandshakeRequest{}
  125. case 18:
  126. return &ApiVersionsRequest{}
  127. case 19:
  128. return &CreateTopicsRequest{}
  129. case 20:
  130. return &DeleteTopicsRequest{}
  131. case 21:
  132. return &DeleteRecordsRequest{}
  133. case 22:
  134. return &InitProducerIDRequest{}
  135. case 24:
  136. return &AddPartitionsToTxnRequest{}
  137. case 25:
  138. return &AddOffsetsToTxnRequest{}
  139. case 26:
  140. return &EndTxnRequest{}
  141. case 28:
  142. return &TxnOffsetCommitRequest{}
  143. case 29:
  144. return &DescribeAclsRequest{}
  145. case 30:
  146. return &CreateAclsRequest{}
  147. case 31:
  148. return &DeleteAclsRequest{}
  149. case 32:
  150. return &DescribeConfigsRequest{}
  151. case 33:
  152. return &AlterConfigsRequest{}
  153. case 35:
  154. return &DescribeLogDirsRequest{}
  155. case 36:
  156. return &SaslAuthenticateRequest{}
  157. case 37:
  158. return &CreatePartitionsRequest{}
  159. case 42:
  160. return &DeleteGroupsRequest{}
  161. case 45:
  162. return &AlterPartitionReassignmentsRequest{}
  163. case 46:
  164. return &ListPartitionReassignmentsRequest{}
  165. }
  166. return nil
  167. }