123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- package sarama
- import (
- "os"
- "testing"
- "github.com/rcrowley/go-metrics"
- )
- func TestDefaultConfigValidates(t *testing.T) {
- config := NewConfig()
- if err := config.Validate(); err != nil {
- t.Error(err)
- }
- if config.MetricRegistry == nil {
- t.Error("Expected non nil metrics.MetricRegistry, got nil")
- }
- }
- func TestInvalidClientIDConfigValidates(t *testing.T) {
- config := NewConfig()
- config.ClientID = "foo:bar"
- if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
- t.Error("Expected invalid ClientID, got ", err)
- }
- }
- func TestEmptyClientIDConfigValidates(t *testing.T) {
- config := NewConfig()
- config.ClientID = ""
- if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
- t.Error("Expected invalid ClientID, got ", err)
- }
- }
- func TestNetConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "OpenRequests",
- func(cfg *Config) {
- cfg.Net.MaxOpenRequests = 0
- },
- "Net.MaxOpenRequests must be > 0"},
- {"DialTimeout",
- func(cfg *Config) {
- cfg.Net.DialTimeout = 0
- },
- "Net.DialTimeout must be > 0"},
- {"ReadTimeout",
- func(cfg *Config) {
- cfg.Net.ReadTimeout = 0
- },
- "Net.ReadTimeout must be > 0"},
- {"WriteTimeout",
- func(cfg *Config) {
- cfg.Net.WriteTimeout = 0
- },
- "Net.WriteTimeout must be > 0"},
- {"KeepAlive",
- func(cfg *Config) {
- cfg.Net.KeepAlive = -1
- },
- "Net.KeepAlive must be >= 0"},
- {"SASL.User",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.User = ""
- },
- "Net.SASL.User must not be empty when SASL is enabled"},
- {"SASL.Password",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.User = "user"
- cfg.Net.SASL.Password = ""
- },
- "Net.SASL.Password must not be empty when SASL is enabled"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestMetadataConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "Retry.Max",
- func(cfg *Config) {
- cfg.Metadata.Retry.Max = -1
- },
- "Metadata.Retry.Max must be >= 0"},
- {"Retry.Backoff",
- func(cfg *Config) {
- cfg.Metadata.Retry.Backoff = -1
- },
- "Metadata.Retry.Backoff must be >= 0"},
- {"RefreshFrequency",
- func(cfg *Config) {
- cfg.Metadata.RefreshFrequency = -1
- },
- "Metadata.RefreshFrequency must be >= 0"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestAdminConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {"Timeout",
- func(cfg *Config) {
- cfg.Admin.Timeout = 0
- },
- "Admin.Timeout must be > 0"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestProducerConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "MaxMessageBytes",
- func(cfg *Config) {
- cfg.Producer.MaxMessageBytes = 0
- },
- "Producer.MaxMessageBytes must be > 0"},
- {"RequiredAcks",
- func(cfg *Config) {
- cfg.Producer.RequiredAcks = -2
- },
- "Producer.RequiredAcks must be >= -1"},
- {"Timeout",
- func(cfg *Config) {
- cfg.Producer.Timeout = 0
- },
- "Producer.Timeout must be > 0"},
- {"Partitioner",
- func(cfg *Config) {
- cfg.Producer.Partitioner = nil
- },
- "Producer.Partitioner must not be nil"},
- {"Flush.Bytes",
- func(cfg *Config) {
- cfg.Producer.Flush.Bytes = -1
- },
- "Producer.Flush.Bytes must be >= 0"},
- {"Flush.Messages",
- func(cfg *Config) {
- cfg.Producer.Flush.Messages = -1
- },
- "Producer.Flush.Messages must be >= 0"},
- {"Flush.Frequency",
- func(cfg *Config) {
- cfg.Producer.Flush.Frequency = -1
- },
- "Producer.Flush.Frequency must be >= 0"},
- {"Flush.MaxMessages",
- func(cfg *Config) {
- cfg.Producer.Flush.MaxMessages = -1
- },
- "Producer.Flush.MaxMessages must be >= 0"},
- {"Flush.MaxMessages with Producer.Flush.Messages",
- func(cfg *Config) {
- cfg.Producer.Flush.MaxMessages = 1
- cfg.Producer.Flush.Messages = 2
- },
- "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
- {"Flush.Retry.Max",
- func(cfg *Config) {
- cfg.Producer.Retry.Max = -1
- },
- "Producer.Retry.Max must be >= 0"},
- {"Flush.Retry.Backoff",
- func(cfg *Config) {
- cfg.Producer.Retry.Backoff = -1
- },
- "Producer.Retry.Backoff must be >= 0"},
- {"Idempotent Version",
- func(cfg *Config) {
- cfg.Producer.Idempotent = true
- cfg.Version = V0_10_0_0
- },
- "Idempotent producer requires Version >= V0_11_0_0"},
- {"Idempotent with Producer.Retry.Max",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- cfg.Producer.Retry.Max = 0
- },
- "Idempotent producer requires Producer.Retry.Max >= 1"},
- {"Idempotent with Producer.RequiredAcks",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- },
- "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
- {"Idempotent with Net.MaxOpenRequests",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- cfg.Producer.RequiredAcks = WaitForAll
- },
- "Idempotent producer requires Net.MaxOpenRequests to be 1"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestLZ4ConfigValidation(t *testing.T) {
- config := NewConfig()
- config.Producer.Compression = CompressionLZ4
- if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
- t.Error("Expected invalid lz4/kafka version error, got ", err)
- }
- config.Version = V0_10_0_0
- if err := config.Validate(); err != nil {
- t.Error("Expected lz4 to work, got ", err)
- }
- }
- func ExampleConfig_metrics() {
-
- appMetricRegistry := metrics.NewRegistry()
- appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
- appGauge.Update(1)
- config := NewConfig()
-
- config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
-
- saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
- saramaGauge.Update(2)
- metrics.WriteOnce(appMetricRegistry, os.Stdout)
-
-
-
-
-
- }
|