package sarama

import (
	"sync/atomic"
	"testing"
	"time"
)

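// initOffsetManagerWithBackoffFunc wires an OffsetManager up against a mock
// seed broker and a mock coordinator, optionally overriding the metadata
// retry backoff function and the committed-offset retention period.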
func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
	backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
	testClient Client, broker, coordinator *MockBroker) {
	config.Metadata.Retry.Max = 1
	if backoffFunc != nil {
		config.Metadata.Retry.BackoffFunc = backoffFunc
	}
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
	config.Version = V0_9_0_0
	if retention > 0 {
		config.Consumer.Offsets.Retention = retention
	}

	broker = NewMockBroker(t, 1)
	coordinator = NewMockBroker(t, 2)

	seedMeta := new(MetadataResponse)
	seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
	seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
	seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
	broker.Returns(seedMeta)

	var err error
	testClient, err = NewClient([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   coordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: coordinator.Port(),
	})

	om, err = NewOffsetManagerFromClient("group", testClient)
	if err != nil {
		t.Fatal(err)
	}

	return om, testClient, broker, coordinator
}

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
	testClient Client, broker, coordinator *MockBroker) {
	return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
}

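// initPartitionOffsetManager primes the coordinator with an OffsetFetch
// response carrying the given initial offset and metadata, then returns a
// PartitionOffsetManager for partition 0 of my_topic.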
func initPartitionOffsetManager(t *testing.T, om OffsetManager,
	coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
	fetchResponse := new(OffsetFetchResponse)
	fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
		Err:      ErrNoError,
		Offset:   initialOffset,
		Metadata: metadata,
	})
	coordinator.Returns(fetchResponse)

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		t.Fatal(err)
	}

	return pom
}

func TestNewOffsetManager(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	seedBroker.Returns(new(MetadataResponse))
	defer seedBroker.Close()

	testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	om, err := NewOffsetManagerFromClient("group", testClient)
	if err != nil {
		t.Error(err)
	}
	safeClose(t, om)
	safeClose(t, testClient)

	_, err = NewOffsetManagerFromClient("group", testClient)
	if err != ErrClosedClient {
		t.Errorf("Error expected for closed client; actual value: %v", err)
	}
}

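// offsetsautocommitTestTable drives TestNewOffsetManagerOffsetsAutoCommit below.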
var offsetsautocommitTestTable = []struct {
	name   string
	set    bool // if true, override the default Consumer.Offsets.AutoCommit.Enable with the value below
	enable bool
}{
	{
		"AutoCommit (default)",
		false, // use default
		true,
	},
	{
		"AutoCommit Enabled",
		true,
		true,
	},
	{
		"AutoCommit Disabled",
		true,
		false,
	},
}

func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
	// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
	for _, tt := range offsetsautocommitTestTable {
		t.Run(tt.name, func(t *testing.T) {
			config := NewConfig()
			if tt.set {
				config.Consumer.Offsets.AutoCommit.Enable = tt.enable
			}
			om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
			pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

			// Wait long enough for several auto-commit ticks so the test doesn't flake.
			timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

			called := make(chan none)

			ocResponse := new(OffsetCommitResponse)
			ocResponse.AddError("my_topic", 0, ErrNoError)
			handler := func(req *request) (res encoderWithHeader) {
				close(called)
				return ocResponse
			}
			coordinator.setHandler(handler)

			// Should force an offset commit, if auto-commit is enabled.
			expected := int64(1)
			pom.ResetOffset(expected, "modified_meta")
			_, _ = pom.NextOffset()

			select {
			case <-called:
				// OffsetManager called on the wire.
				if !config.Consumer.Offsets.AutoCommit.Enable {
					t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
				}
			case <-time.After(timeout):
				// Timeout waiting for OffsetManager to call on the wire.
				if config.Consumer.Offsets.AutoCommit.Enable {
					t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
				}
			}

			broker.Close()
			coordinator.Close()

			// !! om must be closed before the pom so pom.release() is called before pom.Close()
			safeClose(t, om)
			safeClose(t, pom)
			safeClose(t, testClient)
		})
	}
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)

	// Error on first fetchInitialOffset call
	responseBlock := OffsetFetchResponseBlock{
		Err:      ErrNotCoordinatorForConsumer,
		Offset:   5,
		Metadata: "test_meta",
	}

	fetchResponse := new(OffsetFetchResponse)
	fetchResponse.AddBlock("my_topic", 0, &responseBlock)
	coordinator.Returns(fetchResponse)

	// Refresh coordinator
	newCoordinator := NewMockBroker(t, 3)
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Second fetchInitialOffset call is fine
	fetchResponse2 := new(OffsetFetchResponse)
	responseBlock2 := responseBlock
	responseBlock2.Err = ErrNoError
	fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
	newCoordinator.Returns(fetchResponse2)

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		t.Error(err)
	}

	broker.Close()
	coordinator.Close()
	newCoordinator.Close()
	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
}

// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
	retryCount := int32(0)
	backoff := func(retries, maxRetries int) time.Duration {
		atomic.AddInt32(&retryCount, 1)
		return 0
	}
	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())

	// Error on first fetchInitialOffset call
	responseBlock := OffsetFetchResponseBlock{
		Err:      ErrOffsetsLoadInProgress,
		Offset:   5,
		Metadata: "test_meta",
	}

	fetchResponse := new(OffsetFetchResponse)
	fetchResponse.AddBlock("my_topic", 0, &responseBlock)
	coordinator.Returns(fetchResponse)

	// Second fetchInitialOffset call is fine
	fetchResponse2 := new(OffsetFetchResponse)
	responseBlock2 := responseBlock
	responseBlock2.Err = ErrNoError
	fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
	coordinator.Returns(fetchResponse2)

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		t.Error(err)
	}

	broker.Close()
	coordinator.Close()
	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)

	if atomic.LoadInt32(&retryCount) == 0 {
		t.Fatal("Expected at least one retry")
	}
}

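// TestPartitionOffsetManagerInitialOffset verifies that NextOffset falls back
// to Consumer.Offsets.Initial when the broker reports no committed offset (-1).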
func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	testClient.Config().Consumer.Offsets.Initial = OffsetOldest

	// Kafka returns -1 if no offset has been stored for this partition yet.
	pom := initPartitionOffsetManager(t, om, coordinator, -1, "")

	offset, meta := pom.NextOffset()
	if offset != OffsetOldest {
		t.Errorf("Expected offset %v. Actual: %v", OffsetOldest, offset)
	}
	if meta != "" {
		t.Errorf("Expected metadata to be empty. Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	broker.Close()
	coordinator.Close()
	safeClose(t, testClient)
}

func TestPartitionOffsetManagerNextOffset(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

	offset, meta := pom.NextOffset()
	if offset != 5 {
		t.Errorf("Expected offset 5. Actual: %v", offset)
	}
	if meta != "test_meta" {
		t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	broker.Close()
	coordinator.Close()
	safeClose(t, testClient)
}

func TestPartitionOffsetManagerResetOffset(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	coordinator.Returns(ocResponse)

	expected := int64(1)
	pom.ResetOffset(expected, "modified_meta")
	actual, meta := pom.NextOffset()

	if actual != expected {
		t.Errorf("Expected offset %v. Actual: %v", expected, actual)
	}
	if meta != "modified_meta" {
		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
	broker.Close()
	coordinator.Close()
}

func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	handler := func(req *request) (res encoderWithHeader) {
		if req.body.version() != 2 {
			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
		}
		offsetCommitRequest := req.body.(*OffsetCommitRequest)
		// RetentionTime is expressed in milliseconds, so one hour is 3600000.
		if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
			t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
		}
		return ocResponse
	}
	coordinator.setHandler(handler)

	expected := int64(1)
	pom.ResetOffset(expected, "modified_meta")
	actual, meta := pom.NextOffset()

	if actual != expected {
		t.Errorf("Expected offset %v. Actual: %v", expected, actual)
	}
	if meta != "modified_meta" {
		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
	broker.Close()
	coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	coordinator.Returns(ocResponse)

	pom.MarkOffset(100, "modified_meta")
	offset, meta := pom.NextOffset()

	if offset != 100 {
		t.Errorf("Expected offset 100. Actual: %v", offset)
	}
	if meta != "modified_meta" {
		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
	broker.Close()
	coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	handler := func(req *request) (res encoderWithHeader) {
		if req.body.version() != 2 {
			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
		}
		offsetCommitRequest := req.body.(*OffsetCommitRequest)
		// RetentionTime is expressed in milliseconds, so one hour is 3600000.
		if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
			t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
		}
		return ocResponse
	}
	coordinator.setHandler(handler)

	pom.MarkOffset(100, "modified_meta")
	offset, meta := pom.NextOffset()

	if offset != 100 {
		t.Errorf("Expected offset 100. Actual: %v", offset)
	}
	if meta != "modified_meta" {
		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
	}

	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
	broker.Close()
	coordinator.Close()
}

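// TestPartitionOffsetManagerCommitErr drives a commit through a sequence of
// error responses (offset out of range, an empty response, a response for the
// wrong partition, unknown topic/partition) and coordinator refreshes before
// the final successful commit.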
func TestPartitionOffsetManagerCommitErr(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

	// Error on one partition
	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	ocResponse.AddError("my_topic", 1, ErrNoError)
	coordinator.Returns(ocResponse)

	newCoordinator := NewMockBroker(t, 3)

	// For RefreshCoordinator()
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Nothing in response.Errors at all
	ocResponse2 := new(OffsetCommitResponse)
	newCoordinator.Returns(ocResponse2)

	// No error, no need to refresh coordinator

	// Error on the wrong partition for this pom
	ocResponse3 := new(OffsetCommitResponse)
	ocResponse3.AddError("my_topic", 1, ErrNoError)
	newCoordinator.Returns(ocResponse3)

	// No error, no need to refresh coordinator

	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
	ocResponse4 := new(OffsetCommitResponse)
	ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
	newCoordinator.Returns(ocResponse4)

	// For RefreshCoordinator()
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Normal error response
	ocResponse5 := new(OffsetCommitResponse)
	ocResponse5.AddError("my_topic", 0, ErrNoError)
	newCoordinator.Returns(ocResponse5)

	pom.MarkOffset(100, "modified_meta")
	err := pom.Close()
	if err != nil {
		t.Error(err)
	}

	broker.Close()
	coordinator.Close()
	newCoordinator.Close()
	safeClose(t, om)
	safeClose(t, testClient)
}

// Test of recovery from abort
func TestAbortPartitionOffsetManager(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

	// this triggers an error in the CommitOffset request,
	// which leads to the abort call
	coordinator.Close()

	// Response to refresh coordinator request
	newCoordinator := NewMockBroker(t, 3)
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	newCoordinator.Returns(ocResponse)

	pom.MarkOffset(100, "modified_meta")

	safeClose(t, pom)
	safeClose(t, om)
	broker.Close()
	safeClose(t, testClient)
}