request.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. )
// protocolBody is the interface the request type in this file uses for its
// payload. It embeds encoder and versionedDecoder (both declared elsewhere in
// the package) and adds the two int16 identifiers that request.encode writes
// into the header ahead of the body.
type protocolBody interface {
	encoder
	versionedDecoder
	// key returns the int16 that request.encode writes first; on the decode
	// side allocateBody switches on the same value to pick a concrete type.
	key() int16
	// version returns the int16 that request.encode writes immediately
	// after the key.
	version() int16
}
// request couples the header values handled by encode/decode in this file
// with the protocolBody they introduce.
type request struct {
	// correlationID is written/read as an int32 after the body's key and
	// version.
	correlationID int32
	// clientID is written/read as a string after correlationID.
	clientID string
	// body is the payload; on decode it is created by allocateBody from the
	// key and version read off the wire.
	body protocolBody
}
  18. func (r *request) encode(pe packetEncoder) (err error) {
  19. pe.push(&lengthField{})
  20. pe.putInt16(r.body.key())
  21. pe.putInt16(r.body.version())
  22. pe.putInt32(r.correlationID)
  23. err = pe.putString(r.clientID)
  24. if err != nil {
  25. return err
  26. }
  27. err = r.body.encode(pe)
  28. if err != nil {
  29. return err
  30. }
  31. return pe.pop()
  32. }
  33. func (r *request) decode(pd packetDecoder) (err error) {
  34. var key int16
  35. if key, err = pd.getInt16(); err != nil {
  36. return err
  37. }
  38. var version int16
  39. if version, err = pd.getInt16(); err != nil {
  40. return err
  41. }
  42. if r.correlationID, err = pd.getInt32(); err != nil {
  43. return err
  44. }
  45. r.clientID, err = pd.getString()
  46. r.body = allocateBody(key, version)
  47. if r.body == nil {
  48. return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
  49. }
  50. return r.body.decode(pd, version)
  51. }
  52. func decodeRequest(r io.Reader) (req *request, err error) {
  53. lengthBytes := make([]byte, 4)
  54. if _, err := io.ReadFull(r, lengthBytes); err != nil {
  55. return nil, err
  56. }
  57. length := int32(binary.BigEndian.Uint32(lengthBytes))
  58. if length <= 4 || length > MaxRequestSize {
  59. return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
  60. }
  61. encodedReq := make([]byte, length)
  62. if _, err := io.ReadFull(r, encodedReq); err != nil {
  63. return nil, err
  64. }
  65. req = &request{}
  66. if err := decode(encodedReq, req); err != nil {
  67. return nil, err
  68. }
  69. return req, nil
  70. }
  71. func allocateBody(key, version int16) protocolBody {
  72. switch key {
  73. case 0:
  74. return &ProduceRequest{}
  75. case 1:
  76. return &FetchRequest{}
  77. case 2:
  78. return &OffsetRequest{}
  79. case 3:
  80. return &MetadataRequest{}
  81. case 8:
  82. return &OffsetCommitRequest{Version: version}
  83. case 9:
  84. return &OffsetFetchRequest{}
  85. case 10:
  86. return &ConsumerMetadataRequest{}
  87. case 11:
  88. return &JoinGroupRequest{}
  89. case 12:
  90. return &HeartbeatRequest{}
  91. case 13:
  92. return &LeaveGroupRequest{}
  93. case 14:
  94. return &SyncGroupRequest{}
  95. case 15:
  96. return &DescribeGroupsRequest{}
  97. case 16:
  98. return &ListGroupsRequest{}
  99. case 17:
  100. return &SaslHandshakeRequest{}
  101. case 18:
  102. return &ApiVersionsRequest{}
  103. }
  104. return nil
  105. }