
Merge pull request #1127 from bsm/feature/simpler-offset-manager

Simpler offset management, fixed minor race
Evan Huus 7 years ago
parent
commit
c5b44e38e1
4 changed files with 342 additions and 357 deletions
  1. .gitignore (+1 -0)
  2. config.go (+9 -1)
  3. offset_manager.go (+313 -325)
  4. offset_manager_test.go (+19 -31)
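The exported OffsetManager API is unchanged by this refactor; only the plumbing behind it moves from a goroutine per broker to a single main loop. A minimal usage sketch (assumes a main package with sarama and log imported; the broker address, group and topic names are placeholders and error handling is trimmed):

config := sarama.NewConfig()
config.Version = sarama.V0_9_0_0

client, err := sarama.NewClient([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

om, err := sarama.NewOffsetManagerFromClient("example-group", client)
if err != nil {
	log.Fatal(err)
}
defer om.Close() // flushes outstanding commits, retrying up to Consumer.Offsets.Retry.Max times

pom, err := om.ManagePartition("example-topic", 0)
if err != nil {
	log.Fatal(err)
}
defer pom.Close()

next, _ := pom.NextOffset()        // offset to resume consuming from
pom.MarkOffset(next+1, "metadata") // after processing the message at offset next; committed on the next tick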

+ 1 - 0
.gitignore

@@ -24,3 +24,4 @@ _testmain.go
 *.exe
 
 coverage.txt
+profile.out

+ 9 - 1
config.go

@@ -255,6 +255,12 @@ type Config struct {
 			// broker version 0.9.0 or later.
 			// (default is 0: disabled).
 			Retention time.Duration
+
+			Retry struct {
+				// The total number of times to retry failing commit
+				// requests during OffsetManager shutdown (default 3).
+				Max int
+			}
 		}
 	}
 
@@ -314,6 +320,7 @@ func NewConfig() *Config {
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
+	c.Consumer.Offsets.Retry.Max = 3
 
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
@@ -450,7 +457,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
 	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
 		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
-
+	case c.Consumer.Offsets.Retry.Max < 0:
+		return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
 	}
 
 	// validate misc shared values
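The new Consumer.Offsets.Retry.Max only affects the final flush performed while the OffsetManager shuts down. A hedged sketch of opting into a higher value (the figure 5 is arbitrary):

config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = time.Second
config.Consumer.Offsets.Retry.Max = 5 // default is 3; only used when Close flushes the last commits

if err := config.Validate(); err != nil {
	log.Fatal(err) // negative values are rejected by the new Validate case
}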

+ 313 - 325
offset_manager.go

@@ -25,10 +25,17 @@ type offsetManager struct {
 	client Client
 	conf   *Config
 	group  string
+	ticker *time.Ticker
 
-	lock sync.Mutex
-	poms map[string]map[int32]*partitionOffsetManager
-	boms map[*Broker]*brokerOffsetManager
+	broker     *Broker
+	brokerLock sync.RWMutex
+
+	poms     map[string]map[int32]*partitionOffsetManager
+	pomsLock sync.Mutex
+
+	closeOnce sync.Once
+	closing   chan none
+	closed    chan none
 }
 
 // NewOffsetManagerFromClient creates a new OffsetManager from the given client.
@@ -39,13 +46,18 @@ func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, err
 		return nil, ErrClosedClient
 	}
 
+	conf := client.Config()
 	om := &offsetManager{
 		client: client,
-		conf:   client.Config(),
+		conf:   conf,
 		group:  group,
+		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
-		boms:   make(map[*Broker]*brokerOffsetManager),
+
+		closing: make(chan none),
+		closed:  make(chan none),
 	}
+	go withRecover(om.mainLoop)
 
 	return om, nil
 }
@@ -56,8 +68,8 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 		return nil, err
 	}
 
-	om.lock.Lock()
-	defer om.lock.Unlock()
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
 	topicManagers := om.poms[topic]
 	if topicManagers == nil {
@@ -74,53 +86,293 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 }
 
 func (om *offsetManager) Close() error {
+	om.closeOnce.Do(func() {
+		// exit the mainLoop
+		close(om.closing)
+		<-om.closed
+
+		// mark all POMs as closed
+		om.asyncClosePOMs()
+
+		// flush one last time
+		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
+			om.flushToBroker()
+			if om.releasePOMs(false) == 0 {
+				break
+			}
+		}
+
+		om.releasePOMs(true)
+		om.brokerLock.Lock()
+		om.broker = nil
+		om.brokerLock.Unlock()
+	})
 	return nil
 }
 
-func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
+	broker, err := om.coordinator()
+	if err != nil {
+		if retries <= 0 {
+			return 0, "", err
+		}
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	}
+
+	req := new(OffsetFetchRequest)
+	req.Version = 1
+	req.ConsumerGroup = om.group
+	req.AddPartition(topic, partition)
+
+	resp, err := broker.FetchOffset(req)
+	if err != nil {
+		if retries <= 0 {
+			return 0, "", err
+		}
+		om.releaseCoordinator(broker)
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	}
+
+	block := resp.GetBlock(topic, partition)
+	if block == nil {
+		return 0, "", ErrIncompleteResponse
+	}
+
+	switch block.Err {
+	case ErrNoError:
+		return block.Offset, block.Metadata, nil
+	case ErrNotCoordinatorForConsumer:
+		if retries <= 0 {
+			return 0, "", block.Err
+		}
+		om.releaseCoordinator(broker)
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	case ErrOffsetsLoadInProgress:
+		if retries <= 0 {
+			return 0, "", block.Err
+		}
+		select {
+		case <-om.closing:
+			return 0, "", block.Err
+		case <-time.After(om.conf.Metadata.Retry.Backoff):
+		}
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	default:
+		return 0, "", block.Err
+	}
+}
+
+func (om *offsetManager) coordinator() (*Broker, error) {
+	om.brokerLock.RLock()
+	broker := om.broker
+	om.brokerLock.RUnlock()
+
+	if broker != nil {
+		return broker, nil
+	}
+
+	om.brokerLock.Lock()
+	defer om.brokerLock.Unlock()
+
+	if broker := om.broker; broker != nil {
+		return broker, nil
+	}
+
+	if err := om.client.RefreshCoordinator(om.group); err != nil {
+		return nil, err
+	}
+
+	broker, err := om.client.Coordinator(om.group)
+	if err != nil {
+		return nil, err
+	}
+
+	om.broker = broker
+	return broker, nil
+}
+
+func (om *offsetManager) releaseCoordinator(b *Broker) {
+	om.brokerLock.Lock()
+	if om.broker == b {
+		om.broker = nil
+	}
+	om.brokerLock.Unlock()
+}
+
+func (om *offsetManager) mainLoop() {
+	defer om.ticker.Stop()
+	defer close(om.closed)
+
+	for {
+		select {
+		case <-om.ticker.C:
+			om.flushToBroker()
+			om.releasePOMs(false)
+		case <-om.closing:
+			return
+		}
+	}
+}
+
+func (om *offsetManager) flushToBroker() {
+	req := om.constructRequest()
+	if req == nil {
+		return
+	}
+
+	broker, err := om.coordinator()
+	if err != nil {
+		om.handleError(err)
+		return
+	}
+
+	resp, err := broker.CommitOffset(req)
+	if err != nil {
+		om.handleError(err)
+		om.releaseCoordinator(broker)
+		_ = broker.Close()
+		return
+	}
+
+	om.handleResponse(broker, req, resp)
+}
+
+func (om *offsetManager) constructRequest() *OffsetCommitRequest {
+	var r *OffsetCommitRequest
+	var perPartitionTimestamp int64
+	if om.conf.Consumer.Offsets.Retention == 0 {
+		perPartitionTimestamp = ReceiveTime
+		r = &OffsetCommitRequest{
+			Version:                 1,
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+	} else {
+		r = &OffsetCommitRequest{
+			Version:                 2,
+			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+
+	}
+
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
-	bom := om.boms[broker]
-	if bom == nil {
-		bom = om.newBrokerOffsetManager(broker)
-		om.boms[broker] = bom
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.lock.Lock()
+			if pom.dirty {
+				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
+			}
+			pom.lock.Unlock()
+		}
 	}
 
-	bom.refs++
+	if len(r.blocks) > 0 {
+		return r
+	}
 
-	return bom
+	return nil
 }
 
-func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
+
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
+				continue
+			}
+
+			var err KError
+			var ok bool
+
+			if resp.Errors[pom.topic] == nil {
+				pom.handleError(ErrIncompleteResponse)
+				continue
+			}
+			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
+				pom.handleError(ErrIncompleteResponse)
+				continue
+			}
+
+			switch err {
+			case ErrNoError:
+				block := req.blocks[pom.topic][pom.partition]
+				pom.updateCommitted(block.offset, block.metadata)
+			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
+				// not a critical error, we just need to redispatch
+				om.releaseCoordinator(broker)
+			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
+				// nothing we can do about this, just tell the user and carry on
+				pom.handleError(err)
+			case ErrOffsetsLoadInProgress:
+				// nothing wrong but we didn't commit, we'll get it next time round
+				break
+			case ErrUnknownTopicOrPartition:
+				// let the user know *and* try redispatching - if topic-auto-create is
+				// enabled, redispatching should trigger a metadata req and create the
+				// topic; if not then re-dispatching won't help, but we've let the user
+				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
+				fallthrough
+			default:
+				// dunno, tell the user and try redispatching
+				pom.handleError(err)
+				om.releaseCoordinator(broker)
+			}
+		}
+	}
+}
 
-	bom.refs--
+func (om *offsetManager) handleError(err error) {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
-	if bom.refs == 0 {
-		close(bom.updateSubscriptions)
-		if om.boms[bom.broker] == bom {
-			delete(om.boms, bom.broker)
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.handleError(err)
 		}
 	}
 }
 
-func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) asyncClosePOMs() {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
-	delete(om.boms, bom.broker)
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.AsyncClose()
+		}
+	}
 }
 
-func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+// Releases/removes closed POMs once they are clean (or when forced)
+func (om *offsetManager) releasePOMs(force bool) (remaining int) {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
+
+	for topic, topicManagers := range om.poms {
+		for partition, pom := range topicManagers {
+			pom.lock.Lock()
+			releaseDue := pom.done && (force || !pom.dirty)
+			pom.lock.Unlock()
 
-	delete(om.poms[pom.topic], pom.partition)
-	if len(om.poms[pom.topic]) == 0 {
-		delete(om.poms, pom.topic)
+			if releaseDue {
+				pom.release()
+
+				delete(om.poms[topic], partition)
+				if len(om.poms[topic]) == 0 {
+					delete(om.poms, topic)
+				}
+			}
+		}
+		remaining += len(om.poms[topic])
 	}
+	return
 }
 
 // Partition Offset Manager
@@ -187,138 +439,26 @@ type partitionOffsetManager struct {
 	offset   int64
 	metadata string
 	dirty    bool
-	clean    sync.Cond
-	broker   *brokerOffsetManager
+	done     bool
 
-	errors    chan *ConsumerError
-	rebalance chan none
-	dying     chan none
+	releaseOnce sync.Once
+	errors      chan *ConsumerError
 }
 
 func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
-	pom := &partitionOffsetManager{
+	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
+	if err != nil {
+		return nil, err
+	}
+
+	return &partitionOffsetManager{
 		parent:    om,
 		topic:     topic,
 		partition: partition,
 		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
-		rebalance: make(chan none, 1),
-		dying:     make(chan none),
-	}
-	pom.clean.L = &pom.lock
-
-	if err := pom.selectBroker(); err != nil {
-		return nil, err
-	}
-
-	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
-		return nil, err
-	}
-
-	pom.broker.updateSubscriptions <- pom
-
-	go withRecover(pom.mainLoop)
-
-	return pom, nil
-}
-
-func (pom *partitionOffsetManager) mainLoop() {
-	for {
-		select {
-		case <-pom.rebalance:
-			if err := pom.selectBroker(); err != nil {
-				pom.handleError(err)
-				pom.rebalance <- none{}
-			} else {
-				pom.broker.updateSubscriptions <- pom
-			}
-		case <-pom.dying:
-			if pom.broker != nil {
-				select {
-				case <-pom.rebalance:
-				case pom.broker.updateSubscriptions <- pom:
-				}
-				pom.parent.unrefBrokerOffsetManager(pom.broker)
-			}
-			pom.parent.abandonPartitionOffsetManager(pom)
-			close(pom.errors)
-			return
-		}
-	}
-}
-
-func (pom *partitionOffsetManager) selectBroker() error {
-	if pom.broker != nil {
-		pom.parent.unrefBrokerOffsetManager(pom.broker)
-		pom.broker = nil
-	}
-
-	var broker *Broker
-	var err error
-
-	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
-		return err
-	}
-
-	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
-		return err
-	}
-
-	pom.broker = pom.parent.refBrokerOffsetManager(broker)
-	return nil
-}
-
-func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
-	request := new(OffsetFetchRequest)
-	request.Version = 1
-	request.ConsumerGroup = pom.parent.group
-	request.AddPartition(pom.topic, pom.partition)
-
-	response, err := pom.broker.broker.FetchOffset(request)
-	if err != nil {
-		return err
-	}
-
-	block := response.GetBlock(pom.topic, pom.partition)
-	if block == nil {
-		return ErrIncompleteResponse
-	}
-
-	switch block.Err {
-	case ErrNoError:
-		pom.offset = block.Offset
-		pom.metadata = block.Metadata
-		return nil
-	case ErrNotCoordinatorForConsumer:
-		if retries <= 0 {
-			return block.Err
-		}
-		if err := pom.selectBroker(); err != nil {
-			return err
-		}
-		return pom.fetchInitialOffset(retries - 1)
-	case ErrOffsetsLoadInProgress:
-		if retries <= 0 {
-			return block.Err
-		}
-		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
-		return pom.fetchInitialOffset(retries - 1)
-	default:
-		return block.Err
-	}
-}
-
-func (pom *partitionOffsetManager) handleError(err error) {
-	cErr := &ConsumerError{
-		Topic:     pom.topic,
-		Partition: pom.partition,
-		Err:       err,
-	}
-
-	if pom.parent.conf.Consumer.Return.Errors {
-		pom.errors <- cErr
-	} else {
-		Logger.Println(cErr)
-	}
+		offset:    offset,
+		metadata:  metadata,
+	}, nil
 }
 
 func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
@@ -353,7 +493,6 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
 
 	if pom.offset == offset && pom.metadata == metadata {
 		pom.dirty = false
-		pom.clean.Signal()
 	}
 }
 
@@ -369,16 +508,9 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
 }
 
 func (pom *partitionOffsetManager) AsyncClose() {
-	go func() {
-		pom.lock.Lock()
-		defer pom.lock.Unlock()
-
-		for pom.dirty {
-			pom.clean.Wait()
-		}
-
-		close(pom.dying)
-	}()
+	pom.lock.Lock()
+	pom.done = true
+	pom.lock.Unlock()
 }
 
 func (pom *partitionOffsetManager) Close() error {
@@ -395,166 +527,22 @@ func (pom *partitionOffsetManager) Close() error {
 	return nil
 }
 
-// Broker Offset Manager
-
-type brokerOffsetManager struct {
-	parent              *offsetManager
-	broker              *Broker
-	timer               *time.Ticker
-	updateSubscriptions chan *partitionOffsetManager
-	subscriptions       map[*partitionOffsetManager]none
-	refs                int
-}
-
-func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
-	bom := &brokerOffsetManager{
-		parent:              om,
-		broker:              broker,
-		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
-		updateSubscriptions: make(chan *partitionOffsetManager),
-		subscriptions:       make(map[*partitionOffsetManager]none),
-	}
-
-	go withRecover(bom.mainLoop)
-
-	return bom
-}
-
-func (bom *brokerOffsetManager) mainLoop() {
-	for {
-		select {
-		case <-bom.timer.C:
-			if len(bom.subscriptions) > 0 {
-				bom.flushToBroker()
-			}
-		case s, ok := <-bom.updateSubscriptions:
-			if !ok {
-				bom.timer.Stop()
-				return
-			}
-			if _, ok := bom.subscriptions[s]; ok {
-				delete(bom.subscriptions, s)
-			} else {
-				bom.subscriptions[s] = none{}
-			}
-		}
-	}
-}
-
-func (bom *brokerOffsetManager) flushToBroker() {
-	request := bom.constructRequest()
-	if request == nil {
-		return
-	}
-
-	response, err := bom.broker.CommitOffset(request)
-
-	if err != nil {
-		bom.abort(err)
-		return
-	}
-
-	for s := range bom.subscriptions {
-		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
-			continue
-		}
-
-		var err KError
-		var ok bool
-
-		if response.Errors[s.topic] == nil {
-			s.handleError(ErrIncompleteResponse)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-			continue
-		}
-		if err, ok = response.Errors[s.topic][s.partition]; !ok {
-			s.handleError(ErrIncompleteResponse)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-			continue
-		}
-
-		switch err {
-		case ErrNoError:
-			block := request.blocks[s.topic][s.partition]
-			s.updateCommitted(block.offset, block.metadata)
-		case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-			ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
-			// not a critical error, we just need to redispatch
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-		case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
-			// nothing we can do about this, just tell the user and carry on
-			s.handleError(err)
-		case ErrOffsetsLoadInProgress:
-			// nothing wrong but we didn't commit, we'll get it next time round
-			break
-		case ErrUnknownTopicOrPartition:
-			// let the user know *and* try redispatching - if topic-auto-create is
-			// enabled, redispatching should trigger a metadata request and create the
-			// topic; if not then re-dispatching won't help, but we've let the user
-			// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
-			fallthrough
-		default:
-			// dunno, tell the user and try redispatching
-			s.handleError(err)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-		}
+func (pom *partitionOffsetManager) handleError(err error) {
+	cErr := &ConsumerError{
+		Topic:     pom.topic,
+		Partition: pom.partition,
+		Err:       err,
 	}
-}
 
-func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
-	var r *OffsetCommitRequest
-	var perPartitionTimestamp int64
-	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
-		perPartitionTimestamp = ReceiveTime
-		r = &OffsetCommitRequest{
-			Version:                 1,
-			ConsumerGroup:           bom.parent.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
+	if pom.parent.conf.Consumer.Return.Errors {
+		pom.errors <- cErr
 	} else {
-		r = &OffsetCommitRequest{
-			Version:                 2,
-			RetentionTime:           int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
-			ConsumerGroup:           bom.parent.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
-
-	}
-
-	for s := range bom.subscriptions {
-		s.lock.Lock()
-		if s.dirty {
-			r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata)
-		}
-		s.lock.Unlock()
-	}
-
-	if len(r.blocks) > 0 {
-		return r
+		Logger.Println(cErr)
 	}
-
-	return nil
 }
 
-func (bom *brokerOffsetManager) abort(err error) {
-	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
-	bom.parent.abandonBroker(bom)
-
-	for pom := range bom.subscriptions {
-		pom.handleError(err)
-		pom.rebalance <- none{}
-	}
-
-	for s := range bom.updateSubscriptions {
-		if _, ok := bom.subscriptions[s]; !ok {
-			s.handleError(err)
-			s.rebalance <- none{}
-		}
-	}
-
-	bom.subscriptions = make(map[*partitionOffsetManager]none)
+func (pom *partitionOffsetManager) release() {
+	pom.releaseOnce.Do(func() {
+		go close(pom.errors)
+	})
 }
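Commit and coordinator errors now funnel through each PartitionOffsetManager's handleError, so they surface on its Errors() channel when Consumer.Return.Errors is enabled (otherwise they are written to sarama.Logger). A small sketch of draining that channel, assuming pom was obtained as in the earlier sketch:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true // otherwise errors are only logged via sarama.Logger

// ... build the client, om and pom as in the earlier sketch ...

go func() {
	for commitErr := range pom.Errors() { // closed once the POM is released after Close
		log.Printf("offset commit failed for %s/%d: %v", commitErr.Topic, commitErr.Partition, commitErr.Err)
	}
}()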

+ 19 - 31
offset_manager_test.go

@@ -5,13 +5,16 @@ import (
 	"time"
 )
 
-func initOffsetManager(t *testing.T) (om OffsetManager,
+func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
 
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
 	config.Version = V0_9_0_0
+	if retention > 0 {
+		config.Consumer.Offsets.Retention = retention
+	}
 
 	broker = NewMockBroker(t, 1)
 	coordinator = NewMockBroker(t, 2)
@@ -64,31 +67,30 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager,
 func TestNewOffsetManager(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	seedBroker.Returns(new(MetadataResponse))
+	defer seedBroker.Close()
 
 	testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	_, err = NewOffsetManagerFromClient("group", testClient)
+	om, err := NewOffsetManagerFromClient("group", testClient)
 	if err != nil {
 		t.Error(err)
 	}
-
+	safeClose(t, om)
 	safeClose(t, testClient)
 
 	_, err = NewOffsetManagerFromClient("group", testClient)
 	if err != ErrClosedClient {
 		t.Errorf("Error expected for closed client; actual value: %v", err)
 	}
-
-	seedBroker.Close()
 }
 
 // Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
@@ -131,7 +133,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
 
 // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
 func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
@@ -164,7 +166,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	testClient.Config().Consumer.Offsets.Initial = OffsetOldest
 
 	// Kafka returns -1 if no offset has been stored for this partition yet.
@@ -186,7 +188,7 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerNextOffset(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
 
 	offset, meta := pom.NextOffset()
@@ -205,7 +207,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerResetOffset(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
 
 	ocResponse := new(OffsetCommitResponse)
@@ -231,9 +233,7 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
-	testClient.Config().Consumer.Offsets.Retention = time.Hour
-
+	om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
 
 	ocResponse := new(OffsetCommitResponse)
@@ -269,7 +269,7 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
 
 	ocResponse := new(OffsetCommitResponse)
@@ -294,9 +294,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
-	testClient.Config().Consumer.Offsets.Retention = time.Hour
-
+	om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
 
 	ocResponse := new(OffsetCommitResponse)
@@ -331,7 +329,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
 }
 
 func TestPartitionOffsetManagerCommitErr(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
 
 	// Error on one partition
@@ -353,24 +351,14 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	ocResponse2 := new(OffsetCommitResponse)
 	newCoordinator.Returns(ocResponse2)
 
-	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
-		CoordinatorID:   newCoordinator.BrokerID(),
-		CoordinatorHost: "127.0.0.1",
-		CoordinatorPort: newCoordinator.Port(),
-	})
+	// No error, no need to refresh coordinator
 
 	// Error on the wrong partition for this pom
 	ocResponse3 := new(OffsetCommitResponse)
 	ocResponse3.AddError("my_topic", 1, ErrNoError)
 	newCoordinator.Returns(ocResponse3)
 
-	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
-		CoordinatorID:   newCoordinator.BrokerID(),
-		CoordinatorHost: "127.0.0.1",
-		CoordinatorPort: newCoordinator.Port(),
-	})
+	// No error, no need to refresh coordinator
 
 	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
 	ocResponse4 := new(OffsetCommitResponse)
@@ -405,7 +393,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 
 // Test of recovery from abort
 func TestAbortPartitionOffsetManager(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t)
+	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
 
 	// this triggers an error in the CommitOffset request,