package sarama

import (
	"os"
	"testing"

	"github.com/rcrowley/go-metrics"
)

func TestDefaultConfigValidates(t *testing.T) {
	config := NewConfig()
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
	if config.MetricRegistry == nil {
		t.Error("Expected non nil metrics.MetricRegistry, got nil")
	}
}

func TestInvalidClientIDConfigValidates(t *testing.T) {
	config := NewConfig()
	config.ClientID = "foo:bar"
	if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
		t.Error("Expected invalid ClientID, got ", err)
	}
}

func TestEmptyClientIDConfigValidates(t *testing.T) {
	config := NewConfig()
	config.ClientID = ""
	if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
		t.Error("Expected invalid ClientID, got ", err)
	}
}

func TestNetConfigValidates(t *testing.T) {
	tests := []struct {
		name string
		cfg  func(*Config) // resorting to using a function as a param because of internal composite structs
		err  string
	}{
		{
			"OpenRequests",
			func(cfg *Config) {
				cfg.Net.MaxOpenRequests = 0
			},
			"Net.MaxOpenRequests must be > 0"},
		{"DialTimeout",
			func(cfg *Config) {
				cfg.Net.DialTimeout = 0
			},
			"Net.DialTimeout must be > 0"},
		{"ReadTimeout",
			func(cfg *Config) {
				cfg.Net.ReadTimeout = 0
			},
			"Net.ReadTimeout must be > 0"},
		{"WriteTimeout",
			func(cfg *Config) {
				cfg.Net.WriteTimeout = 0
			},
			"Net.WriteTimeout must be > 0"},
		{"KeepAlive",
			func(cfg *Config) {
				cfg.Net.KeepAlive = -1
			},
			"Net.KeepAlive must be >= 0"},
		{"SASL.User",
			func(cfg *Config) {
				cfg.Net.SASL.Enable = true
				cfg.Net.SASL.User = ""
			},
			"Net.SASL.User must not be empty when SASL is enabled"},
		{"SASL.Password",
			func(cfg *Config) {
				cfg.Net.SASL.Enable = true
				cfg.Net.SASL.User = "user"
				cfg.Net.SASL.Password = ""
			},
			"Net.SASL.Password must not be empty when SASL is enabled"},
	}

	for i, test := range tests {
		c := NewConfig()
		test.cfg(c)
		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
		}
	}
}

func TestMetadataConfigValidates(t *testing.T) {
	tests := []struct {
		name string
		cfg  func(*Config) // resorting to using a function as a param because of internal composite structs
		err  string
	}{
		{
			"Retry.Max",
			func(cfg *Config) {
				cfg.Metadata.Retry.Max = -1
			},
			"Metadata.Retry.Max must be >= 0"},
		{"Retry.Backoff",
			func(cfg *Config) {
				cfg.Metadata.Retry.Backoff = -1
			},
			"Metadata.Retry.Backoff must be >= 0"},
		{"RefreshFrequency",
			func(cfg *Config) {
				cfg.Metadata.RefreshFrequency = -1
			},
			"Metadata.RefreshFrequency must be >= 0"},
	}

	for i, test := range tests {
		c := NewConfig()
		test.cfg(c)
		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
		}
	}
}

func TestAdminConfigValidates(t *testing.T) {
	tests := []struct {
		name string
		cfg  func(*Config) // resorting to using a function as a param because of internal composite structs
		err  string
	}{
		{"Timeout",
			func(cfg *Config) {
				cfg.Admin.Timeout = 0
			},
			"Admin.Timeout must be > 0"},
	}

	for i, test := range tests {
		c := NewConfig()
		test.cfg(c)
		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
		}
	}
}

