Browse Source

concurrency fixes

Burke Libbey 12 years ago
parent
commit
aa9f8942f6
2 changed files with 8 additions and 1 deletions
  1. 4 0
      partitioner.go
  2. 4 1
      producer.go

+ 4 - 0
partitioner.go

@@ -4,6 +4,7 @@ import (
 	"hash"
 	"hash/fnv"
 	"math/rand"
+	"sync"
 	"time"
 )
 
@@ -17,6 +18,7 @@ type Partitioner interface {
 // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
 type RandomPartitioner struct {
 	generator *rand.Rand
+	m         sync.Mutex
 }
 
 func NewRandomPartitioner() *RandomPartitioner {
@@ -26,6 +28,8 @@ func NewRandomPartitioner() *RandomPartitioner {
 }
 
 func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	p.m.Lock()
+	defer p.m.Unlock()
 	return int32(p.generator.Intn(int(numPartitions)))
 }
 

+ 4 - 1
producer.go

@@ -318,7 +318,10 @@ func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
 func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
 	select {
 	case <-bp.hasMessages:
-		bp.hasMessages <- true
+		select {
+		case bp.hasMessages <- true:
+		default:
+		}
 		bp.flush(p)
 	default:
 	}