request.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. )
// protocolBody is the contract for the payload carried in request.body.
// It combines the encoder and versionedDecoder interfaces (both declared
// elsewhere in the package) with accessors describing the payload.
type protocolBody interface {
	encoder
	versionedDecoder
	// key returns the int16 that request.encode writes first in the header
	// and that allocateBody switches on when decoding.
	key() int16
	// version returns the int16 that request.encode writes right after key.
	version() int16
	// requiredVersion returns a KafkaVersion for this body.
	// NOTE(review): presumably the minimum broker version that supports
	// it; not used in this file, so confirm against the callers.
	requiredVersion() KafkaVersion
}
// request is a header (correlation ID and client ID) plus a typed body.
// The body's key and version are written ahead of these fields by encode
// and read ahead of them by decode.
type request struct {
	// correlationID is written and read as an int32 after the key and
	// version.
	correlationID int32
	// clientID is written and read as a string after correlationID.
	clientID string
	// body is the payload; decode picks its concrete type via allocateBody.
	body protocolBody
}
  19. func (r *request) encode(pe packetEncoder) (err error) {
  20. pe.push(&lengthField{})
  21. pe.putInt16(r.body.key())
  22. pe.putInt16(r.body.version())
  23. pe.putInt32(r.correlationID)
  24. err = pe.putString(r.clientID)
  25. if err != nil {
  26. return err
  27. }
  28. err = r.body.encode(pe)
  29. if err != nil {
  30. return err
  31. }
  32. return pe.pop()
  33. }
  34. func (r *request) decode(pd packetDecoder) (err error) {
  35. var key int16
  36. if key, err = pd.getInt16(); err != nil {
  37. return err
  38. }
  39. var version int16
  40. if version, err = pd.getInt16(); err != nil {
  41. return err
  42. }
  43. if r.correlationID, err = pd.getInt32(); err != nil {
  44. return err
  45. }
  46. r.clientID, err = pd.getString()
  47. r.body = allocateBody(key, version)
  48. if r.body == nil {
  49. return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
  50. }
  51. return r.body.decode(pd, version)
  52. }
  53. func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
  54. lengthBytes := make([]byte, 4)
  55. if _, err := io.ReadFull(r, lengthBytes); err != nil {
  56. return nil, bytesRead, err
  57. }
  58. bytesRead += len(lengthBytes)
  59. length := int32(binary.BigEndian.Uint32(lengthBytes))
  60. if length <= 4 || length > MaxRequestSize {
  61. return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
  62. }
  63. encodedReq := make([]byte, length)
  64. if _, err := io.ReadFull(r, encodedReq); err != nil {
  65. return nil, bytesRead, err
  66. }
  67. bytesRead += len(encodedReq)
  68. req = &request{}
  69. if err := decode(encodedReq, req); err != nil {
  70. return nil, bytesRead, err
  71. }
  72. return req, bytesRead, nil
  73. }
  74. func allocateBody(key, version int16) protocolBody {
  75. switch key {
  76. case 0:
  77. return &ProduceRequest{}
  78. case 1:
  79. return &FetchRequest{}
  80. case 2:
  81. return &OffsetRequest{Version: version}
  82. case 3:
  83. return &MetadataRequest{}
  84. case 8:
  85. return &OffsetCommitRequest{Version: version}
  86. case 9:
  87. return &OffsetFetchRequest{}
  88. case 10:
  89. return &FindCoordinatorRequest{}
  90. case 11:
  91. return &JoinGroupRequest{}
  92. case 12:
  93. return &HeartbeatRequest{}
  94. case 13:
  95. return &LeaveGroupRequest{}
  96. case 14:
  97. return &SyncGroupRequest{}
  98. case 15:
  99. return &DescribeGroupsRequest{}
  100. case 16:
  101. return &ListGroupsRequest{}
  102. case 17:
  103. return &SaslHandshakeRequest{}
  104. case 18:
  105. return &ApiVersionsRequest{}
  106. case 19:
  107. return &CreateTopicsRequest{}
  108. case 20:
  109. return &DeleteTopicsRequest{}
  110. case 21:
  111. return &DeleteRecordsRequest{}
  112. case 22:
  113. return &InitProducerIDRequest{}
  114. case 24:
  115. return &AddPartitionsToTxnRequest{}
  116. case 25:
  117. return &AddOffsetsToTxnRequest{}
  118. case 26:
  119. return &EndTxnRequest{}
  120. case 28:
  121. return &TxnOffsetCommitRequest{}
  122. case 29:
  123. return &DescribeAclsRequest{}
  124. case 30:
  125. return &CreateAclsRequest{}
  126. case 31:
  127. return &DeleteAclsRequest{}
  128. case 32:
  129. return &DescribeConfigsRequest{}
  130. case 33:
  131. return &AlterConfigsRequest{}
  132. case 37:
  133. return &CreatePartitionsRequest{}
  134. case 42:
  135. return &DeleteGroupsRequest{}
  136. }
  137. return nil
  138. }