Bläddra i källkod

Add support for manual commit to ConsumerGroup

- expose a `Commit()` sync method on ConsumerGroupSession
- don't create mainLoop in OffsetManager unless AutoCommit is enabled
Wim Claeys 3 år sedan
förälder
incheckning
cb9d1e8b85
3 ändrade filer med 84 tillägg och 10 borttagningar
  1. 9 0
      consumer_group.go
  2. 17 10
      offset_manager.go
  3. 58 0
      offset_manager_test.go

+ 9 - 0
consumer_group.go

@@ -513,6 +513,11 @@ type ConsumerGroupSession interface {
 	// message twice, and your processing should ideally be idempotent.
 	MarkOffset(topic string, partition int32, offset int64, metadata string)
 
+	// Commit the offset to the backend
+	//
+	// Note: calling Commit performs a blocking synchronous operation.
+	Commit()
+
 	// ResetOffset resets to the provided offset, alongside a metadata string that
 	// represents the state of the partition consumer at that point in time. Reset
 	// acts as a counterpart to MarkOffset, the difference being that it allows to
@@ -624,6 +629,10 @@ func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset
 	}
 }
 
+func (s *consumerGroupSession) Commit() {
+	s.offsets.Commit()
+}
+
 func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
 	if pom := s.offsets.findPOM(topic, partition); pom != nil {
 		pom.ResetOffset(offset, metadata)

+ 17 - 10
offset_manager.go

@@ -19,6 +19,10 @@ type OffsetManager interface {
 	// will otherwise leak memory. You must call this after all the
 	// PartitionOffsetManagers are closed.
 	Close() error
+
+	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
+	// set to false.
+	Commit()
 }
 
 type offsetManager struct {
@@ -58,7 +62,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		client: client,
 		conf:   conf,
 		group:  group,
-		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
 
 		memberID:   memberID,
@@ -67,7 +70,10 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		closing: make(chan none),
 		closed:  make(chan none),
 	}
-	go withRecover(om.mainLoop)
+	if conf.Consumer.Offsets.AutoCommit.Enable {
+		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
+		go withRecover(om.mainLoop)
+	}
 
 	return om, nil
 }
@@ -99,7 +105,9 @@ func (om *offsetManager) Close() error {
 	om.closeOnce.Do(func() {
 		// exit the mainLoop
 		close(om.closing)
-		<-om.closed
+		if om.conf.Consumer.Offsets.AutoCommit.Enable {
+			<-om.closed
+		}
 
 		// mark all POMs as closed
 		om.asyncClosePOMs()
@@ -225,20 +233,19 @@ func (om *offsetManager) mainLoop() {
 	for {
 		select {
 		case <-om.ticker.C:
-			om.flushToBroker()
-			om.releasePOMs(false)
+			om.Commit()
 		case <-om.closing:
 			return
 		}
 	}
 }
 
-// flushToBroker is ignored if auto-commit offsets is disabled
-func (om *offsetManager) flushToBroker() {
-	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
-		return
-	}
+func (om *offsetManager) Commit() {
+	om.flushToBroker()
+	om.releasePOMs(false)
+}
 
+func (om *offsetManager) flushToBroker() {
 	req := om.constructRequest()
 	if req == nil {
 		return

+ 58 - 0
offset_manager_test.go

@@ -169,6 +169,64 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
 	}
 }
 
+func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
+	// Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false
+	config := NewConfig()
+	config.Consumer.Offsets.AutoCommit.Enable = false
+
+	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	// Wait long enough for the test not to fail..
+	timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	called := make(chan none)
+	handler := func(req *request) (res encoderWithHeader) {
+		close(called)
+		return ocResponse
+	}
+	coordinator.setHandler(handler)
+
+	// Should not trigger an auto-commit
+	expected := int64(1)
+	pom.ResetOffset(expected, "modified_meta")
+	_, _ = pom.NextOffset()
+
+	select {
+	case <-called:
+		// OffsetManager called on the wire.
+		t.Errorf("Received request when AutoCommit is disabled")
+	case <-time.After(timeout):
+		// Timeout waiting for OffsetManager to call on the wire.
+		// OK
+	}
+
+	// Setup again to test manual commit
+	called = make(chan none)
+
+	om.Commit()
+
+	select {
+	case <-called:
+		// OffsetManager called on the wire.
+		// OK
+	case <-time.After(timeout):
+		// Timeout waiting for OffsetManager to call on the wire.
+		t.Errorf("No request received for after waiting for %v", timeout)
+	}
+
+	// Close up
+	broker.Close()
+	coordinator.Close()
+
+	// !! om must be closed before the pom so pom.release() is called before pom.Close()
+	safeClose(t, om)
+	safeClose(t, pom)
+	safeClose(t, testClient)
+}
+
 // Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {