
First draft implementation of an offset manager

Still a bunch of open questions, but this is reasonably functional at least.
Evan Huus, 10 years ago
commit e0c0da8880
2 changed files with 429 additions and 0 deletions:
  1. config.go (+10 -0)
  2. offset_manager.go (+419 -0)

config.go (+10 -0)

@@ -126,6 +126,13 @@ type Config struct {
 			// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
 			Errors bool
 		}
+
+		// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
+		// manual use of an OffsetManager but will eventually be automated.
+		Offsets struct {
+			// How frequently to commit updated offsets. Defaults to 10s.
+			CommitInterval time.Duration
+		}
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -164,6 +171,7 @@ func NewConfig() *Config {
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
+	c.Consumer.Offsets.CommitInterval = 10 * time.Second
 
 	c.ChannelBufferSize = 256
 
@@ -263,6 +271,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
+	case c.Consumer.Offsets.CommitInterval <= 0:
+		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
 	}
 
 	// validate misc shared values

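To illustrate the new knob (not part of the commit itself), here is a minimal sketch of tuning Consumer.Offsets.CommitInterval when building a client; the broker address is a placeholder and the import path assumes the Shopify/sarama layout of this era:

    package main

    import (
        "log"
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        conf := sarama.NewConfig()
        // Flush committed offsets every 5s instead of the 10s default.
        conf.Consumer.Offsets.CommitInterval = 5 * time.Second
        // Surface offset-management errors on the Errors() channel rather than the logger.
        conf.Consumer.Return.Errors = true

        // "localhost:9092" is a placeholder broker address.
        client, err := sarama.NewClient([]string{"localhost:9092"}, conf)
        if err != nil {
            log.Fatalln(err)
        }
        defer client.Close()
    }
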
offset_manager.go (+419 -0)

@@ -0,0 +1,419 @@
+package sarama
+
+import (
+	"sync"
+	"time"
+)
+
+// Offset Manager
+
+// OffsetManager uses Kafka to store and fetch consumed partition offsets.
+type OffsetManager interface {
+	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
+	// return an error if this OffsetManager is already managing the given topic/partition.
+	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
+}
+
+type offsetManager struct {
+	client Client
+	conf   *Config
+	group  string
+
+	lock sync.Mutex
+	poms map[string]map[int32]*partitionOffsetManager
+	boms map[*Broker]*brokerOffsetManager
+}
+
+// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
+// It is still necessary to call Close() on the underlying client when finished with the offset manager.
+func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	om := &offsetManager{
+		client: client,
+		conf:   client.Config(),
+		group:  group,
+		poms:   make(map[string]map[int32]*partitionOffsetManager),
+		boms:   make(map[*Broker]*brokerOffsetManager),
+	}
+
+	return om, nil
+}
+
+func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
+	pom, err := om.newPartitionOffsetManager(topic, partition)
+	if err != nil {
+		return nil, err
+	}
+
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	topicManagers := om.poms[topic]
+	if topicManagers == nil {
+		topicManagers = make(map[int32]*partitionOffsetManager)
+		om.poms[topic] = topicManagers
+	}
+
+	if topicManagers[partition] != nil {
+		return nil, ConfigurationError("That topic/partition is already being managed")
+	}
+
+	topicManagers[partition] = pom
+	return pom, nil
+}
+
+func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom := om.boms[broker]
+	if bom == nil {
+		bom = om.newBrokerOffsetManager(broker)
+		om.boms[broker] = bom
+	}
+
+	bom.refs++
+
+	return bom
+}
+
+func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom.refs--
+
+	if bom.refs == 0 {
+		close(bom.updateSubscriptions)
+		if om.boms[bom.broker] == bom {
+			delete(om.boms, bom.broker)
+		}
+	}
+}
+
+func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	delete(om.boms, bom.broker)
+}
+
+// Partition Offset Manager
+
+// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
+// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
+// out of scope.
+type PartitionOffsetManager interface {
+	// Offset returns the current offset and metadata according to the manager; this value has not necessarily
+	// been flushed to the cluster yet.
+	Offset() (int64, string)
+
+	// SetOffset sets the current offset and metadata according to the manager; this value (or a subsequent update)
+	// will eventually be flushed to the cluster based on configuration.
+	SetOffset(offset int64, metadata string)
+
+	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
+	// errors are logged and not returned over this channel. If you want to implement any custom error
+	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
+	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
+	// after which you should wait until the 'errors' channel has been drained and closed.
+	// It is required to call this function (or Close) before a PartitionOffsetManager object passes out of
+	// scope, as it will otherwise leak memory. You must call this before calling Close on the underlying
+	// client.
+	AsyncClose()
+	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
+	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
+	// leak memory. You must call this before calling Close on the underlying client.
+	Close() error
+}
+
+type partitionOffsetManager struct {
+	parent    *offsetManager
+	topic     string
+	partition int32
+
+	lock     sync.Mutex
+	offset   int64
+	metadata string
+	broker   *brokerOffsetManager
+
+	errors    chan *ConsumerError
+	rebalance chan none
+	dying     chan none
+}
+
+func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
+	pom := &partitionOffsetManager{
+		parent:    om,
+		topic:     topic,
+		partition: partition,
+		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
+		rebalance: make(chan none, 1),
+		dying:     make(chan none),
+	}
+
+	if err := pom.selectBroker(); err != nil {
+		return nil, err
+	}
+
+	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
+		return nil, err
+	}
+
+	go withRecover(pom.mainLoop)
+
+	return pom, nil
+}
+
+func (pom *partitionOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-pom.rebalance:
+			if err := pom.selectBroker(); err != nil {
+				pom.handleError(err)
+				pom.rebalance <- none{}
+			}
+		case <-pom.dying:
+			if pom.broker != nil {
+				select {
+				case <-pom.rebalance:
+				case pom.broker.updateSubscriptions <- pom:
+				}
+				pom.parent.unrefBrokerOffsetManager(pom.broker)
+			}
+			close(pom.errors)
+			return
+		}
+	}
+}
+
+func (pom *partitionOffsetManager) selectBroker() error {
+	if pom.broker != nil {
+		pom.parent.unrefBrokerOffsetManager(pom.broker)
+		pom.broker = nil
+	}
+
+	var broker *Broker
+	var err error
+
+	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
+		return err
+	}
+
+	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
+		return err
+	}
+
+	pom.broker = pom.parent.refBrokerOffsetManager(broker)
+	pom.broker.updateSubscriptions <- pom
+	return nil
+}
+
+func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
+	request := new(OffsetFetchRequest)
+	request.Version = 1
+	request.ConsumerGroup = pom.parent.group
+	request.AddPartition(pom.topic, pom.partition)
+
+	response, err := pom.broker.broker.FetchOffset(request)
+	if err != nil {
+		return err
+	}
+
+	block := response.GetBlock(pom.topic, pom.partition)
+	if block == nil {
+		return ErrIncompleteResponse
+	}
+
+	switch block.Err {
+	case ErrNoError:
+		pom.offset = block.Offset
+		pom.metadata = block.Metadata
+		return nil
+	case ErrNotCoordinatorForConsumer:
+		if retries <= 0 {
+			return block.Err
+		}
+		if err := pom.selectBroker(); err != nil {
+			return err
+		}
+		return pom.fetchInitialOffset(retries - 1)
+	case ErrOffsetsLoadInProgress:
+		if retries <= 0 {
+			return block.Err
+		}
+		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
+		return pom.fetchInitialOffset(retries - 1)
+	default:
+		return block.Err
+	}
+}
+
+func (pom *partitionOffsetManager) handleError(err error) {
+	cErr := &ConsumerError{
+		Topic:     pom.topic,
+		Partition: pom.partition,
+		Err:       err,
+	}
+
+	if pom.parent.conf.Consumer.Return.Errors {
+		pom.errors <- cErr
+	} else {
+		Logger.Println(cErr)
+	}
+}
+
+func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
+	return pom.errors
+}
+
+func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	pom.offset = offset
+	pom.metadata = metadata
+}
+
+func (pom *partitionOffsetManager) Offset() (int64, string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	return pom.offset, pom.metadata
+}
+
+func (pom *partitionOffsetManager) AsyncClose() {
+	close(pom.dying)
+}
+
+func (pom *partitionOffsetManager) Close() error {
+	pom.AsyncClose()
+
+	var errors ConsumerErrors
+	for err := range pom.errors {
+		errors = append(errors, err)
+	}
+
+	if len(errors) > 0 {
+		return errors
+	}
+	return nil
+}
+
+// Broker Offset Manager
+
+type brokerOffsetManager struct {
+	parent              *offsetManager
+	broker              *Broker
+	timer               *time.Ticker
+	updateSubscriptions chan *partitionOffsetManager
+	subscriptions       map[*partitionOffsetManager]none
+	refs                int
+}
+
+func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	bom := &brokerOffsetManager{
+		parent:              om,
+		broker:              broker,
+		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
+		updateSubscriptions: make(chan *partitionOffsetManager),
+		subscriptions:       make(map[*partitionOffsetManager]none),
+	}
+
+	go withRecover(bom.mainLoop)
+
+	return bom
+}
+
+func (bom *brokerOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-bom.timer.C:
+			bom.flushToBroker()
+		case s, ok := <-bom.updateSubscriptions:
+			if !ok {
+				bom.timer.Stop()
+				return
+			}
+			if _, ok := bom.subscriptions[s]; ok {
+				delete(bom.subscriptions, s)
+			} else {
+				bom.subscriptions[s] = none{}
+			}
+		}
+	}
+}
+
+func (bom *brokerOffsetManager) flushToBroker() {
+	request := bom.constructRequest()
+	response, err := bom.broker.CommitOffset(request)
+
+	if err != nil {
+		bom.abort(err)
+		return
+	}
+
+	for s := range bom.subscriptions {
+		var err KError
+		var ok bool
+
+		if response.Errors[s.topic] == nil {
+			s.handleError(ErrIncompleteResponse)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+			continue
+		}
+		if err, ok = response.Errors[s.topic][s.partition]; !ok {
+			s.handleError(ErrIncompleteResponse)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+			continue
+		}
+
+		switch err {
+		case ErrNoError:
+			break
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		default:
+			s.handleError(err)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		}
+	}
+}
+
+func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
+	r := &OffsetCommitRequest{
+		Version:       1,
+		ConsumerGroup: bom.parent.group,
+	}
+	for s := range bom.subscriptions {
+		s.lock.Lock()
+		r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
+		s.lock.Unlock()
+	}
+	return r
+}
+
+func (bom *brokerOffsetManager) abort(err error) {
+	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
+	bom.parent.abandonBroker(bom)
+
+	for pom := range bom.subscriptions {
+		pom.handleError(err)
+		pom.rebalance <- none{}
+	}
+
+	for s := range bom.updateSubscriptions {
+		if _, ok := bom.subscriptions[s]; !ok {
+			s.handleError(err)
+			s.rebalance <- none{}
+		}
+	}
+}
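
For context on how the new API fits together, here is a minimal usage sketch (again, not part of the commit); the broker address, consumer group, topic, partition, and offset values are placeholders, and it follows the AsyncClose/Errors contract documented above:

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        conf := sarama.NewConfig()
        conf.Consumer.Return.Errors = true // receive errors on Errors() instead of the logger

        // "localhost:9092" is a placeholder broker address.
        client, err := sarama.NewClient([]string{"localhost:9092"}, conf)
        if err != nil {
            log.Fatalln(err)
        }
        defer client.Close()

        // Placeholder consumer group, topic, and partition.
        om, err := sarama.NewOffsetManagerFromClient("example-group", client)
        if err != nil {
            log.Fatalln(err)
        }

        pom, err := om.ManagePartition("example-topic", 0)
        if err != nil {
            log.Fatalln(err)
        }

        // Record progress; the broker offset manager flushes it to Kafka on the
        // next CommitInterval tick.
        pom.SetOffset(42, "arbitrary metadata string")

        offset, metadata := pom.Offset()
        log.Printf("pending commit: offset=%d metadata=%q", offset, metadata)

        // Shut down the partition offset manager before the client, draining
        // Errors() as the AsyncClose docs require.
        pom.AsyncClose()
        for err := range pom.Errors() {
            log.Println("offset manager error:", err)
        }
    }

Note that the deferred client.Close() runs last, which matches the documented requirement that the partition offset manager be closed before the underlying client.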