| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package sarama
- import (
- "log"
- "testing"
- "time"
- )
- var (
- broker, coordinator *mockBroker
- testClient Client
- )
- func TestNewOffsetManager(t *testing.T) {
- seedBroker := newMockBroker(t, 1)
- seedBroker.Returns(new(MetadataResponse))
- testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- _, err = NewOffsetManagerFromClient("grouop", testClient)
- if err != nil {
- t.Error(err)
- }
- testClient.Close()
- _, err = NewOffsetManagerFromClient("group", testClient)
- if err != ErrClosedClient {
- t.Errorf("Error expected for closed client; actual value: %v", err)
- }
- seedBroker.Close()
- }
- func TestFetchInitialFail(t *testing.T) {
- // Mostly copy-paste from initPOM
- // TODO: eliminate this repetition
- config := NewConfig()
- config.Metadata.Retry.Max = 0
- config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
- broker = newMockBroker(t, 1)
- coordinator = newMockBroker(t, 2)
- seedMeta := new(MetadataResponse)
- seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
- seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
- seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
- broker.Returns(seedMeta)
- var err error
- testClient, err = NewClient([]string{broker.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: coordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: coordinator.Port(),
- })
- om, err := NewOffsetManagerFromClient("group", testClient)
- if err != nil {
- t.Fatal(err)
- }
- fetchResponse := new(OffsetFetchResponse)
- // Only Err below modified
- fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
- Err: ErrNotCoordinatorForConsumer,
- Offset: 5,
- Metadata: "test_meta",
- })
- coordinator.Returns(fetchResponse)
- _, err = om.ManagePartition("my_topic", 0)
- if err != ErrNotCoordinatorForConsumer {
- t.Error(err)
- }
- }
- func initPOM(t *testing.T) PartitionOffsetManager {
- config := NewConfig()
- config.Metadata.Retry.Max = 0
- config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
- broker = newMockBroker(t, 1)
- coordinator = newMockBroker(t, 2)
- seedMeta := new(MetadataResponse)
- seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
- seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
- seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
- broker.Returns(seedMeta)
- var err error
- testClient, err = NewClient([]string{broker.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- broker.Returns(&ConsumerMetadataResponse{
- CoordinatorID: coordinator.BrokerID(),
- CoordinatorHost: "127.0.0.1",
- CoordinatorPort: coordinator.Port(),
- })
- om, err := NewOffsetManagerFromClient("group", testClient)
- if err != nil {
- t.Fatal(err)
- }
- fetchResponse := new(OffsetFetchResponse)
- fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
- Err: ErrNoError,
- Offset: 5,
- Metadata: "test_meta",
- })
- coordinator.Returns(fetchResponse)
- pom, err := om.ManagePartition("my_topic", 0)
- if err != nil {
- t.Fatal(err)
- }
- return pom
- }
- func TestOffset(t *testing.T) {
- pom := initPOM(t)
- offset, meta := pom.Offset()
- if offset != 5 {
- t.Errorf("Expected offset 5. Actual: %v", offset)
- }
- if meta != "test_meta" {
- t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
- }
- pom.Close()
- testClient.Close()
- broker.Close()
- coordinator.Close()
- }
- func TestSetOffset(t *testing.T) {
- pom := initPOM(t)
- ocResponse := new(OffsetCommitResponse)
- ocResponse.AddError("my_topic", 0, ErrNoError)
- coordinator.Returns(ocResponse)
- pom.SetOffset(100, "modified_meta")
- offset, meta := pom.Offset()
- if offset != 100 {
- t.Errorf("Expected offset 100. Actual: %v", offset)
- }
- if meta != "modified_meta" {
- t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
- }
- pom.Close()
- testClient.Close()
- broker.Close()
- coordinator.Close()
- }
- // This test is not passing
- func TestCommitErr(t *testing.T) {
- log.Println("TestCommitErr")
- log.Println("=============")
- pom := initPOM(t)
- ocResponse := new(OffsetCommitResponse)
- // ocResponse.Errors
- ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
- ocResponse.AddError("my_topic", 1, ErrNoError)
- coordinator.Returns(ocResponse)
- // ocResponse2 := new(OffsetCommitResponse)
- // // ocResponse.Errors
- // ocResponse2.AddError("my_topic", 1, ErrNoError)
- // coordinator.Returns(ocResponse2)
- freshCoordinator := newMockBroker(t, 3)
- // For RefreshCoordinator()
- coordinatorResponse3 := new(ConsumerMetadataResponse)
- coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
- coordinatorResponse3.CoordinatorHost = "127.0.0.1"
- coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
- broker.Returns(coordinatorResponse3)
- ocResponse2 := new(OffsetCommitResponse)
- // ocResponse.Errors
- // ocResponse2.AddError("my_topic", 0, ErrOffsetOutOfRange)
- ocResponse2.AddError("my_topic", 0, ErrNoError)
- freshCoordinator.Returns(ocResponse2)
- pom.SetOffset(100, "modified_meta")
- err := pom.Close()
- if err != nil {
- t.Error(err)
- }
- }
|