
Merge pull request #461 from Shopify/offset-manager

OffsetManager Implementation
Evan Huus 9 years ago
parent
commit
23a7cd996a
5 changed files with 907 additions and 1 deletion
  1. config.go (+18 -0)
  2. functional_offset_manager_test.go (+51 -0)
  3. functional_test.go (+1 -1)
  4. offset_manager.go (+506 -0)
  5. offset_manager_test.go (+331 -0)

+ 18 - 0
config.go

@@ -126,6 +126,17 @@ type Config struct {
 			// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
 			Errors bool
 		}
+
+		// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
+		// manual use of an OffsetManager but will eventually be automated.
+		Offsets struct {
+			// How frequently to commit updated offsets. Defaults to 10s.
+			CommitInterval time.Duration
+
+			// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
+			// Defaults to OffsetNewest.
+			Initial int64
+		}
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -164,6 +175,8 @@ func NewConfig() *Config {
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
+	c.Consumer.Offsets.CommitInterval = 10 * time.Second
+	c.Consumer.Offsets.Initial = OffsetNewest
 
 	c.ChannelBufferSize = 256
 
@@ -263,6 +276,11 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
+	case c.Consumer.Offsets.CommitInterval <= 0:
+		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
+		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
+
 	}
 
 	// validate misc shared values
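
A quick note on using the new settings: the two Consumer.Offsets fields are ordinary Config fields, so they are set like any other consumer option before building a client. A minimal sketch (the five-second interval, the OffsetOldest choice, and the explicit Validate call are illustrative, not values taken from this commit):

	package main

	import (
		"log"
		"time"

		"github.com/Shopify/sarama"
	)

	func main() {
		config := sarama.NewConfig()
		// Commit dirty offsets every 5 seconds instead of the default 10s.
		config.Consumer.Offsets.CommitInterval = 5 * time.Second
		// If no offset was previously committed for a partition, start from the oldest available message.
		config.Consumer.Offsets.Initial = sarama.OffsetOldest

		// Validate rejects bad values, e.g. a non-positive CommitInterval or an
		// Initial that is neither OffsetNewest nor OffsetOldest.
		if err := config.Validate(); err != nil {
			log.Fatal(err)
		}
	}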

+ 51 - 0
functional_offset_manager_test.go

@@ -0,0 +1,51 @@
+package sarama
+
+import (
+	"testing"
+)
+
+func TestFuncOffsetManager(t *testing.T) {
+	checkKafkaVersion(t, "0.8.2")
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	client, err := NewClient(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
+		t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
+	}
+
+	pom1, err := offsetManager.ManagePartition("test.1", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pom1.MarkOffset(10, "test metadata")
+	safeClose(t, pom1)
+
+	pom2, err := offsetManager.ManagePartition("test.1", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offset, metadata := pom2.NextOffset()
+
+	if offset != 10+1 {
+		t.Errorf("Expected the next offset to be 11, found %d.", offset)
+	}
+	if metadata != "test metadata" {
+		t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
+	}
+
+	safeClose(t, pom2)
+	safeClose(t, offsetManager)
+	safeClose(t, client)
+}

+ 1 - 1
functional_test.go

@@ -75,7 +75,7 @@ func checkKafkaAvailability(t testing.TB) {
 func checkKafkaVersion(t testing.TB, requiredVersion string) {
 	kafkaVersion := os.Getenv("KAFKA_VERSION")
 	if kafkaVersion == "" {
-		t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
+		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
 	} else {
 		available := parseKafkaVersion(kafkaVersion)
 		required := parseKafkaVersion(requiredVersion)

+ 506 - 0
offset_manager.go

@@ -0,0 +1,506 @@
+package sarama
+
+import (
+	"sync"
+	"time"
+)
+
+// Offset Manager
+
+// OffsetManager uses Kafka to store and fetch consumed partition offsets.
+type OffsetManager interface {
+	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
+	// return an error if this OffsetManager is already managing the given topic/partition.
+	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
+
+	// Close stops the OffsetManager from managing offsets. It is required to call this function
+	// before an OffsetManager object passes out of scope, as it will otherwise
+	// leak memory. You must call this after all the PartitionOffsetManagers are closed.
+	Close() error
+}
+
+type offsetManager struct {
+	client Client
+	conf   *Config
+	group  string
+
+	lock sync.Mutex
+	poms map[string]map[int32]*partitionOffsetManager
+	boms map[*Broker]*brokerOffsetManager
+}
+
+// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
+// It is still necessary to call Close() on the underlying client when finished with the offset manager.
+func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	om := &offsetManager{
+		client: client,
+		conf:   client.Config(),
+		group:  group,
+		poms:   make(map[string]map[int32]*partitionOffsetManager),
+		boms:   make(map[*Broker]*brokerOffsetManager),
+	}
+
+	return om, nil
+}
+
+func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
+	pom, err := om.newPartitionOffsetManager(topic, partition)
+	if err != nil {
+		return nil, err
+	}
+
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	topicManagers := om.poms[topic]
+	if topicManagers == nil {
+		topicManagers = make(map[int32]*partitionOffsetManager)
+		om.poms[topic] = topicManagers
+	}
+
+	if topicManagers[partition] != nil {
+		return nil, ConfigurationError("That topic/partition is already being managed")
+	}
+
+	topicManagers[partition] = pom
+	return pom, nil
+}
+
+func (om *offsetManager) Close() error {
+	return nil
+}
+
+func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom := om.boms[broker]
+	if bom == nil {
+		bom = om.newBrokerOffsetManager(broker)
+		om.boms[broker] = bom
+	}
+
+	bom.refs++
+
+	return bom
+}
+
+func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom.refs--
+
+	if bom.refs == 0 {
+		close(bom.updateSubscriptions)
+		if om.boms[bom.broker] == bom {
+			delete(om.boms, bom.broker)
+		}
+	}
+}
+
+func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	delete(om.boms, bom.broker)
+}
+
+func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	delete(om.poms[pom.topic], pom.partition)
+	if len(om.poms[pom.topic]) == 0 {
+		delete(om.poms, pom.topic)
+	}
+}
+
+// Partition Offset Manager
+
+// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
+// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
+// out of scope.
+type PartitionOffsetManager interface {
+	// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
+	// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
+	// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
+	// offset was committed for this partition yet.
+	NextOffset() (int64, string)
+
+	// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
+	// the state of the partition consumer at that point in time. The metadata string can be used by
+	// another consumer to restore that state, so it can resume consumption.
+	//
+	// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
+	// for efficiency reasons, and it may never be committed if your application crashes. This means that
+	// you may end up processing the same message twice, and your processing should ideally be idempotent.
+	MarkOffset(offset int64, metadata string)
+
+	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
+	// errors are logged and not returned over this channel. If you want to implement any custom error
+	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
+
+	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
+	// after which you should wait until the 'errors' channel has been drained and closed.
+	// It is required to call this function (or Close) before a PartitionOffsetManager object passes out
+	// of scope, as it will otherwise leak memory. You must call this before calling Close on the
+	// underlying client.
+	AsyncClose()
+
+	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
+	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
+	// leak memory. You must call this before calling Close on the underlying client.
+	Close() error
+}
+
+type partitionOffsetManager struct {
+	parent    *offsetManager
+	topic     string
+	partition int32
+
+	lock     sync.Mutex
+	offset   int64
+	metadata string
+	dirty    bool
+	clean    chan none
+	broker   *brokerOffsetManager
+
+	errors    chan *ConsumerError
+	rebalance chan none
+	dying     chan none
+}
+
+func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
+	pom := &partitionOffsetManager{
+		parent:    om,
+		topic:     topic,
+		partition: partition,
+		clean:     make(chan none),
+		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
+		rebalance: make(chan none, 1),
+		dying:     make(chan none),
+	}
+
+	if err := pom.selectBroker(); err != nil {
+		return nil, err
+	}
+
+	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
+		return nil, err
+	}
+
+	pom.broker.updateSubscriptions <- pom
+
+	go withRecover(pom.mainLoop)
+
+	return pom, nil
+}
+
+func (pom *partitionOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-pom.rebalance:
+			if err := pom.selectBroker(); err != nil {
+				pom.handleError(err)
+				pom.rebalance <- none{}
+			} else {
+				pom.broker.updateSubscriptions <- pom
+			}
+		case <-pom.dying:
+			if pom.broker != nil {
+				select {
+				case <-pom.rebalance:
+				case pom.broker.updateSubscriptions <- pom:
+				}
+				pom.parent.unrefBrokerOffsetManager(pom.broker)
+			}
+			pom.parent.abandonPartitionOffsetManager(pom)
+			close(pom.errors)
+			return
+		}
+	}
+}
+
+func (pom *partitionOffsetManager) selectBroker() error {
+	if pom.broker != nil {
+		pom.parent.unrefBrokerOffsetManager(pom.broker)
+		pom.broker = nil
+	}
+
+	var broker *Broker
+	var err error
+
+	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
+		return err
+	}
+
+	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
+		return err
+	}
+
+	pom.broker = pom.parent.refBrokerOffsetManager(broker)
+	return nil
+}
+
+func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
+	request := new(OffsetFetchRequest)
+	request.Version = 1
+	request.ConsumerGroup = pom.parent.group
+	request.AddPartition(pom.topic, pom.partition)
+
+	response, err := pom.broker.broker.FetchOffset(request)
+	if err != nil {
+		return err
+	}
+
+	block := response.GetBlock(pom.topic, pom.partition)
+	if block == nil {
+		return ErrIncompleteResponse
+	}
+
+	switch block.Err {
+	case ErrNoError:
+		pom.offset = block.Offset
+		pom.metadata = block.Metadata
+		return nil
+	case ErrNotCoordinatorForConsumer:
+		if retries <= 0 {
+			return block.Err
+		}
+		if err := pom.selectBroker(); err != nil {
+			return err
+		}
+		return pom.fetchInitialOffset(retries - 1)
+	case ErrOffsetsLoadInProgress:
+		if retries <= 0 {
+			return block.Err
+		}
+		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
+		return pom.fetchInitialOffset(retries - 1)
+	default:
+		return block.Err
+	}
+}
+
+func (pom *partitionOffsetManager) handleError(err error) {
+	cErr := &ConsumerError{
+		Topic:     pom.topic,
+		Partition: pom.partition,
+		Err:       err,
+	}
+
+	if pom.parent.conf.Consumer.Return.Errors {
+		pom.errors <- cErr
+	} else {
+		Logger.Println(cErr)
+	}
+}
+
+func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
+	return pom.errors
+}
+
+func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	pom.offset = offset
+	pom.metadata = metadata
+	pom.dirty = true
+}
+
+func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	if pom.offset == offset && pom.metadata == metadata {
+		pom.dirty = false
+
+		select {
+		case pom.clean <- none{}:
+		default:
+		}
+	}
+}
+
+func (pom *partitionOffsetManager) NextOffset() (int64, string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	if pom.offset >= 0 {
+		return pom.offset + 1, pom.metadata
+	} else {
+		return pom.parent.conf.Consumer.Offsets.Initial, ""
+	}
+}
+
+func (pom *partitionOffsetManager) AsyncClose() {
+	go func() {
+		pom.lock.Lock()
+		dirty := pom.dirty
+		pom.lock.Unlock()
+
+		if dirty {
+			<-pom.clean
+		}
+
+		close(pom.dying)
+	}()
+}
+
+func (pom *partitionOffsetManager) Close() error {
+	pom.AsyncClose()
+
+	var errors ConsumerErrors
+	for err := range pom.errors {
+		errors = append(errors, err)
+	}
+
+	if len(errors) > 0 {
+		return errors
+	}
+	return nil
+}
+
+// Broker Offset Manager
+
+type brokerOffsetManager struct {
+	parent              *offsetManager
+	broker              *Broker
+	timer               *time.Ticker
+	updateSubscriptions chan *partitionOffsetManager
+	subscriptions       map[*partitionOffsetManager]none
+	refs                int
+}
+
+func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	bom := &brokerOffsetManager{
+		parent:              om,
+		broker:              broker,
+		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
+		updateSubscriptions: make(chan *partitionOffsetManager),
+		subscriptions:       make(map[*partitionOffsetManager]none),
+	}
+
+	go withRecover(bom.mainLoop)
+
+	return bom
+}
+
+func (bom *brokerOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-bom.timer.C:
+			if len(bom.subscriptions) > 0 {
+				bom.flushToBroker()
+			}
+		case s, ok := <-bom.updateSubscriptions:
+			if !ok {
+				bom.timer.Stop()
+				return
+			}
+			if _, ok := bom.subscriptions[s]; ok {
+				delete(bom.subscriptions, s)
+			} else {
+				bom.subscriptions[s] = none{}
+			}
+		}
+	}
+}
+
+func (bom *brokerOffsetManager) flushToBroker() {
+	request := bom.constructRequest()
+	if request == nil {
+		return
+	}
+
+	response, err := bom.broker.CommitOffset(request)
+
+	if err != nil {
+		bom.abort(err)
+		return
+	}
+
+	for s := range bom.subscriptions {
+		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
+			continue
+		}
+
+		var err KError
+		var ok bool
+
+		if response.Errors[s.topic] == nil {
+			s.handleError(ErrIncompleteResponse)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+			continue
+		}
+		if err, ok = response.Errors[s.topic][s.partition]; !ok {
+			s.handleError(ErrIncompleteResponse)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+			continue
+		}
+
+		switch err {
+		case ErrNoError:
+			block := request.blocks[s.topic][s.partition]
+			s.updateCommitted(block.offset, block.metadata)
+			break
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		default:
+			s.handleError(err)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		}
+	}
+}
+
+func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
+	r := &OffsetCommitRequest{
+		Version:       1,
+		ConsumerGroup: bom.parent.group,
+	}
+
+	for s := range bom.subscriptions {
+		s.lock.Lock()
+		if s.dirty {
+			r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
+		}
+		s.lock.Unlock()
+	}
+
+	if len(r.blocks) > 0 {
+		return r
+	}
+
+	return nil
+}
+
+func (bom *brokerOffsetManager) abort(err error) {
+	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
+	bom.parent.abandonBroker(bom)
+
+	for pom := range bom.subscriptions {
+		pom.handleError(err)
+		pom.rebalance <- none{}
+	}
+
+	for s := range bom.updateSubscriptions {
+		if _, ok := bom.subscriptions[s]; !ok {
+			s.handleError(err)
+			s.rebalance <- none{}
+		}
+	}
+
+	bom.subscriptions = make(map[*partitionOffsetManager]none)
+}
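
Putting the new API together: an OffsetManager is created on top of an existing Client, ManagePartition hands out one PartitionOffsetManager per topic/partition, NextOffset gives the position to resume from, and MarkOffset records progress that the broker offset manager commits in the background. A rough usage sketch with placeholder names (the broker address, group, and topic are assumptions, and the consume loop is elided):

	package main

	import (
		"log"

		"github.com/Shopify/sarama"
	)

	func main() {
		client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
		if err != nil {
			log.Fatal(err)
		}
		defer client.Close() // closed last, after the offset managers

		om, err := sarama.NewOffsetManagerFromClient("example-group", client)
		if err != nil {
			log.Fatal(err)
		}
		defer om.Close()

		pom, err := om.ManagePartition("example-topic", 0)
		if err != nil {
			log.Fatal(err)
		}
		defer pom.Close() // blocks until the last marked offset has been committed

		// Either the last committed offset + 1, or Consumer.Offsets.Initial if
		// nothing has been committed for this partition yet.
		offset, metadata := pom.NextOffset()
		log.Printf("resuming at offset %d (metadata %q)", offset, metadata)

		// ... consume from `offset` with a PartitionConsumer, calling
		// pom.MarkOffset(msg.Offset, "") after each message is processed; marks
		// are flushed to Kafka every Consumer.Offsets.CommitInterval ...
	}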

+ 331 - 0
offset_manager_test.go

@@ -0,0 +1,331 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+func initOffsetManager(t *testing.T) (om OffsetManager,
+	testClient Client, broker, coordinator *mockBroker) {
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 1
+	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
+
+	broker = newMockBroker(t, 1)
+	coordinator = newMockBroker(t, 2)
+
+	seedMeta := new(MetadataResponse)
+	seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
+	seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
+	seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
+	broker.Returns(seedMeta)
+
+	var err error
+	testClient, err = NewClient([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   coordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: coordinator.Port(),
+	})
+
+	om, err = NewOffsetManagerFromClient("group", testClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return om, testClient, broker, coordinator
+}
+
+func initPartitionOffsetManager(t *testing.T, om OffsetManager,
+	coordinator *mockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
+
+	fetchResponse := new(OffsetFetchResponse)
+	fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
+		Err:      ErrNoError,
+		Offset:   initialOffset,
+		Metadata: metadata,
+	})
+	coordinator.Returns(fetchResponse)
+
+	pom, err := om.ManagePartition("my_topic", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return pom
+}
+
+func TestNewOffsetManager(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	seedBroker.Returns(new(MetadataResponse))
+
+	testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = NewOffsetManagerFromClient("group", testClient)
+	if err != nil {
+		t.Error(err)
+	}
+
+	safeClose(t, testClient)
+
+	_, err = NewOffsetManagerFromClient("group", testClient)
+	if err != ErrClosedClient {
+		t.Errorf("Error expected for closed client; actual value: %v", err)
+	}
+
+	seedBroker.Close()
+}
+
+// Test recovery from ErrNotCoordinatorForConsumer
+// on first fetchInitialOffset call
+func TestOffsetManagerFetchInitialFail(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+
+	// Error on first fetchInitialOffset call
+	responseBlock := OffsetFetchResponseBlock{
+		Err:      ErrNotCoordinatorForConsumer,
+		Offset:   5,
+		Metadata: "test_meta",
+	}
+
+	fetchResponse := new(OffsetFetchResponse)
+	fetchResponse.AddBlock("my_topic", 0, &responseBlock)
+	coordinator.Returns(fetchResponse)
+
+	// Refresh coordinator
+	newCoordinator := newMockBroker(t, 3)
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	// Second fetchInitialOffset call is fine
+	fetchResponse2 := new(OffsetFetchResponse)
+	responseBlock2 := responseBlock
+	responseBlock2.Err = ErrNoError
+	fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
+	newCoordinator.Returns(fetchResponse2)
+
+	pom, err := om.ManagePartition("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	}
+
+	broker.Close()
+	coordinator.Close()
+	newCoordinator.Close()
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+}
+
+// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
+func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+
+	// Error on first fetchInitialOffset call
+	responseBlock := OffsetFetchResponseBlock{
+		Err:      ErrOffsetsLoadInProgress,
+		Offset:   5,
+		Metadata: "test_meta",
+	}
+
+	fetchResponse := new(OffsetFetchResponse)
+	fetchResponse.AddBlock("my_topic", 0, &responseBlock)
+	coordinator.Returns(fetchResponse)
+
+	// Second fetchInitialOffset call is fine
+	fetchResponse2 := new(OffsetFetchResponse)
+	responseBlock2 := responseBlock
+	responseBlock2.Err = ErrNoError
+	fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
+	coordinator.Returns(fetchResponse2)
+
+	pom, err := om.ManagePartition("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	}
+
+	broker.Close()
+	coordinator.Close()
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+}
+
+func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	testClient.Config().Consumer.Offsets.Initial = OffsetOldest
+
+	// Kafka returns -1 if no offset has been stored for this partition yet.
+	pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
+
+	offset, meta := pom.NextOffset()
+	if offset != OffsetOldest {
+		t.Errorf("Expected offset 5. Actual: %v", offset)
+	}
+	if meta != "" {
+		t.Errorf("Expected metadata to be empty. Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	broker.Close()
+	coordinator.Close()
+	safeClose(t, testClient)
+}
+
+func TestPartitionOffsetManagerNextOffset(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
+
+	offset, meta := pom.NextOffset()
+	if offset != 6 {
+		t.Errorf("Expected offset 5. Actual: %v", offset)
+	}
+	if meta != "test_meta" {
+		t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	broker.Close()
+	coordinator.Close()
+	safeClose(t, testClient)
+}
+
+func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	coordinator.Returns(ocResponse)
+
+	pom.MarkOffset(100, "modified_meta")
+	offset, meta := pom.NextOffset()
+
+	if offset != 101 {
+		t.Errorf("Expected offset 100. Actual: %v", offset)
+	}
+	if meta != "modified_meta" {
+		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+	broker.Close()
+	coordinator.Close()
+}
+
+func TestPartitionOffsetManagerCommitErr(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
+
+	// Error on one partition
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
+	ocResponse.AddError("my_topic", 1, ErrNoError)
+	coordinator.Returns(ocResponse)
+
+	newCoordinator := newMockBroker(t, 3)
+
+	// For RefreshCoordinator()
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	// Nothing in response.Errors at all
+	ocResponse2 := new(OffsetCommitResponse)
+	newCoordinator.Returns(ocResponse2)
+
+	// For RefreshCoordinator()
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	// Error on the wrong partition for this pom
+	ocResponse3 := new(OffsetCommitResponse)
+	ocResponse3.AddError("my_topic", 1, ErrNoError)
+	newCoordinator.Returns(ocResponse3)
+
+	// For RefreshCoordinator()
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
+	ocResponse4 := new(OffsetCommitResponse)
+	ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
+	newCoordinator.Returns(ocResponse4)
+
+	// For RefreshCoordinator()
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	// Normal error response
+	ocResponse5 := new(OffsetCommitResponse)
+	ocResponse5.AddError("my_topic", 0, ErrNoError)
+	newCoordinator.Returns(ocResponse5)
+
+	pom.MarkOffset(100, "modified_meta")
+
+	err := pom.Close()
+	if err != nil {
+		t.Error(err)
+	}
+
+	broker.Close()
+	coordinator.Close()
+	newCoordinator.Close()
+	safeClose(t, om)
+	safeClose(t, testClient)
+}
+
+// Test of recovery from abort
+func TestAbortPartitionOffsetManager(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
+
+	// this triggers an error in the CommitOffset request,
+	// which leads to the abort call
+	coordinator.Close()
+
+	// Response to refresh coordinator request
+	newCoordinator := newMockBroker(t, 3)
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   newCoordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: newCoordinator.Port(),
+	})
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	newCoordinator.Returns(ocResponse)
+
+	pom.MarkOffset(100, "modified_meta")
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	broker.Close()
+	safeClose(t, testClient)
+}