Burke Libbey 12 年之前
父节点
当前提交
0db1d6406d
共有 2 个文件被更改,包括 11 次插入13 次删除
  1. 1 2
      produce_message.go
  2. 10 11
      producer.go

+ 1 - 2
produce_message.go

@@ -22,8 +22,7 @@ func (msg *produceMessage) enqueue(p *Producer) error {
 		})
 		return <-errs
 	} else {
-		p.addMessage(msg)
-		return nil
+		return p.addMessage(msg)
 	}
 
 }

+ 10 - 11
producer.go

@@ -111,9 +111,8 @@ func (p *Producer) Errors() chan error {
 // You must call this function before a producer object passes out of scope, as
 // it may otherwise leak memory. You must call this before calling Close on the
 // underlying client.
+// TODO: This should lock something.
 func (p *Producer) Close() error {
-	p.m.Lock()
-	defer p.m.Unlock()
 	for _, bp := range p.brokerProducers {
 		bp.Close()
 	}
@@ -321,12 +320,6 @@ func (bp *brokerProducer) flush(p *Producer) {
 	}
 }
 
-func (bp *brokerProducer) CloseAndDisconnect(client *Client) {
-	broker := bp.broker
-	bp.Close()
-	client.disconnectBroker(broker)
-}
-
 func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
 	req := prb.toRequest(&p.config)
 	response, err := bp.broker.Produce(p.client.id, req)
@@ -342,7 +335,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		errorCb(err)
 		return
 	default:
-		bp.CloseAndDisconnect(p.client)
+		bp.Close()
 
 		overlimit := 0
 		prb.reverseEach(func(msg *produceMessage) {
@@ -402,8 +395,14 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 }
 
 func (bp *brokerProducer) Close() error {
-	close(bp.stopper)
-	<-bp.done
+	fmt.Printf("%p Close()\n", bp)
+	select {
+	case <-bp.stopper:
+		return errors.New("already closed or closing")
+	default:
+		close(bp.stopper)
+		<-bp.done
+	}
 	return nil
 }