123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436 |
- package sarama
- import (
- "os"
- "testing"
- "github.com/rcrowley/go-metrics"
- )
- func TestDefaultConfigValidates(t *testing.T) {
- config := NewConfig()
- if err := config.Validate(); err != nil {
- t.Error(err)
- }
- if config.MetricRegistry == nil {
- t.Error("Expected non nil metrics.MetricRegistry, got nil")
- }
- }
- func TestInvalidClientIDConfigValidates(t *testing.T) {
- config := NewConfig()
- config.ClientID = "foo:bar"
- if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
- t.Error("Expected invalid ClientID, got ", err)
- }
- }
- func TestEmptyClientIDConfigValidates(t *testing.T) {
- config := NewConfig()
- config.ClientID = ""
- if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
- t.Error("Expected invalid ClientID, got ", err)
- }
- }
- type DummyTokenProvider struct {
- }
- func (t *DummyTokenProvider) Token() (*AccessToken, error) {
- return &AccessToken{Token: "access-token-string"}, nil
- }
- func TestNetConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "OpenRequests",
- func(cfg *Config) {
- cfg.Net.MaxOpenRequests = 0
- },
- "Net.MaxOpenRequests must be > 0"},
- {"DialTimeout",
- func(cfg *Config) {
- cfg.Net.DialTimeout = 0
- },
- "Net.DialTimeout must be > 0"},
- {"ReadTimeout",
- func(cfg *Config) {
- cfg.Net.ReadTimeout = 0
- },
- "Net.ReadTimeout must be > 0"},
- {"WriteTimeout",
- func(cfg *Config) {
- cfg.Net.WriteTimeout = 0
- },
- "Net.WriteTimeout must be > 0"},
- {"SASL.User",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.User = ""
- },
- "Net.SASL.User must not be empty when SASL is enabled"},
- {"SASL.Password",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.User = "user"
- cfg.Net.SASL.Password = ""
- },
- "Net.SASL.Password must not be empty when SASL is enabled"},
- {"SASL.Mechanism - Invalid mechanism type",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
- cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
- },
- "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
- {"SASL.Mechanism.OAUTHBEARER - Missing token provider",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeOAuth
- cfg.Net.SASL.TokenProvider = nil
- },
- "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"},
- {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256
- cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
- cfg.Net.SASL.User = "user"
- cfg.Net.SASL.Password = "stong_password"
- },
- "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
- {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
- cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
- cfg.Net.SASL.User = "user"
- cfg.Net.SASL.Password = "stong_password"
- },
- "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
- "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
- " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- cfg.Net.SASL.GSSAPI.Password = "sarama"
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.Password = "sarama"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.Password = "sarama"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.Password = "sarama"
- cfg.Net.SASL.GSSAPI.Realm = "kafka"
- },
- "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
- {"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
- func(cfg *Config) {
- cfg.Net.SASL.Enable = true
- cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
- cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
- cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- cfg.Net.SASL.GSSAPI.Username = "sarama"
- cfg.Net.SASL.GSSAPI.Password = "sarama"
- cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
- },
- "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestMetadataConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "Retry.Max",
- func(cfg *Config) {
- cfg.Metadata.Retry.Max = -1
- },
- "Metadata.Retry.Max must be >= 0"},
- {"Retry.Backoff",
- func(cfg *Config) {
- cfg.Metadata.Retry.Backoff = -1
- },
- "Metadata.Retry.Backoff must be >= 0"},
- {"RefreshFrequency",
- func(cfg *Config) {
- cfg.Metadata.RefreshFrequency = -1
- },
- "Metadata.RefreshFrequency must be >= 0"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestAdminConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {"Timeout",
- func(cfg *Config) {
- cfg.Admin.Timeout = 0
- },
- "Admin.Timeout must be > 0"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestProducerConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config) // resorting to using a function as a param because of internal composite structs
- err string
- }{
- {
- "MaxMessageBytes",
- func(cfg *Config) {
- cfg.Producer.MaxMessageBytes = 0
- },
- "Producer.MaxMessageBytes must be > 0"},
- {"RequiredAcks",
- func(cfg *Config) {
- cfg.Producer.RequiredAcks = -2
- },
- "Producer.RequiredAcks must be >= -1"},
- {"Timeout",
- func(cfg *Config) {
- cfg.Producer.Timeout = 0
- },
- "Producer.Timeout must be > 0"},
- {"Partitioner",
- func(cfg *Config) {
- cfg.Producer.Partitioner = nil
- },
- "Producer.Partitioner must not be nil"},
- {"Flush.Bytes",
- func(cfg *Config) {
- cfg.Producer.Flush.Bytes = -1
- },
- "Producer.Flush.Bytes must be >= 0"},
- {"Flush.Messages",
- func(cfg *Config) {
- cfg.Producer.Flush.Messages = -1
- },
- "Producer.Flush.Messages must be >= 0"},
- {"Flush.Frequency",
- func(cfg *Config) {
- cfg.Producer.Flush.Frequency = -1
- },
- "Producer.Flush.Frequency must be >= 0"},
- {"Flush.MaxMessages",
- func(cfg *Config) {
- cfg.Producer.Flush.MaxMessages = -1
- },
- "Producer.Flush.MaxMessages must be >= 0"},
- {"Flush.MaxMessages with Producer.Flush.Messages",
- func(cfg *Config) {
- cfg.Producer.Flush.MaxMessages = 1
- cfg.Producer.Flush.Messages = 2
- },
- "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
- {"Flush.Retry.Max",
- func(cfg *Config) {
- cfg.Producer.Retry.Max = -1
- },
- "Producer.Retry.Max must be >= 0"},
- {"Flush.Retry.Backoff",
- func(cfg *Config) {
- cfg.Producer.Retry.Backoff = -1
- },
- "Producer.Retry.Backoff must be >= 0"},
- {"Idempotent Version",
- func(cfg *Config) {
- cfg.Producer.Idempotent = true
- cfg.Version = V0_10_0_0
- },
- "Idempotent producer requires Version >= V0_11_0_0"},
- {"Idempotent with Producer.Retry.Max",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- cfg.Producer.Retry.Max = 0
- },
- "Idempotent producer requires Producer.Retry.Max >= 1"},
- {"Idempotent with Producer.RequiredAcks",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- },
- "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
- {"Idempotent with Net.MaxOpenRequests",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Producer.Idempotent = true
- cfg.Producer.RequiredAcks = WaitForAll
- },
- "Idempotent producer requires Net.MaxOpenRequests to be 1"},
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestConsumerConfigValidates(t *testing.T) {
- tests := []struct {
- name string
- cfg func(*Config)
- err string
- }{
- {"ReadCommitted Version",
- func(cfg *Config) {
- cfg.Version = V0_10_0_0
- cfg.Consumer.IsolationLevel = ReadCommitted
- },
- "ReadCommitted requires Version >= V0_11_0_0",
- },
- {"Incorrect isolation level",
- func(cfg *Config) {
- cfg.Version = V0_11_0_0
- cfg.Consumer.IsolationLevel = int8(42)
- },
- "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
- },
- }
- for i, test := range tests {
- c := NewConfig()
- test.cfg(c)
- if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
- t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
- }
- }
- }
- func TestLZ4ConfigValidation(t *testing.T) {
- config := NewConfig()
- config.Producer.Compression = CompressionLZ4
- if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
- t.Error("Expected invalid lz4/kafka version error, got ", err)
- }
- config.Version = V0_10_0_0
- if err := config.Validate(); err != nil {
- t.Error("Expected lz4 to work, got ", err)
- }
- }
- func TestZstdConfigValidation(t *testing.T) {
- config := NewConfig()
- config.Producer.Compression = CompressionZSTD
- if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
- t.Error("Expected invalid zstd/kafka version error, got ", err)
- }
- config.Version = V2_1_0_0
- if err := config.Validate(); err != nil {
- t.Error("Expected zstd to work, got ", err)
- }
- }
- // This example shows how to integrate with an existing registry as well as publishing metrics
- // on the standard output
- func ExampleConfig_metrics() {
- // Our application registry
- appMetricRegistry := metrics.NewRegistry()
- appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
- appGauge.Update(1)
- config := NewConfig()
- // Use a prefix registry instead of the default local one
- config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
- // Simulate a metric created by sarama without starting a broker
- saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
- saramaGauge.Update(2)
- metrics.WriteOnce(appMetricRegistry, os.Stdout)
- // Output:
- // gauge m1
- // value: 1
- // gauge sarama.m2
- // value: 2
- }
|