Browse Source

Better documentation and examples

Dimitrij Denissenko 7 years ago
parent
commit
ed8fd54181
2 changed files with 46 additions and 30 deletions
  1. 31 16
      consumer_group.go
  2. 15 14
      consumer_group_test.go

+ 31 - 16
consumer_group.go

@@ -16,16 +16,28 @@ var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group th
 // over a collection of processes (the members of the consumer group).
 type ConsumerGroup interface {
 	// Consume joins a cluster of consumers for a given list of topics and
-	// starts a blocking consumer session through the ConsumerGroupHandler.
+	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
 	//
-	// The session will exit and all its handlers stopped either when:
-	// 1. the context is cancelled by the user
-	// 2. first handler exits
-	// 3. a rebalance cycle is initated server-side
+	// The life-cycle of a session is represented by the following steps:
 	//
-	// Please note that the handler will be applied to each of the claimed partitions
-	// in separate goroutines and must therefore be thread-safe. You can only run a single
-	// session at a time and must close the previous session before initiating a new one.
+	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
+	//    and are assigned their "fair share" of partitions, aka 'claims'.
+	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
+	//    of the claims and allow any necessary preparation or alteration of state.
+	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
+	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
+	//    from concurrent reads/writes.
+	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
+	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
+	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
+	//    to allow the user to perform any final tasks before a rebalance.
+	// 6. Finally, marked offsets are committed one last time before claims are released.
+	//
+	// Please note that once a rebalance is triggered, sessions must be completed within
+	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
+	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
+	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
+	// commit failures.
 	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
 
 	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
@@ -657,20 +669,23 @@ func (s *consumerGroupSession) heartbeatLoop() {
 
 // --------------------------------------------------------------------
 
-// ConsumerGroupHandler instances are able to handle ConsumerGroupClaim instances.
-// PLEASE NOTE that handlers are likely be called from several goroutines concurrently.
-// Ensure that all state is safely protected against race conditions.
+// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
+// They also provide hooks for your consumer group session life-cycle and allow you to
+// trigger logic before or after the consume loop(s).
+//
+// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
+// ensure that all state is safely protected against race conditions.
 type ConsumerGroupHandler interface {
-	// Setup is run at the beginning of a new session, before Consume.
+	// Setup is run at the beginning of a new session, before ConsumeClaim.
 	Setup(ConsumerGroupSession) error
 
-	// Cleanup is run at the end of a session, before claims are released.
+	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+	// but before the offsets are committed for the very last time.
 	Cleanup(ConsumerGroupSession) error
 
 	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
-	// Once the Messages() channel is closed, the Handler must finish its
-	// processing loop and exit within Config.Consumer.Group.Rebalance.Timeout
-	// as the topic/partition may be re-assigned to another group member.
+	// Once the Messages() channel is closed, the Handler must finish its processing
+	// loop and exit.
 	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
 }
 

+ 15 - 14
consumer_group_test.go

@@ -5,16 +5,20 @@ import (
 	"fmt"
 )
 
-type exampleConsumerGroupHandler func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error
-
-func (h exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
-func (h exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
-func (h exampleConsumerGroupHandler) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error {
-	return h(s, c)
+type exampleConsumerGroupHandler struct{}
+
+func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
+func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
+func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
+		sess.MarkMessage(msg, "")
+	}
+	return nil
 }
 
 func ExampleConsumerGroup() {
-	// Init config, specify version
+	// Init config, specify appropriate version
 	config := NewConfig()
 	config.Version = V1_0_0_0
 	config.Consumer.Return.Errors = true
@@ -43,13 +47,10 @@ func ExampleConsumerGroup() {
 	// Iterate over consumer sessions.
 	ctx := context.Background()
 	for {
-		err := group.Consume(ctx, []string{"my-topic"}, exampleConsumerGroupHandler(func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
-			for msg := range claim.Messages() {
-				fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
-				sess.MarkMessage(msg, "")
-			}
-			return nil
-		}))
+		topics := []string{"my-topic"}
+		handler := exampleConsumerGroupHandler{}
+
+		err := group.Consume(ctx, topics, handler)
 		if err != nil {
 			panic(err)
 		}