func TestProducerConfigValidates(t *testing.T) {
	tests := []struct {
		name string
		cfg  func(*Config) // resorting to using a function as a param because of internal composite structs
		err  string
	}{
		{
			"MaxMessageBytes",
			func(cfg *Config) {
				cfg.Producer.MaxMessageBytes = 0
			},
			"Producer.MaxMessageBytes must be > 0"},
		{"RequiredAcks",
			func(cfg *Config) {
				cfg.Producer.RequiredAcks = -2
			},
			"Producer.RequiredAcks must be >= -1"},
		{"Timeout",
			func(cfg *Config) {
				cfg.Producer.Timeout = 0
			},
			"Producer.Timeout must be > 0"},
		{"Partitioner",
			func(cfg *Config) {
				cfg.Producer.Partitioner = nil
			},
			"Producer.Partitioner must not be nil"},
		{"Flush.Bytes",
			func(cfg *Config) {
				cfg.Producer.Flush.Bytes = -1
			},
			"Producer.Flush.Bytes must be >= 0"},
		{"Flush.Messages",
			func(cfg *Config) {
				cfg.Producer.Flush.Messages = -1
			},
			"Producer.Flush.Messages must be >= 0"},
		{"Flush.Frequency",
			func(cfg *Config) {
				cfg.Producer.Flush.Frequency = -1
			},
			"Producer.Flush.Frequency must be >= 0"},
		{"Flush.MaxMessages",
			func(cfg *Config) {
				cfg.Producer.Flush.MaxMessages = -1
			},
			"Producer.Flush.MaxMessages must be >= 0"},
		{"Flush.MaxMessages with Producer.Flush.Messages",
			func(cfg *Config) {
				cfg.Producer.Flush.MaxMessages = 1
				cfg.Producer.Flush.Messages = 2
			},
			"Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
		{"Flush.Retry.Max",
			func(cfg *Config) {
				cfg.Producer.Retry.Max = -1
			},
			"Producer.Retry.Max must be >= 0"},
		{"Flush.Retry.Backoff",
			func(cfg *Config) {
				cfg.Producer.Retry.Backoff = -1
			},
			"Producer.Retry.Backoff must be >= 0"},
		{"Idempotent Version",
			func(cfg *Config) {
				cfg.Producer.Idempotent = true
				cfg.Version = V0_10_0_0
			},
			"Idempotent producer requires Version >= V0_11_0_0"},
		{"Idempotent with Producer.Retry.Max",
			func(cfg *Config) {
				cfg.Version = V0_11_0_0
				cfg.Producer.Idempotent = true
				cfg.Producer.Retry.Max = 0
			},
			"Idempotent producer requires Producer.Retry.Max >= 1"},
		{"Idempotent with Producer.RequiredAcks",
			func(cfg *Config) {
				cfg.Version = V0_11_0_0
				cfg.Producer.Idempotent = true
			},
			"Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
		{"Idempotent with Net.MaxOpenRequests",
			func(cfg *Config) {
				cfg.Version = V0_11_0_0
				cfg.Producer.Idempotent = true
				cfg.Producer.RequiredAcks = WaitForAll
			},
			"Idempotent producer requires Net.MaxOpenRequests to be 1"},
	}

	for i, test := range tests {
		c := NewConfig()
		test.cfg(c)
		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
		}
	}
}

func TestLZ4ConfigValidation(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionLZ4
	if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
		t.Error("Expected invalid lz4/kafka version error, got ", err)
	}
	config.Version = V0_10_0_0
	if err := config.Validate(); err != nil {
		t.Error("Expected lz4 to work, got ", err)
	}
}

// This example shows how to integrate with an existing registry as well as publishing metrics
// on the standard output
func ExampleConfig_metrics() {
	// Our application registry
	appMetricRegistry := metrics.NewRegistry()
	appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
	appGauge.Update(1)

	config := NewConfig()
	// Use a prefix registry instead of the default local one
	config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

	// Simulate a metric created by sarama without starting a broker
	saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
	saramaGauge.Update(2)

	metrics.WriteOnce(appMetricRegistry, os.Stdout)
	// Output:
	// gauge m1
	//   value:               1
	// gauge sarama.m2
	//   value:               2
}