request.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. )
  7. type protocolBody interface {
  8. encoder
  9. versionedDecoder
  10. key() int16
  11. version() int16
  12. requiredVersion() KafkaVersion
  13. }
  14. type request struct {
  15. correlationID int32
  16. clientID string
  17. body protocolBody
  18. }
  19. func (r *request) encode(pe packetEncoder) error {
  20. pe.push(&lengthField{})
  21. pe.putInt16(r.body.key())
  22. pe.putInt16(r.body.version())
  23. pe.putInt32(r.correlationID)
  24. err := pe.putString(r.clientID)
  25. if err != nil {
  26. return err
  27. }
  28. err = r.body.encode(pe)
  29. if err != nil {
  30. return err
  31. }
  32. return pe.pop()
  33. }
  34. func (r *request) decode(pd packetDecoder) (err error) {
  35. key, err := pd.getInt16()
  36. if err != nil {
  37. return err
  38. }
  39. version, err := pd.getInt16()
  40. if err != nil {
  41. return err
  42. }
  43. r.correlationID, err = pd.getInt32()
  44. if err != nil {
  45. return err
  46. }
  47. r.clientID, err = pd.getString()
  48. if err != nil {
  49. return err
  50. }
  51. r.body = allocateBody(key, version)
  52. if r.body == nil {
  53. return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
  54. }
  55. return r.body.decode(pd, version)
  56. }
  57. func decodeRequest(r io.Reader) (*request, int, error) {
  58. var (
  59. bytesRead int
  60. lengthBytes = make([]byte, 4)
  61. )
  62. if _, err := io.ReadFull(r, lengthBytes); err != nil {
  63. return nil, bytesRead, err
  64. }
  65. bytesRead += len(lengthBytes)
  66. length := int32(binary.BigEndian.Uint32(lengthBytes))
  67. if length <= 4 || length > MaxRequestSize {
  68. return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
  69. }
  70. encodedReq := make([]byte, length)
  71. if _, err := io.ReadFull(r, encodedReq); err != nil {
  72. return nil, bytesRead, err
  73. }
  74. bytesRead += len(encodedReq)
  75. req := &request{}
  76. if err := decode(encodedReq, req); err != nil {
  77. return nil, bytesRead, err
  78. }
  79. return req, bytesRead, nil
  80. }
  81. func allocateBody(key, version int16) protocolBody {
  82. switch key {
  83. case 0:
  84. return &ProduceRequest{}
  85. case 1:
  86. return &FetchRequest{Version: version}
  87. case 2:
  88. return &OffsetRequest{Version: version}
  89. case 3:
  90. return &MetadataRequest{}
  91. case 8:
  92. return &OffsetCommitRequest{Version: version}
  93. case 9:
  94. return &OffsetFetchRequest{}
  95. case 10:
  96. return &FindCoordinatorRequest{}
  97. case 11:
  98. return &JoinGroupRequest{}
  99. case 12:
  100. return &HeartbeatRequest{}
  101. case 13:
  102. return &LeaveGroupRequest{}
  103. case 14:
  104. return &SyncGroupRequest{}
  105. case 15:
  106. return &DescribeGroupsRequest{}
  107. case 16:
  108. return &ListGroupsRequest{}
  109. case 17:
  110. return &SaslHandshakeRequest{}
  111. case 18:
  112. return &ApiVersionsRequest{}
  113. case 19:
  114. return &CreateTopicsRequest{}
  115. case 20:
  116. return &DeleteTopicsRequest{}
  117. case 21:
  118. return &DeleteRecordsRequest{}
  119. case 22:
  120. return &InitProducerIDRequest{}
  121. case 24:
  122. return &AddPartitionsToTxnRequest{}
  123. case 25:
  124. return &AddOffsetsToTxnRequest{}
  125. case 26:
  126. return &EndTxnRequest{}
  127. case 28:
  128. return &TxnOffsetCommitRequest{}
  129. case 29:
  130. return &DescribeAclsRequest{}
  131. case 30:
  132. return &CreateAclsRequest{}
  133. case 31:
  134. return &DeleteAclsRequest{}
  135. case 32:
  136. return &DescribeConfigsRequest{}
  137. case 33:
  138. return &AlterConfigsRequest{}
  139. case 35:
  140. return &DescribeLogDirsRequest{}
  141. case 36:
  142. return &SaslAuthenticateRequest{}
  143. case 37:
  144. return &CreatePartitionsRequest{}
  145. case 42:
  146. return &DeleteGroupsRequest{}
  147. }
  148. return nil
  149. }