Browse Source

Merge pull request #50 from Shopify/panic_handler

Add PanicHandler to ClientConfig
Florian Weingarten 12 years ago
parent
commit
f7196617d5
5 changed files with 24 additions and 8 deletions
  1. 3 3
      broker.go
  2. 4 4
      client.go
  3. 1 1
      consumer.go
  4. 4 0
      sarama.go
  5. 12 0
      utils.go

+ 3 - 3
broker.go

@@ -49,7 +49,7 @@ func (b *Broker) Open(maxOpenRequests int) error {
 		return AlreadyConnected
 	}
 
-	go func() {
+	go withRecover(func() {
 		defer b.lock.Unlock()
 
 		b.conn, b.connErr = net.Dial("tcp", b.addr)
@@ -65,8 +65,8 @@ func (b *Broker) Open(maxOpenRequests int) error {
 		b.responses = make(chan responsePromise, maxOpenRequests)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
-		go b.responseReceiver()
-	}()
+		go withRecover(b.responseReceiver)
+	})
 
 	return nil
 }

+ 4 - 4
client.go

@@ -80,13 +80,13 @@ func (client *Client) Close() error {
 	defer client.lock.Unlock()
 
 	for _, broker := range client.brokers {
-		go broker.Close()
+		go withRecover(func() { broker.Close() })
 	}
 	client.brokers = nil
 	client.leaders = nil
 
 	if client.extraBroker != nil {
-		go client.extraBroker.Close()
+		go withRecover(func() { client.extraBroker.Close() })
 	}
 
 	return nil
@@ -180,7 +180,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		delete(client.brokers, broker.ID())
 	}
 
-	go broker.Close()
+	go withRecover(func() { broker.Close() })
 }
 
 func (client *Client) refreshMetadata(topics []string, retries int) error {
@@ -286,7 +286,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
-			go client.brokers[broker.ID()].Close()
+			go withRecover(func() { client.brokers[broker.ID()].Close() })
 			broker.Open(client.config.ConcurrencyPerBroker)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())

+ 1 - 1
consumer.go

@@ -140,7 +140,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid OffsetMethod")
 	}
 
-	go c.fetchMessages()
+	go withRecover(c.fetchMessages)
 
 	return c, nil
 }

+ 4 - 0
sarama.go

@@ -15,3 +15,7 @@ import (
 // management events to. By default it is set to discard all log messages via ioutil.Discard,
 // but you can set it to redirect wherever you want.
 var Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+
+// PanicHandler is called for recovering from panics spawned internally to the library (and thus
+// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
+var PanicHandler func(interface{})

+ 12 - 0
utils.go

@@ -15,6 +15,18 @@ func (slice int32Slice) Swap(i, j int) {
 	slice[i], slice[j] = slice[j], slice[i]
 }
 
+func withRecover(fn func()) {
+	if PanicHandler != nil {
+		defer func() {
+			if err := recover(); err != nil {
+				PanicHandler(err)
+			}
+		}()
+	}
+
+	fn()
+}
+
 // Encoder is a simple interface for any type that can be encoded as an array of bytes
 // in order to be sent as the key or value of a Kafka message.
 type Encoder interface {