
Simpler offset management, fixed minor race

Dimitrij Denissenko 7 years ago
parent
commit
c1a36ee230
2 changed files with 324 additions and 341 deletions
  1. offset_manager.go: +319 -325
  2. offset_manager_test.go: +5 -16
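
The commit replaces the per-broker brokerOffsetManager goroutines with a single ticker-driven mainLoop on offsetManager, caches the group coordinator behind brokerMu, and shuts down through a closing/closed channel pair. The public OffsetManager API is unchanged; below is a minimal usage sketch against this version of the package (broker address, group and topic names are placeholders, error handling trimmed):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder broker address; point this at a real cluster.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Starts the single background mainLoop that flushes commits for
	// all managed partitions on one Consumer.Offsets.CommitInterval ticker.
	om, err := sarama.NewOffsetManagerFromClient("my-group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close()

	offset, metadata := pom.NextOffset()
	log.Printf("resuming at offset %d (metadata %q)", offset, metadata)

	// After handling a message at msg.Offset you would record the next
	// offset to consume, e.g. pom.MarkOffset(msg.Offset+1, ""); the value
	// is committed asynchronously on the next flush or on Close.
}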

+ 319 - 325
offset_manager.go

@@ -25,10 +25,17 @@ type offsetManager struct {
 	client Client
 	conf   *Config
 	group  string
+	ticker *time.Ticker
 
-	lock sync.Mutex
-	poms map[string]map[int32]*partitionOffsetManager
-	boms map[*Broker]*brokerOffsetManager
+	broker   *Broker
+	brokerMu sync.RWMutex
+
+	poms   map[string]map[int32]*partitionOffsetManager
+	pomsMu sync.Mutex
+
+	closeOnce sync.Once
+	closing   chan none
+	closed    chan none
 }
 
 // NewOffsetManagerFromClient creates a new OffsetManager from the given client.
@@ -39,13 +46,18 @@ func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, err
 		return nil, ErrClosedClient
 	}
 
+	conf := client.Config()
 	om := &offsetManager{
 		client: client,
-		conf:   client.Config(),
+		conf:   conf,
 		group:  group,
+		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
-		boms:   make(map[*Broker]*brokerOffsetManager),
+
+		closing: make(chan none),
+		closed:  make(chan none),
 	}
+	go withRecover(om.mainLoop)
 
 	return om, nil
 }
@@ -56,8 +68,8 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 		return nil, err
 	}
 
-	om.lock.Lock()
-	defer om.lock.Unlock()
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
 
 	topicManagers := om.poms[topic]
 	if topicManagers == nil {
@@ -74,52 +86,298 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 }
 
 func (om *offsetManager) Close() error {
+	om.closeOnce.Do(func() {
+		// exit the mainLoop
+		close(om.closing)
+		<-om.closed
+
+		// mark all POMs as closed
+		om.asyncClosePOMs()
+
+		// flush one last time
+		for retries := om.conf.Metadata.Retry.Max; true; {
+			if om.flushToBroker() {
+				break
+			}
+			if retries--; retries < 0 {
+				break
+			}
+		}
+
+		om.releasePOMs(true)
+		om.releaseCoordinator()
+	})
 	return nil
 }
 
-func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
+	broker, err := om.coordinator()
+	if err != nil {
+		if retries <= 0 {
+			return 0, "", err
+		}
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	}
+
+	request := new(OffsetFetchRequest)
+	request.Version = 1
+	request.ConsumerGroup = om.group
+	request.AddPartition(topic, partition)
+
+	response, err := broker.FetchOffset(request)
+	if err != nil {
+		if retries <= 0 {
+			return 0, "", err
+		}
+		om.releaseCoordinator()
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	}
+
+	block := response.GetBlock(topic, partition)
+	if block == nil {
+		return 0, "", ErrIncompleteResponse
+	}
+
+	switch block.Err {
+	case ErrNoError:
+		return block.Offset, block.Metadata, nil
+	case ErrNotCoordinatorForConsumer:
+		if retries <= 0 {
+			return 0, "", block.Err
+		}
+		om.releaseCoordinator()
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	case ErrOffsetsLoadInProgress:
+		if retries <= 0 {
+			return 0, "", block.Err
+		}
+		select {
+		case <-om.closing:
+			return 0, "", block.Err
+		case <-time.After(om.conf.Metadata.Retry.Backoff):
+		}
+		return om.fetchInitialOffset(topic, partition, retries-1)
+	default:
+		return 0, "", block.Err
+	}
+}
+
+func (om *offsetManager) coordinator() (*Broker, error) {
+	om.brokerMu.RLock()
+	broker := om.broker
+	om.brokerMu.RUnlock()
+
+	if broker != nil {
+		return broker, nil
+	}
+
+	om.brokerMu.Lock()
+	defer om.brokerMu.Unlock()
+
+	if broker := om.broker; broker != nil {
+		return broker, nil
+	}
+
+	if err := om.client.RefreshCoordinator(om.group); err != nil {
+		return nil, err
+	}
+
+	broker, err := om.client.Coordinator(om.group)
+	if err != nil {
+		return nil, err
+	}
+
+	om.broker = broker
+	return broker, nil
+}
+
+func (om *offsetManager) releaseCoordinator() {
+	om.brokerMu.Lock()
+	om.broker = nil
+	om.brokerMu.Unlock()
+}
+
+func (om *offsetManager) abandonCoordinator() {
+	om.brokerMu.Lock()
+	broker := om.broker
+	om.broker = nil
+	om.brokerMu.Unlock()
+
+	if broker != nil {
+		_ = broker.Close()
+	}
+}
+
+func (om *offsetManager) mainLoop() {
+	defer om.ticker.Stop()
+	defer close(om.closed)
+
+	for {
+		select {
+		case <-om.ticker.C:
+			om.flushToBroker()
+			om.releasePOMs(false)
+		case <-om.closing:
+			return
+		}
+	}
+}
+
+func (om *offsetManager) flushToBroker() (success bool) {
+	request := om.constructRequest()
+	if request == nil {
+		return true
+	}
 
-	bom := om.boms[broker]
-	if bom == nil {
-		bom = om.newBrokerOffsetManager(broker)
-		om.boms[broker] = bom
+	broker, err := om.coordinator()
+	if err != nil {
+		om.handleError(err)
+		return false
 	}
 
-	bom.refs++
+	response, err := broker.CommitOffset(request)
+	if err != nil {
+		om.abandonCoordinator()
+		om.handleError(err)
+		return false
+	}
+
+	success = true
 
-	return bom
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
+
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			if request.blocks[pom.topic] == nil || request.blocks[pom.topic][pom.partition] == nil {
+				continue
+			}
+
+			var err KError
+			var ok bool
+
+			if response.Errors[pom.topic] == nil {
+				pom.handleError(ErrIncompleteResponse)
+				continue
+			}
+			if err, ok = response.Errors[pom.topic][pom.partition]; !ok {
+				pom.handleError(ErrIncompleteResponse)
+				continue
+			}
+
+			switch err {
+			case ErrNoError:
+				block := request.blocks[pom.topic][pom.partition]
+				pom.updateCommitted(block.offset, block.metadata)
+			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
+				// not a critical error, we just need to redispatch
+				om.releaseCoordinator()
+			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
+				// nothing we can do about this, just tell the user and carry on
+				success = false
+				pom.handleError(err)
+			case ErrOffsetsLoadInProgress:
+				// nothing wrong but we didn't commit, we'll get it next time round
+				break
+			case ErrUnknownTopicOrPartition:
+				// let the user know *and* try redispatching - if topic-auto-create is
+				// enabled, redispatching should trigger a metadata request and create the
+				// topic; if not then re-dispatching won't help, but we've let the user
+				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
+				fallthrough
+			default:
+				// dunno, tell the user and try redispatching
+				success = false
+				pom.handleError(err)
+				om.releaseCoordinator()
+			}
+		}
+	}
+	return
 }
 
-func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) constructRequest() *OffsetCommitRequest {
+	var r *OffsetCommitRequest
+	var perPartitionTimestamp int64
+	if om.conf.Consumer.Offsets.Retention == 0 {
+		perPartitionTimestamp = ReceiveTime
+		r = &OffsetCommitRequest{
+			Version:                 1,
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+	} else {
+		r = &OffsetCommitRequest{
+			Version:                 2,
+			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+
+	}
 
-	bom.refs--
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
 
-	if bom.refs == 0 {
-		close(bom.updateSubscriptions)
-		if om.boms[bom.broker] == bom {
-			delete(om.boms, bom.broker)
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.lock.Lock()
+			if pom.dirty {
+				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
+			}
+			pom.lock.Unlock()
+		}
+	}
+
+	if len(r.blocks) > 0 {
+		return r
+	}
+
+	return nil
+}
+
+func (om *offsetManager) handleError(err error) {
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
+
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.handleError(err)
 		}
 	}
 }
 
-func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) asyncClosePOMs() {
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
 
-	delete(om.boms, bom.broker)
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.AsyncClose()
+		}
+	}
 }
 
-func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
-	om.lock.Lock()
-	defer om.lock.Unlock()
+func (om *offsetManager) releasePOMs(force bool) {
+	om.pomsMu.Lock()
+	defer om.pomsMu.Unlock()
+
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.lock.Lock()
+			releaseDue := pom.done && (force || !pom.dirty)
+			pom.lock.Unlock()
+
+			if releaseDue {
+				pom.release()
 
-	delete(om.poms[pom.topic], pom.partition)
-	if len(om.poms[pom.topic]) == 0 {
-		delete(om.poms, pom.topic)
+				delete(om.poms[pom.topic], pom.partition)
+				if len(om.poms[pom.topic]) == 0 {
+					delete(om.poms, pom.topic)
+				}
+			}
+		}
 	}
 }
 
@@ -187,138 +445,26 @@ type partitionOffsetManager struct {
 	offset   int64
 	metadata string
 	dirty    bool
-	clean    sync.Cond
-	broker   *brokerOffsetManager
+	done     bool
 
-	errors    chan *ConsumerError
-	rebalance chan none
-	dying     chan none
+	releaseOnce sync.Once
+	errors      chan *ConsumerError
 }
 
 func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
-	pom := &partitionOffsetManager{
+	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
+	if err != nil {
+		return nil, err
+	}
+
+	return &partitionOffsetManager{
 		parent:    om,
 		topic:     topic,
 		partition: partition,
 		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
-		rebalance: make(chan none, 1),
-		dying:     make(chan none),
-	}
-	pom.clean.L = &pom.lock
-
-	if err := pom.selectBroker(); err != nil {
-		return nil, err
-	}
-
-	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
-		return nil, err
-	}
-
-	pom.broker.updateSubscriptions <- pom
-
-	go withRecover(pom.mainLoop)
-
-	return pom, nil
-}
-
-func (pom *partitionOffsetManager) mainLoop() {
-	for {
-		select {
-		case <-pom.rebalance:
-			if err := pom.selectBroker(); err != nil {
-				pom.handleError(err)
-				pom.rebalance <- none{}
-			} else {
-				pom.broker.updateSubscriptions <- pom
-			}
-		case <-pom.dying:
-			if pom.broker != nil {
-				select {
-				case <-pom.rebalance:
-				case pom.broker.updateSubscriptions <- pom:
-				}
-				pom.parent.unrefBrokerOffsetManager(pom.broker)
-			}
-			pom.parent.abandonPartitionOffsetManager(pom)
-			close(pom.errors)
-			return
-		}
-	}
-}
-
-func (pom *partitionOffsetManager) selectBroker() error {
-	if pom.broker != nil {
-		pom.parent.unrefBrokerOffsetManager(pom.broker)
-		pom.broker = nil
-	}
-
-	var broker *Broker
-	var err error
-
-	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
-		return err
-	}
-
-	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
-		return err
-	}
-
-	pom.broker = pom.parent.refBrokerOffsetManager(broker)
-	return nil
-}
-
-func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
-	request := new(OffsetFetchRequest)
-	request.Version = 1
-	request.ConsumerGroup = pom.parent.group
-	request.AddPartition(pom.topic, pom.partition)
-
-	response, err := pom.broker.broker.FetchOffset(request)
-	if err != nil {
-		return err
-	}
-
-	block := response.GetBlock(pom.topic, pom.partition)
-	if block == nil {
-		return ErrIncompleteResponse
-	}
-
-	switch block.Err {
-	case ErrNoError:
-		pom.offset = block.Offset
-		pom.metadata = block.Metadata
-		return nil
-	case ErrNotCoordinatorForConsumer:
-		if retries <= 0 {
-			return block.Err
-		}
-		if err := pom.selectBroker(); err != nil {
-			return err
-		}
-		return pom.fetchInitialOffset(retries - 1)
-	case ErrOffsetsLoadInProgress:
-		if retries <= 0 {
-			return block.Err
-		}
-		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
-		return pom.fetchInitialOffset(retries - 1)
-	default:
-		return block.Err
-	}
-}
-
-func (pom *partitionOffsetManager) handleError(err error) {
-	cErr := &ConsumerError{
-		Topic:     pom.topic,
-		Partition: pom.partition,
-		Err:       err,
-	}
-
-	if pom.parent.conf.Consumer.Return.Errors {
-		pom.errors <- cErr
-	} else {
-		Logger.Println(cErr)
-	}
+		offset:    offset,
+		metadata:  metadata,
+	}, nil
 }
 
 func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
@@ -353,7 +499,6 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
 
 	if pom.offset == offset && pom.metadata == metadata {
 		pom.dirty = false
-		pom.clean.Signal()
 	}
 }
 
@@ -369,16 +514,9 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
 }
 
 func (pom *partitionOffsetManager) AsyncClose() {
-	go func() {
-		pom.lock.Lock()
-		defer pom.lock.Unlock()
-
-		for pom.dirty {
-			pom.clean.Wait()
-		}
-
-		close(pom.dying)
-	}()
+	pom.lock.Lock()
+	pom.done = true
+	pom.lock.Unlock()
 }
 
 func (pom *partitionOffsetManager) Close() error {
@@ -395,166 +533,22 @@ func (pom *partitionOffsetManager) Close() error {
 	return nil
 }
 
-// Broker Offset Manager
-
-type brokerOffsetManager struct {
-	parent              *offsetManager
-	broker              *Broker
-	timer               *time.Ticker
-	updateSubscriptions chan *partitionOffsetManager
-	subscriptions       map[*partitionOffsetManager]none
-	refs                int
-}
-
-func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
-	bom := &brokerOffsetManager{
-		parent:              om,
-		broker:              broker,
-		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
-		updateSubscriptions: make(chan *partitionOffsetManager),
-		subscriptions:       make(map[*partitionOffsetManager]none),
-	}
-
-	go withRecover(bom.mainLoop)
-
-	return bom
-}
-
-func (bom *brokerOffsetManager) mainLoop() {
-	for {
-		select {
-		case <-bom.timer.C:
-			if len(bom.subscriptions) > 0 {
-				bom.flushToBroker()
-			}
-		case s, ok := <-bom.updateSubscriptions:
-			if !ok {
-				bom.timer.Stop()
-				return
-			}
-			if _, ok := bom.subscriptions[s]; ok {
-				delete(bom.subscriptions, s)
-			} else {
-				bom.subscriptions[s] = none{}
-			}
-		}
-	}
-}
-
-func (bom *brokerOffsetManager) flushToBroker() {
-	request := bom.constructRequest()
-	if request == nil {
-		return
-	}
-
-	response, err := bom.broker.CommitOffset(request)
-
-	if err != nil {
-		bom.abort(err)
-		return
-	}
-
-	for s := range bom.subscriptions {
-		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
-			continue
-		}
-
-		var err KError
-		var ok bool
-
-		if response.Errors[s.topic] == nil {
-			s.handleError(ErrIncompleteResponse)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-			continue
-		}
-		if err, ok = response.Errors[s.topic][s.partition]; !ok {
-			s.handleError(ErrIncompleteResponse)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-			continue
-		}
-
-		switch err {
-		case ErrNoError:
-			block := request.blocks[s.topic][s.partition]
-			s.updateCommitted(block.offset, block.metadata)
-		case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-			ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
-			// not a critical error, we just need to redispatch
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-		case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
-			// nothing we can do about this, just tell the user and carry on
-			s.handleError(err)
-		case ErrOffsetsLoadInProgress:
-			// nothing wrong but we didn't commit, we'll get it next time round
-			break
-		case ErrUnknownTopicOrPartition:
-			// let the user know *and* try redispatching - if topic-auto-create is
-			// enabled, redispatching should trigger a metadata request and create the
-			// topic; if not then re-dispatching won't help, but we've let the user
-			// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
-			fallthrough
-		default:
-			// dunno, tell the user and try redispatching
-			s.handleError(err)
-			delete(bom.subscriptions, s)
-			s.rebalance <- none{}
-		}
+func (pom *partitionOffsetManager) handleError(err error) {
+	cErr := &ConsumerError{
+		Topic:     pom.topic,
+		Partition: pom.partition,
+		Err:       err,
 	}
-}
 
-func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
-	var r *OffsetCommitRequest
-	var perPartitionTimestamp int64
-	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
-		perPartitionTimestamp = ReceiveTime
-		r = &OffsetCommitRequest{
-			Version:                 1,
-			ConsumerGroup:           bom.parent.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
+	if pom.parent.conf.Consumer.Return.Errors {
+		pom.errors <- cErr
 	} else {
-		r = &OffsetCommitRequest{
-			Version:                 2,
-			RetentionTime:           int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
-			ConsumerGroup:           bom.parent.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
-
-	}
-
-	for s := range bom.subscriptions {
-		s.lock.Lock()
-		if s.dirty {
-			r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata)
-		}
-		s.lock.Unlock()
-	}
-
-	if len(r.blocks) > 0 {
-		return r
+		Logger.Println(cErr)
 	}
-
-	return nil
 }
 
-func (bom *brokerOffsetManager) abort(err error) {
-	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
-	bom.parent.abandonBroker(bom)
-
-	for pom := range bom.subscriptions {
-		pom.handleError(err)
-		pom.rebalance <- none{}
-	}
-
-	for s := range bom.updateSubscriptions {
-		if _, ok := bom.subscriptions[s]; !ok {
-			s.handleError(err)
-			s.rebalance <- none{}
-		}
-	}
-
-	bom.subscriptions = make(map[*partitionOffsetManager]none)
+func (pom *partitionOffsetManager) release() {
+	pom.releaseOnce.Do(func() {
+		go close(pom.errors)
+	})
 }
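
For reference, the shutdown sequence introduced above (a sync.Once guarding a closing/closed channel pair around a ticker-driven loop) boils down to the following self-contained sketch; the names worker and flush are illustrative stand-ins, not part of the diff:

package main

import (
	"fmt"
	"sync"
	"time"
)

type none struct{}

type worker struct {
	ticker    *time.Ticker
	closeOnce sync.Once
	closing   chan none // tells mainLoop to stop
	closed    chan none // mainLoop signals it has stopped
}

func newWorker(interval time.Duration) *worker {
	w := &worker{
		ticker:  time.NewTicker(interval),
		closing: make(chan none),
		closed:  make(chan none),
	}
	go w.mainLoop()
	return w
}

func (w *worker) mainLoop() {
	defer w.ticker.Stop()
	defer close(w.closed)
	for {
		select {
		case <-w.ticker.C:
			w.flush()
		case <-w.closing:
			return
		}
	}
}

func (w *worker) flush() { fmt.Println("flush") }

// Close is safe to call repeatedly and from multiple goroutines: the
// sync.Once closes the channels exactly once, and waiting on closed
// guarantees the loop has exited before the final flush runs.
func (w *worker) Close() {
	w.closeOnce.Do(func() {
		close(w.closing)
		<-w.closed
		w.flush() // one last flush, mirroring Close() in the diff
	})
}

func main() {
	w := newWorker(100 * time.Millisecond)
	time.Sleep(250 * time.Millisecond)
	w.Close()
}

Because Close blocks on <-closed before flushing, the final commit can never race with a ticker-driven one, which is the kind of ordering the old per-broker goroutines could not guarantee.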

+ 5 - 16
offset_manager_test.go

@@ -64,25 +64,24 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager,
 func TestNewOffsetManager(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	seedBroker.Returns(new(MetadataResponse))
+	defer seedBroker.Close()
 
 	testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	_, err = NewOffsetManagerFromClient("group", testClient)
+	om, err := NewOffsetManagerFromClient("group", testClient)
 	if err != nil {
 		t.Error(err)
 	}
-
+	safeClose(t, om)
 	safeClose(t, testClient)
 
 	_, err = NewOffsetManagerFromClient("group", testClient)
 	if err != ErrClosedClient {
 		t.Errorf("Error expected for closed client; actual value: %v", err)
 	}
-
-	seedBroker.Close()
 }
 
 // Test recovery from ErrNotCoordinatorForConsumer
@@ -353,24 +352,14 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	ocResponse2 := new(OffsetCommitResponse)
 	newCoordinator.Returns(ocResponse2)
 
-	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
-		CoordinatorID:   newCoordinator.BrokerID(),
-		CoordinatorHost: "127.0.0.1",
-		CoordinatorPort: newCoordinator.Port(),
-	})
+	// No error, no need to refresh coordinator
 
 	// Error on the wrong partition for this pom
 	ocResponse3 := new(OffsetCommitResponse)
 	ocResponse3.AddError("my_topic", 1, ErrNoError)
 	newCoordinator.Returns(ocResponse3)
 
-	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
-		CoordinatorID:   newCoordinator.BrokerID(),
-		CoordinatorHost: "127.0.0.1",
-		CoordinatorPort: newCoordinator.Port(),
-	})
+	// No error, no need to refresh coordinator
 
 	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
 	ocResponse4 := new(OffsetCommitResponse)