Browse Source

Misc minor lint changes

- Consistently use `r` as the receiver for requests and responses, and `b` as
  the receiver for blocks.
- Drop unnecessary types that are easily inferred in variable declarations.
Evan Huus 9 years ago
parent
commit
88a4afb3d1

+ 8 - 8
api_versions_response.go

@@ -6,25 +6,25 @@ type ApiVersionsResponseBlock struct {
 	MaxVersion int16
 }
 
-func (r *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
-	pe.putInt16(r.ApiKey)
-	pe.putInt16(r.MinVersion)
-	pe.putInt16(r.MaxVersion)
+func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
+	pe.putInt16(b.ApiKey)
+	pe.putInt16(b.MinVersion)
+	pe.putInt16(b.MaxVersion)
 	return nil
 }
 
-func (r *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
+func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
 	var err error
 
-	if r.ApiKey, err = pd.getInt16(); err != nil {
+	if b.ApiKey, err = pd.getInt16(); err != nil {
 		return err
 	}
 
-	if r.MinVersion, err = pd.getInt16(); err != nil {
+	if b.MinVersion, err = pd.getInt16(); err != nil {
 		return err
 	}
 
-	if r.MaxVersion, err = pd.getInt16(); err != nil {
+	if b.MaxVersion, err = pd.getInt16(); err != nil {
 		return err
 	}
 

+ 1 - 1
config.go

@@ -8,7 +8,7 @@ import (
 
 const defaultClientID = "sarama"
 
-var validID *regexp.Regexp = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
+var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
 
 // Config is used to pass multiple configuration options to Sarama's constructors.
 type Config struct {

+ 27 - 27
fetch_request.go

@@ -5,17 +5,17 @@ type fetchRequestBlock struct {
 	maxBytes    int32
 }
 
-func (f *fetchRequestBlock) encode(pe packetEncoder) error {
-	pe.putInt64(f.fetchOffset)
-	pe.putInt32(f.maxBytes)
+func (b *fetchRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(b.fetchOffset)
+	pe.putInt32(b.maxBytes)
 	return nil
 }
 
-func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
-	if f.fetchOffset, err = pd.getInt64(); err != nil {
+func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
+	if b.fetchOffset, err = pd.getInt64(); err != nil {
 		return err
 	}
-	if f.maxBytes, err = pd.getInt32(); err != nil {
+	if b.maxBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
 	return nil
@@ -28,15 +28,15 @@ type FetchRequest struct {
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
 
-func (f *FetchRequest) encode(pe packetEncoder) (err error) {
+func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
-	pe.putInt32(f.MaxWaitTime)
-	pe.putInt32(f.MinBytes)
-	err = pe.putArrayLength(len(f.blocks))
+	pe.putInt32(r.MaxWaitTime)
+	pe.putInt32(r.MinBytes)
+	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
 	}
-	for topic, blocks := range f.blocks {
+	for topic, blocks := range r.blocks {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -56,15 +56,15 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
 	return nil
 }
 
-func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
-	f.Version = version
+func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 	if _, err = pd.getInt32(); err != nil {
 		return err
 	}
-	if f.MaxWaitTime, err = pd.getInt32(); err != nil {
+	if r.MaxWaitTime, err = pd.getInt32(); err != nil {
 		return err
 	}
-	if f.MinBytes, err = pd.getInt32(); err != nil {
+	if r.MinBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
 	topicCount, err := pd.getArrayLength()
@@ -74,7 +74,7 @@ func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	if topicCount == 0 {
 		return nil
 	}
-	f.blocks = make(map[string]map[int32]*fetchRequestBlock)
+	r.blocks = make(map[string]map[int32]*fetchRequestBlock)
 	for i := 0; i < topicCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -84,7 +84,7 @@ func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 		if err != nil {
 			return err
 		}
-		f.blocks[topic] = make(map[int32]*fetchRequestBlock)
+		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
@@ -94,18 +94,18 @@ func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 			if err = fetchBlock.decode(pd); err != nil {
 				return nil
 			}
-			f.blocks[topic][partition] = fetchBlock
+			r.blocks[topic][partition] = fetchBlock
 		}
 	}
 	return nil
 }
 
-func (f *FetchRequest) key() int16 {
+func (r *FetchRequest) key() int16 {
 	return 1
 }
 
-func (f *FetchRequest) version() int16 {
-	return f.Version
+func (r *FetchRequest) version() int16 {
+	return r.Version
 }
 
 func (r *FetchRequest) requiredVersion() KafkaVersion {
@@ -119,18 +119,18 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 	}
 }
 
-func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
-	if f.blocks == nil {
-		f.blocks = make(map[string]map[int32]*fetchRequestBlock)
+func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
+	if r.blocks == nil {
+		r.blocks = make(map[string]map[int32]*fetchRequestBlock)
 	}
 
-	if f.blocks[topic] == nil {
-		f.blocks[topic] = make(map[int32]*fetchRequestBlock)
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
 	}
 
 	tmp := new(fetchRequestBlock)
 	tmp.maxBytes = maxBytes
 	tmp.fetchOffset = fetchOffset
 
-	f.blocks[topic][partitionID] = tmp
+	r.blocks[topic][partitionID] = tmp
 }

+ 40 - 40
fetch_response.go

@@ -8,14 +8,14 @@ type FetchResponseBlock struct {
 	MsgSet              MessageSet
 }
 
-func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	pr.Err = KError(tmp)
+	b.Err = KError(tmp)
 
-	pr.HighWaterMarkOffset, err = pd.getInt64()
+	b.HighWaterMarkOffset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -29,39 +29,39 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	err = (&pr.MsgSet).decode(msgSetDecoder)
+	err = (&b.MsgSet).decode(msgSetDecoder)
 
 	return err
 }
 
-type FetchResponse struct {
-	Blocks       map[string]map[int32]*FetchResponseBlock
-	ThrottleTime time.Duration
-	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
-}
-
-func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
-	pe.putInt16(int16(pr.Err))
+func (b *FetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(b.Err))
 
-	pe.putInt64(pr.HighWaterMarkOffset)
+	pe.putInt64(b.HighWaterMarkOffset)
 
 	pe.push(&lengthField{})
-	err = pr.MsgSet.encode(pe)
+	err = b.MsgSet.encode(pe)
 	if err != nil {
 		return err
 	}
 	return pe.pop()
 }
 
-func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
-	fr.Version = version
+type FetchResponse struct {
+	Blocks       map[string]map[int32]*FetchResponseBlock
+	ThrottleTime time.Duration
+	Version      int16 // v1 requires 0.9+, v2 requires 0.10+
+}
+
+func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 
-	if fr.Version >= 1 {
+	if r.Version >= 1 {
 		throttle, err := pd.getInt32()
 		if err != nil {
 			return err
 		}
-		fr.ThrottleTime = time.Duration(throttle) * time.Millisecond
+		r.ThrottleTime = time.Duration(throttle) * time.Millisecond
 	}
 
 	numTopics, err := pd.getArrayLength()
@@ -69,7 +69,7 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 		return err
 	}
 
-	fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
+	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		if err != nil {
@@ -81,7 +81,7 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 			return err
 		}
 
-		fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
+		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
 			id, err := pd.getInt32()
@@ -94,24 +94,24 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 			if err != nil {
 				return err
 			}
-			fr.Blocks[name][id] = block
+			r.Blocks[name][id] = block
 		}
 	}
 
 	return nil
 }
 
-func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
-	if fr.Version >= 1 {
-		pe.putInt32(int32(fr.ThrottleTime / time.Millisecond))
+func (r *FetchResponse) encode(pe packetEncoder) (err error) {
+	if r.Version >= 1 {
+		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
 	}
 
-	err = pe.putArrayLength(len(fr.Blocks))
+	err = pe.putArrayLength(len(r.Blocks))
 	if err != nil {
 		return err
 	}
 
-	for topic, partitions := range fr.Blocks {
+	for topic, partitions := range r.Blocks {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -153,26 +153,26 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
 	}
 }
 
-func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
-	if fr.Blocks == nil {
+func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
+	if r.Blocks == nil {
 		return nil
 	}
 
-	if fr.Blocks[topic] == nil {
+	if r.Blocks[topic] == nil {
 		return nil
 	}
 
-	return fr.Blocks[topic][partition]
+	return r.Blocks[topic][partition]
 }
 
-func (fr *FetchResponse) AddError(topic string, partition int32, err KError) {
-	if fr.Blocks == nil {
-		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)
+func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
 	}
-	partitions, ok := fr.Blocks[topic]
+	partitions, ok := r.Blocks[topic]
 	if !ok {
 		partitions = make(map[int32]*FetchResponseBlock)
-		fr.Blocks[topic] = partitions
+		r.Blocks[topic] = partitions
 	}
 	frb, ok := partitions[partition]
 	if !ok {
@@ -182,14 +182,14 @@ func (fr *FetchResponse) AddError(topic string, partition int32, err KError) {
 	frb.Err = err
 }
 
-func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
-	if fr.Blocks == nil {
-		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)
+func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
 	}
-	partitions, ok := fr.Blocks[topic]
+	partitions, ok := r.Blocks[topic]
 	if !ok {
 		partitions = make(map[int32]*FetchResponseBlock)
-		fr.Blocks[topic] = partitions
+		r.Blocks[topic] = partitions
 	}
 	frb, ok := partitions[partition]
 	if !ok {

+ 11 - 11
metadata_request.go

@@ -4,14 +4,14 @@ type MetadataRequest struct {
 	Topics []string
 }
 
-func (mr *MetadataRequest) encode(pe packetEncoder) error {
-	err := pe.putArrayLength(len(mr.Topics))
+func (r *MetadataRequest) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(r.Topics))
 	if err != nil {
 		return err
 	}
 
-	for i := range mr.Topics {
-		err = pe.putString(mr.Topics[i])
+	for i := range r.Topics {
+		err = pe.putString(r.Topics[i])
 		if err != nil {
 			return err
 		}
@@ -19,7 +19,7 @@ func (mr *MetadataRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (mr *MetadataRequest) decode(pd packetDecoder, version int16) error {
+func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -28,25 +28,25 @@ func (mr *MetadataRequest) decode(pd packetDecoder, version int16) error {
 		return nil
 	}
 
-	mr.Topics = make([]string, topicCount)
-	for i := range mr.Topics {
+	r.Topics = make([]string, topicCount)
+	for i := range r.Topics {
 		topic, err := pd.getString()
 		if err != nil {
 			return err
 		}
-		mr.Topics[i] = topic
+		r.Topics[i] = topic
 	}
 	return nil
 }
 
-func (mr *MetadataRequest) key() int16 {
+func (r *MetadataRequest) key() int16 {
 	return 3
 }
 
-func (mr *MetadataRequest) version() int16 {
+func (r *MetadataRequest) version() int16 {
 	return 0
 }
 
-func (mr *MetadataRequest) requiredVersion() KafkaVersion {
+func (r *MetadataRequest) requiredVersion() KafkaVersion {
 	return minVersion
 }

+ 19 - 19
metadata_response.go

@@ -118,16 +118,16 @@ type MetadataResponse struct {
 	Topics  []*TopicMetadata
 }
 
-func (m *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
+func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
-	m.Brokers = make([]*Broker, n)
+	r.Brokers = make([]*Broker, n)
 	for i := 0; i < n; i++ {
-		m.Brokers[i] = new(Broker)
-		err = m.Brokers[i].decode(pd)
+		r.Brokers[i] = new(Broker)
+		err = r.Brokers[i].decode(pd)
 		if err != nil {
 			return err
 		}
@@ -138,10 +138,10 @@ func (m *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 		return err
 	}
 
-	m.Topics = make([]*TopicMetadata, n)
+	r.Topics = make([]*TopicMetadata, n)
 	for i := 0; i < n; i++ {
-		m.Topics[i] = new(TopicMetadata)
-		err = m.Topics[i].decode(pd)
+		r.Topics[i] = new(TopicMetadata)
+		err = r.Topics[i].decode(pd)
 		if err != nil {
 			return err
 		}
@@ -150,23 +150,23 @@ func (m *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
-func (m *MetadataResponse) encode(pe packetEncoder) error {
-	err := pe.putArrayLength(len(m.Brokers))
+func (r *MetadataResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(r.Brokers))
 	if err != nil {
 		return err
 	}
-	for _, broker := range m.Brokers {
+	for _, broker := range r.Brokers {
 		err = broker.encode(pe)
 		if err != nil {
 			return err
 		}
 	}
 
-	err = pe.putArrayLength(len(m.Topics))
+	err = pe.putArrayLength(len(r.Topics))
 	if err != nil {
 		return err
 	}
-	for _, tm := range m.Topics {
+	for _, tm := range r.Topics {
 		err = tm.encode(pe)
 		if err != nil {
 			return err
@@ -190,14 +190,14 @@ func (r *MetadataResponse) requiredVersion() KafkaVersion {
 
 // testing API
 
-func (m *MetadataResponse) AddBroker(addr string, id int32) {
-	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
+func (r *MetadataResponse) AddBroker(addr string, id int32) {
+	r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
+func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
 	var tmatch *TopicMetadata
 
-	for _, tm := range m.Topics {
+	for _, tm := range r.Topics {
 		if tm.Name == topic {
 			tmatch = tm
 			goto foundTopic
@@ -206,7 +206,7 @@ func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
 
 	tmatch = new(TopicMetadata)
 	tmatch.Name = topic
-	m.Topics = append(m.Topics, tmatch)
+	r.Topics = append(r.Topics, tmatch)
 
 foundTopic:
 
@@ -214,8 +214,8 @@ foundTopic:
 	return tmatch
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
-	tmatch := m.AddTopic(topic, ErrNoError)
+func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
+	tmatch := r.AddTopic(topic, ErrNoError)
 	var pmatch *PartitionMetadata
 
 	for _, pm := range tmatch.Partitions {

+ 9 - 9
offset_commit_request.go

@@ -16,27 +16,27 @@ type offsetCommitRequestBlock struct {
 	metadata  string
 }
 
-func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
-	pe.putInt64(r.offset)
+func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
+	pe.putInt64(b.offset)
 	if version == 1 {
-		pe.putInt64(r.timestamp)
-	} else if r.timestamp != 0 {
+		pe.putInt64(b.timestamp)
+	} else if b.timestamp != 0 {
 		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
 	}
 
-	return pe.putString(r.metadata)
+	return pe.putString(b.metadata)
 }
 
-func (r *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
-	if r.offset, err = pd.getInt64(); err != nil {
+func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+	if b.offset, err = pd.getInt64(); err != nil {
 		return err
 	}
 	if version == 1 {
-		if r.timestamp, err = pd.getInt64(); err != nil {
+		if b.timestamp, err = pd.getInt64(); err != nil {
 			return err
 		}
 	}
-	r.metadata, err = pd.getString()
+	b.metadata, err = pd.getString()
 	return err
 }
 

+ 8 - 8
offset_fetch_response.go

@@ -6,13 +6,13 @@ type OffsetFetchResponseBlock struct {
 	Err      KError
 }
 
-func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
-	r.Offset, err = pd.getInt64()
+func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
-	r.Metadata, err = pd.getString()
+	b.Metadata, err = pd.getString()
 	if err != nil {
 		return err
 	}
@@ -21,20 +21,20 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	r.Err = KError(tmp)
+	b.Err = KError(tmp)
 
 	return nil
 }
 
-func (r *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
-	pe.putInt64(r.Offset)
+func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt64(b.Offset)
 
-	err = pe.putString(r.Metadata)
+	err = pe.putString(b.Metadata)
 	if err != nil {
 		return err
 	}
 
-	pe.putInt16(int16(r.Err))
+	pe.putInt16(int16(b.Err))
 
 	return nil
 }

+ 6 - 6
offset_request.go

@@ -5,17 +5,17 @@ type offsetRequestBlock struct {
 	maxOffsets int32
 }
 
-func (r *offsetRequestBlock) encode(pe packetEncoder) error {
-	pe.putInt64(int64(r.time))
-	pe.putInt32(r.maxOffsets)
+func (b *offsetRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(int64(b.time))
+	pe.putInt32(b.maxOffsets)
 	return nil
 }
 
-func (r *offsetRequestBlock) decode(pd packetDecoder) (err error) {
-	if r.time, err = pd.getInt64(); err != nil {
+func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) {
+	if b.time, err = pd.getInt64(); err != nil {
 		return err
 	}
-	if r.maxOffsets, err = pd.getInt32(); err != nil {
+	if b.maxOffsets, err = pd.getInt32(); err != nil {
 		return err
 	}
 	return nil

+ 6 - 6
offset_response.go

@@ -5,22 +5,22 @@ type OffsetResponseBlock struct {
 	Offsets []int64
 }
 
-func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	r.Err = KError(tmp)
+	b.Err = KError(tmp)
 
-	r.Offsets, err = pd.getInt64Array()
+	b.Offsets, err = pd.getInt64Array()
 
 	return err
 }
 
-func (r *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
-	pe.putInt16(int16(r.Err))
+func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(b.Err))
 
-	return pe.putInt64Array(r.Offsets)
+	return pe.putInt64Array(b.Offsets)
 }
 
 type OffsetResponse struct {

+ 29 - 29
produce_request.go

@@ -23,14 +23,14 @@ type ProduceRequest struct {
 	msgSets      map[string]map[int32]*MessageSet
 }
 
-func (p *ProduceRequest) encode(pe packetEncoder) error {
-	pe.putInt16(int16(p.RequiredAcks))
-	pe.putInt32(p.Timeout)
-	err := pe.putArrayLength(len(p.msgSets))
+func (r *ProduceRequest) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.RequiredAcks))
+	pe.putInt32(r.Timeout)
+	err := pe.putArrayLength(len(r.msgSets))
 	if err != nil {
 		return err
 	}
-	for topic, partitions := range p.msgSets {
+	for topic, partitions := range r.msgSets {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -55,13 +55,13 @@ func (p *ProduceRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (p *ProduceRequest) decode(pd packetDecoder, version int16) error {
+func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
 	requiredAcks, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	p.RequiredAcks = RequiredAcks(requiredAcks)
-	if p.Timeout, err = pd.getInt32(); err != nil {
+	r.RequiredAcks = RequiredAcks(requiredAcks)
+	if r.Timeout, err = pd.getInt32(); err != nil {
 		return err
 	}
 	topicCount, err := pd.getArrayLength()
@@ -71,7 +71,7 @@ func (p *ProduceRequest) decode(pd packetDecoder, version int16) error {
 	if topicCount == 0 {
 		return nil
 	}
-	p.msgSets = make(map[string]map[int32]*MessageSet)
+	r.msgSets = make(map[string]map[int32]*MessageSet)
 	for i := 0; i < topicCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -81,7 +81,7 @@ func (p *ProduceRequest) decode(pd packetDecoder, version int16) error {
 		if err != nil {
 			return err
 		}
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+		r.msgSets[topic] = make(map[int32]*MessageSet)
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
@@ -100,22 +100,22 @@ func (p *ProduceRequest) decode(pd packetDecoder, version int16) error {
 			if err != nil {
 				return err
 			}
-			p.msgSets[topic][partition] = msgSet
+			r.msgSets[topic][partition] = msgSet
 		}
 	}
 	return nil
 }
 
-func (p *ProduceRequest) key() int16 {
+func (r *ProduceRequest) key() int16 {
 	return 0
 }
 
-func (p *ProduceRequest) version() int16 {
-	return p.Version
+func (r *ProduceRequest) version() int16 {
+	return r.Version
 }
 
-func (p *ProduceRequest) requiredVersion() KafkaVersion {
-	switch p.Version {
+func (r *ProduceRequest) requiredVersion() KafkaVersion {
+	switch r.Version {
 	case 1:
 		return V0_9_0_0
 	case 2:
@@ -125,33 +125,33 @@ func (p *ProduceRequest) requiredVersion() KafkaVersion {
 	}
 }
 
-func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
-	if p.msgSets == nil {
-		p.msgSets = make(map[string]map[int32]*MessageSet)
+func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
+	if r.msgSets == nil {
+		r.msgSets = make(map[string]map[int32]*MessageSet)
 	}
 
-	if p.msgSets[topic] == nil {
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+	if r.msgSets[topic] == nil {
+		r.msgSets[topic] = make(map[int32]*MessageSet)
 	}
 
-	set := p.msgSets[topic][partition]
+	set := r.msgSets[topic][partition]
 
 	if set == nil {
 		set = new(MessageSet)
-		p.msgSets[topic][partition] = set
+		r.msgSets[topic][partition] = set
 	}
 
 	set.addMessage(msg)
 }
 
-func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
-	if p.msgSets == nil {
-		p.msgSets = make(map[string]map[int32]*MessageSet)
+func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
+	if r.msgSets == nil {
+		r.msgSets = make(map[string]map[int32]*MessageSet)
 	}
 
-	if p.msgSets[topic] == nil {
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+	if r.msgSets[topic] == nil {
+		r.msgSets[topic] = make(map[int32]*MessageSet)
 	}
 
-	p.msgSets[topic][partition] = set
+	r.msgSets[topic][partition] = set
 }

+ 25 - 25
produce_response.go

@@ -8,14 +8,14 @@ type ProduceResponseBlock struct {
 	Timestamp time.Time // only provided if Version >= 2
 }
 
-func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
+func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	pr.Err = KError(tmp)
+	b.Err = KError(tmp)
 
-	pr.Offset, err = pd.getInt64()
+	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -24,7 +24,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
 		if millis, err := pd.getInt64(); err != nil {
 			return err
 		} else {
-			pr.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
+			b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
 		}
 	}
 
@@ -37,15 +37,15 @@ type ProduceResponse struct {
 	ThrottleTime time.Duration // only provided if Version >= 1
 }
 
-func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
-	pr.Version = version
+func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
-	pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
+	r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		if err != nil {
@@ -57,7 +57,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 			return err
 		}
 
-		pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
+		r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
 			id, err := pd.getInt32()
@@ -70,27 +70,27 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 			if err != nil {
 				return err
 			}
-			pr.Blocks[name][id] = block
+			r.Blocks[name][id] = block
 		}
 	}
 
-	if pr.Version >= 1 {
+	if r.Version >= 1 {
 		if millis, err := pd.getInt32(); err != nil {
 			return err
 		} else {
-			pr.ThrottleTime = time.Duration(millis) * time.Millisecond
+			r.ThrottleTime = time.Duration(millis) * time.Millisecond
 		}
 	}
 
 	return nil
 }
 
-func (pr *ProduceResponse) encode(pe packetEncoder) error {
-	err := pe.putArrayLength(len(pr.Blocks))
+func (r *ProduceResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(r.Blocks))
 	if err != nil {
 		return err
 	}
-	for topic, partitions := range pr.Blocks {
+	for topic, partitions := range r.Blocks {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -105,8 +105,8 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error {
 			pe.putInt64(prb.Offset)
 		}
 	}
-	if pr.Version >= 1 {
-		pe.putInt32(int32(pr.ThrottleTime / time.Millisecond))
+	if r.Version >= 1 {
+		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
 	}
 	return nil
 }
@@ -130,28 +130,28 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
 	}
 }
 
-func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
-	if pr.Blocks == nil {
+func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
+	if r.Blocks == nil {
 		return nil
 	}
 
-	if pr.Blocks[topic] == nil {
+	if r.Blocks[topic] == nil {
 		return nil
 	}
 
-	return pr.Blocks[topic][partition]
+	return r.Blocks[topic][partition]
 }
 
 // Testing API
 
-func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
-	if pr.Blocks == nil {
-		pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
+func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
 	}
-	byTopic, ok := pr.Blocks[topic]
+	byTopic, ok := r.Blocks[topic]
 	if !ok {
 		byTopic = make(map[int32]*ProduceResponseBlock)
-		pr.Blocks[topic] = byTopic
+		r.Blocks[topic] = byTopic
 	}
 	byTopic[partition] = &ProduceResponseBlock{Err: err}
 }