package sarama import ( "os" "testing" "github.com/rcrowley/go-metrics" ) func TestDefaultConfigValidates(t *testing.T) { config := NewConfig() if err := config.Validate(); err != nil { t.Error(err) } if config.MetricRegistry == nil { t.Error("Expected non nil metrics.MetricRegistry, got nil") } } func TestInvalidClientIDConfigValidates(t *testing.T) { config := NewConfig() config.ClientID = "foo:bar" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) } } func TestEmptyClientIDConfigValidates(t *testing.T) { config := NewConfig() config.ClientID = "" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { t.Error("Expected invalid ClientID, got ", err) } } func TestNetConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ { "OpenRequests", func(cfg *Config) { cfg.Net.MaxOpenRequests = 0 }, "Net.MaxOpenRequests must be > 0"}, {"DialTimeout", func(cfg *Config) { cfg.Net.DialTimeout = 0 }, "Net.DialTimeout must be > 0"}, {"ReadTimeout", func(cfg *Config) { cfg.Net.ReadTimeout = 0 }, "Net.ReadTimeout must be > 0"}, {"WriteTimeout", func(cfg *Config) { cfg.Net.WriteTimeout = 0 }, "Net.WriteTimeout must be > 0"}, {"KeepAlive", func(cfg *Config) { cfg.Net.KeepAlive = -1 }, "Net.KeepAlive must be >= 0"}, {"SASL.User", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "" }, "Net.SASL.User must not be empty when SASL is enabled"}, {"SASL.Password", func(cfg *Config) { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = "user" cfg.Net.SASL.Password = "" }, "Net.SASL.Password must not be empty when SASL is enabled"}, } for i, test := range tests { c := NewConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) } } } func TestMetadataConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ { "Retry.Max", func(cfg *Config) { cfg.Metadata.Retry.Max = -1 }, "Metadata.Retry.Max must be >= 0"}, {"Retry.Backoff", func(cfg *Config) { cfg.Metadata.Retry.Backoff = -1 }, "Metadata.Retry.Backoff must be >= 0"}, {"RefreshFrequency", func(cfg *Config) { cfg.Metadata.RefreshFrequency = -1 }, "Metadata.RefreshFrequency must be >= 0"}, } for i, test := range tests { c := NewConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) } } } func TestAdminConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ {"Timeout", func(cfg *Config) { cfg.Admin.Timeout = 0 }, "Admin.Timeout must be > 0"}, } for i, test := range tests { c := NewConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) } } } func TestProducerConfigValidates(t *testing.T) { tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs err string }{ { "MaxMessageBytes", func(cfg *Config) { cfg.Producer.MaxMessageBytes = 0 }, "Producer.MaxMessageBytes must be > 0"}, {"RequiredAcks", func(cfg *Config) { cfg.Producer.RequiredAcks = -2 }, "Producer.RequiredAcks must be >= -1"}, {"Timeout", func(cfg *Config) { cfg.Producer.Timeout = 0 }, "Producer.Timeout must be > 0"}, {"Partitioner", func(cfg *Config) { cfg.Producer.Partitioner = nil }, "Producer.Partitioner must not be nil"}, {"Flush.Bytes", func(cfg *Config) { cfg.Producer.Flush.Bytes = -1 }, "Producer.Flush.Bytes must be >= 0"}, {"Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.Messages = -1 }, "Producer.Flush.Messages must be >= 0"}, {"Flush.Frequency", func(cfg *Config) { cfg.Producer.Flush.Frequency = -1 }, "Producer.Flush.Frequency must be >= 0"}, {"Flush.MaxMessages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = -1 }, "Producer.Flush.MaxMessages must be >= 0"}, {"Flush.MaxMessages with Producer.Flush.Messages", func(cfg *Config) { cfg.Producer.Flush.MaxMessages = 1 cfg.Producer.Flush.Messages = 2 }, "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"}, {"Flush.Retry.Max", func(cfg *Config) { cfg.Producer.Retry.Max = -1 }, "Producer.Retry.Max must be >= 0"}, {"Flush.Retry.Backoff", func(cfg *Config) { cfg.Producer.Retry.Backoff = -1 }, "Producer.Retry.Backoff must be >= 0"}, {"Idempotent Version", func(cfg *Config) { cfg.Producer.Idempotent = true cfg.Version = V0_10_0_0 }, "Idempotent producer requires Version >= V0_11_0_0"}, {"Idempotent with Producer.Retry.Max", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.Retry.Max = 0 }, "Idempotent producer requires Producer.Retry.Max >= 1"}, {"Idempotent with Producer.RequiredAcks", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true }, "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"}, {"Idempotent with Net.MaxOpenRequests", func(cfg *Config) { cfg.Version = V0_11_0_0 cfg.Producer.Idempotent = true cfg.Producer.RequiredAcks = WaitForAll }, "Idempotent producer requires Net.MaxOpenRequests to be 1"}, } for i, test := range tests { c := NewConfig() test.cfg(c) if err := c.Validate(); string(err.(ConfigurationError)) != test.err { t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) } } } func TestLZ4ConfigValidation(t *testing.T) { config := NewConfig() config.Producer.Compression = CompressionLZ4 if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { t.Error("Expected invalid lz4/kafka version error, got ", err) } config.Version = V0_10_0_0 if err := config.Validate(); err != nil { t.Error("Expected lz4 to work, got ", err) } } // This example shows how to integrate with an existing registry as well as publishing metrics // on the standard output func ExampleConfig_metrics() { // Our application registry appMetricRegistry := metrics.NewRegistry() appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry) appGauge.Update(1) config := NewConfig() // Use a prefix registry instead of the default local one config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.") // Simulate a metric created by sarama without starting a broker saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry) saramaGauge.Update(2) metrics.WriteOnce(appMetricRegistry, os.Stdout) // Output: // gauge m1 // value: 1 // gauge sarama.m2 // value: 2 }