Evan Huus 12 years ago
parent
commit
a1f7277ee3
6 changed files with 47 additions and 21 deletions
  1. 9 3
      broker.go
  2. 1 1
      client.go
  3. 14 6
      metadata.go
  4. 7 3
      metadataRequest.go
  5. 1 1
      packetDecoder.go
  6. 15 7
      topicMetadata.go

+ 9 - 3
broker.go

@@ -14,13 +14,19 @@ func (b *broker) encode(pe packetEncoder) {
 
 func (b *broker) decode(pd *packetDecoder) (err error) {
 	b.nodeId, err = pd.getInt32()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	b.host, err = pd.getString()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	b.port, err = pd.getInt32()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	return nil
 }

+ 1 - 1
client.go

@@ -1,8 +1,8 @@
 package kafka
 
 import (
-	"errors"
 	"encoding/binary"
+	"errors"
 	"math"
 	"net"
 )

+ 14 - 6
metadata.go

@@ -18,21 +18,29 @@ func (m *metadata) encode(pe packetEncoder) {
 
 func (m *metadata) decode(pd *packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	m.brokers = make([]broker, n)
-	for i := 0; i<n; i++ {
+	for i := 0; i < n; i++ {
 		err = (&m.brokers[i]).decode(pd)
-		if err != nil { return err }
+		if err != nil {
+			return err
+		}
 	}
 
 	n, err = pd.getArrayCount()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	m.topics = make([]topicMetadata, n)
-	for i := 0; i<n; i++ {
+	for i := 0; i < n; i++ {
 		err = (&m.topics[i]).decode(pd)
-		if err != nil { return err }
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil

+ 7 - 3
metadataRequest.go

@@ -13,12 +13,16 @@ func (mr *metadataRequest) encode(pe packetEncoder) {
 
 func (mr *metadataRequest) decode(pd *packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	mr.topics = make([]*string, n)
-	for i := 0; i<n; i++ {
+	for i := 0; i < n; i++ {
 		mr.topics[i], err = pd.getString()
-		if err != nil { return err }
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil

+ 1 - 1
packetDecoder.go

@@ -70,7 +70,7 @@ func (pd *packetDecoder) getString() (*string, error) {
 		return nil, errors.New("kafka getString: not enough data")
 	default:
 		tmp := new(string)
-		*tmp = string(pd.raw[pd.off:pd.off+n])
+		*tmp = string(pd.raw[pd.off : pd.off+n])
 		return tmp, nil
 	}
 }

+ 15 - 7
topicMetadata.go

@@ -7,7 +7,7 @@ type topicMetadata struct {
 }
 
 func (tm *topicMetadata) encode(pe packetEncoder) {
-        pe.putError(tm.err)
+	pe.putError(tm.err)
 	pe.putString(tm.name)
 	pe.putInt32(int32(len(m.partitions)))
 	for i := range m.partitions {
@@ -17,20 +17,28 @@ func (tm *topicMetadata) encode(pe packetEncoder) {
 
 func (tm *topicMetadata) decode(pd *packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	m.brokers = make([]broker, n)
-	for i := 0; i<n; i++ {
+	for i := 0; i < n; i++ {
 		err = (&m.brokers[i]).decode(pd)
-		if err != nil { return err }
+		if err != nil {
+			return err
+		}
 	}
 
 	n, err = pd.getArrayCount()
-	if err != nil { return err }
+	if err != nil {
+		return err
+	}
 
 	m.topics = make([]topic, n)
-	for i := 0; i<n; i++ {
+	for i := 0; i < n; i++ {
 		err = (&m.topics[i]).decode(pd)
-		if err != nil { return err }
+		if err != nil {
+			return err
+		}
 	}
 }