Browse Source

Check more errors

Caught by errcheck. All remaining errcheck warnings have been audited and are
harmless (mostly because Broker.Open returns such a limited set of errors).
Evan Huus 11 years ago
parent
commit
237cfb5380
3 changed files with 19 additions and 8 deletions
  1. 4 7
      client.go
  2. 4 1
      partitioner.go
  3. 11 0
      utils.go

+ 4 - 7
client.go

@@ -99,14 +99,13 @@ func (client *Client) Close() error {
 	Logger.Println("Closing Client")
 
 	for _, broker := range client.brokers {
-		myBroker := broker // NB: block-local prevents clobbering
-		go withRecover(func() { myBroker.Close() })
+		safeAsyncClose(broker)
 	}
 	client.brokers = nil
 	client.leaders = nil
 
 	if client.seedBroker != nil {
-		go withRecover(func() { client.seedBroker.Close() })
+		safeAsyncClose(client.seedBroker)
 	}
 
 	return nil
@@ -249,8 +248,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		delete(client.brokers, broker.ID())
 	}
 
-	myBroker := broker // NB: block-local prevents clobbering
-	go withRecover(func() { myBroker.Close() })
+	safeAsyncClose(broker)
 }
 
 func (client *Client) Closed() bool {
@@ -415,8 +413,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
-			myBroker := client.brokers[broker.ID()] // use block-local to prevent clobbering `broker` for Gs
-			go withRecover(func() { myBroker.Close() })
+			safeAsyncClose(client.brokers[broker.ID()])
 			broker.Open(client.config.DefaultBrokerConf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())

+ 4 - 1
partitioner.go

@@ -74,7 +74,10 @@ func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 		return p.random.Partition(key, numPartitions)
 	}
 	p.hasher.Reset()
-	p.hasher.Write(bytes)
+	_, err = p.hasher.Write(bytes)
+	if err != nil {
+		return p.random.Partition(key, numPartitions)
+	}
 	hash := int32(p.hasher.Sum32())
 	if hash < 0 {
 		hash = -hash

+ 11 - 0
utils.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "io"
+
 // make []int32 sortable so we can sort partition numbers
 type int32Slice []int32
 
@@ -27,6 +29,15 @@ func withRecover(fn func()) {
 	fn()
 }
 
+func safeAsyncClose(c io.Closer) {
+	tmp := c // local var prevents clobbering in goroutine
+	go withRecover(func() {
+		if err := tmp.Close(); err != nil {
+			Logger.Println("Error closing", tmp, ":", err)
+		}
+	})
+}
+
 // Encoder is a simple interface for any type that can be encoded as an array of bytes
 // in order to be sent as the key or value of a Kafka message. Length() is provided as an
 // optimization, and must return the same as len() on the result of Encode().