123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package sarama
- import (
- "testing"
- "time"
- )
- func initOffsetManager(t *testing.T) (om OffsetManager,
- testClient Client, broker, coordinator *mockBroker) {
- config := NewConfig()
- config.Metadata.Retry.Max = 1
- config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
- broker = newMockBroker(t, 1)
- coordinator = newMockBroker(t, 2)
- seedMeta := new(MetadataResponse)
- seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
- seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
- seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
- broker.Returns(seedMeta)
- var err error
- testClient, err = NewClient([]string{broker.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: coordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: coordinator.Port(),
- })
- om, err = NewOffsetManagerFromClient("group", testClient)
- if err != nil {
- t.Fatal(err)
- }
- return om, testClient, broker, coordinator
- }
- func initPartitionOffsetManager(t *testing.T, om OffsetManager,
- coordinator *mockBroker) PartitionOffsetManager {
- fetchResponse := new(OffsetFetchResponse)
- fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
- Err: ErrNoError,
- Offset: 5,
- Metadata: "test_meta",
- })
- coordinator.Returns(fetchResponse)
- pom, err := om.ManagePartition("my_topic", 0)
- if err != nil {
- t.Fatal(err)
- }
- return pom
- }
- func TestNewOffsetManager(t *testing.T) {
- seedBroker := newMockBroker(t, 1)
- seedBroker.Returns(new(MetadataResponse))
- testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- _, err = NewOffsetManagerFromClient("group", testClient)
- if err != nil {
- t.Error(err)
- }
- safeClose(t, testClient)
- _, err = NewOffsetManagerFromClient("group", testClient)
- if err != ErrClosedClient {
- t.Errorf("Error expected for closed client; actual value: %v", err)
- }
- seedBroker.Close()
- }
- func TestOffsetManagerFetchInitialFail(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
-
- responseBlock := OffsetFetchResponseBlock{
- Err: ErrNotCoordinatorForConsumer,
- Offset: 5,
- Metadata: "test_meta",
- }
- fetchResponse := new(OffsetFetchResponse)
- fetchResponse.AddBlock("my_topic", 0, &responseBlock)
- coordinator.Returns(fetchResponse)
-
- newCoordinator := newMockBroker(t, 3)
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
-
- fetchResponse2 := new(OffsetFetchResponse)
- responseBlock2 := responseBlock
- responseBlock2.Err = ErrNoError
- fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
- newCoordinator.Returns(fetchResponse2)
- pom, err := om.ManagePartition("my_topic", 0)
- if err != nil {
- t.Error(err)
- }
- broker.Close()
- coordinator.Close()
- newCoordinator.Close()
- safeClose(t, pom)
- safeClose(t, om)
- safeClose(t, testClient)
- }
- func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
-
- responseBlock := OffsetFetchResponseBlock{
- Err: ErrOffsetsLoadInProgress,
- Offset: 5,
- Metadata: "test_meta",
- }
- fetchResponse := new(OffsetFetchResponse)
- fetchResponse.AddBlock("my_topic", 0, &responseBlock)
- coordinator.Returns(fetchResponse)
-
- fetchResponse2 := new(OffsetFetchResponse)
- responseBlock2 := responseBlock
- responseBlock2.Err = ErrNoError
- fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
- coordinator.Returns(fetchResponse2)
- pom, err := om.ManagePartition("my_topic", 0)
- if err != nil {
- t.Error(err)
- }
- broker.Close()
- coordinator.Close()
- safeClose(t, pom)
- safeClose(t, om)
- safeClose(t, testClient)
- }
- func TestPartitionOffsetManagerOffset(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
- pom := initPartitionOffsetManager(t, om, coordinator)
- offset, meta := pom.Offset()
- if offset != 5 {
- t.Errorf("Expected offset 5. Actual: %v", offset)
- }
- if meta != "test_meta" {
- t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
- }
- safeClose(t, pom)
- safeClose(t, om)
- broker.Close()
- coordinator.Close()
- safeClose(t, testClient)
- }
- func TestPartitionOffsetManagerSetOffset(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
- pom := initPartitionOffsetManager(t, om, coordinator)
- ocResponse := new(OffsetCommitResponse)
- ocResponse.AddError("my_topic", 0, ErrNoError)
- coordinator.Returns(ocResponse)
- pom.SetOffset(100, "modified_meta")
- offset, meta := pom.Offset()
- if offset != 100 {
- t.Errorf("Expected offset 100. Actual: %v", offset)
- }
- if meta != "modified_meta" {
- t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
- }
- safeClose(t, pom)
- safeClose(t, om)
- safeClose(t, testClient)
- broker.Close()
- coordinator.Close()
- }
- func TestPartitionOffsetManagerCommitErr(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
- pom := initPartitionOffsetManager(t, om, coordinator)
-
- ocResponse := new(OffsetCommitResponse)
- ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
- ocResponse.AddError("my_topic", 1, ErrNoError)
- coordinator.Returns(ocResponse)
- newCoordinator := newMockBroker(t, 3)
-
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
-
- ocResponse2 := new(OffsetCommitResponse)
- newCoordinator.Returns(ocResponse2)
-
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
-
- ocResponse3 := new(OffsetCommitResponse)
- ocResponse3.AddError("my_topic", 1, ErrNoError)
- newCoordinator.Returns(ocResponse3)
-
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
-
- ocResponse4 := new(OffsetCommitResponse)
- ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
- newCoordinator.Returns(ocResponse4)
-
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
-
- ocResponse5 := new(OffsetCommitResponse)
- ocResponse5.AddError("my_topic", 0, ErrNoError)
- newCoordinator.Returns(ocResponse5)
- pom.SetOffset(100, "modified_meta")
- err := pom.Close()
- if err != nil {
- t.Error(err)
- }
- broker.Close()
- coordinator.Close()
- newCoordinator.Close()
- safeClose(t, om)
- safeClose(t, testClient)
- }
- func TestAbortPartitionOffsetManager(t *testing.T) {
- om, testClient, broker, coordinator := initOffsetManager(t)
- pom := initPartitionOffsetManager(t, om, coordinator)
-
-
- coordinator.Close()
-
- newCoordinator := newMockBroker(t, 3)
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: newCoordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: newCoordinator.Port(),
- })
- ocResponse := new(OffsetCommitResponse)
- ocResponse.AddError("my_topic", 0, ErrNoError)
- newCoordinator.Returns(ocResponse)
- pom.SetOffset(100, "modified_meta")
- safeClose(t, pom)
- safeClose(t, om)
- broker.Close()
- safeClose(t, testClient)
- }
|