123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- package sarama
- import (
- "encoding/binary"
- "fmt"
- "io"
- )
- type requestBody interface {
- encoder
- decoder
- key() int16
- version() int16
- }
- type request struct {
- correlationID int32
- clientID string
- body requestBody
- }
- func (r *request) encode(pe packetEncoder) (err error) {
- pe.push(&lengthField{})
- pe.putInt16(r.body.key())
- pe.putInt16(r.body.version())
- pe.putInt32(r.correlationID)
- err = pe.putString(r.clientID)
- if err != nil {
- return err
- }
- err = r.body.encode(pe)
- if err != nil {
- return err
- }
- return pe.pop()
- }
- func (r *request) decode(pd packetDecoder) (err error) {
- var key int16
- if key, err = pd.getInt16(); err != nil {
- return err
- }
- var version int16
- if version, err = pd.getInt16(); err != nil {
- return err
- }
- if r.correlationID, err = pd.getInt32(); err != nil {
- return err
- }
- r.clientID, err = pd.getString()
- r.body = allocateBody(key, version)
- if r.body == nil {
- return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
- }
- return r.body.decode(pd)
- }
- func decodeRequest(r io.Reader) (req *request, err error) {
- lengthBytes := make([]byte, 4)
- if _, err := io.ReadFull(r, lengthBytes); err != nil {
- return nil, err
- }
- length := int32(binary.BigEndian.Uint32(lengthBytes))
- if length <= 4 || length > MaxRequestSize {
- return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
- }
- encodedReq := make([]byte, length)
- if _, err := io.ReadFull(r, encodedReq); err != nil {
- return nil, err
- }
- req = &request{}
- if err := decode(encodedReq, req); err != nil {
- return nil, err
- }
- return req, nil
- }
- func allocateBody(key, version int16) requestBody {
- switch key {
- case 0:
- return &ProduceRequest{}
- case 1:
- return &FetchRequest{}
- case 2:
- return &OffsetRequest{}
- case 3:
- return &MetadataRequest{}
- case 8:
- return &OffsetCommitRequest{Version: version}
- case 9:
- return &OffsetFetchRequest{}
- case 10:
- return &ConsumerMetadataRequest{}
- }
- return nil
- }
|