123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- //+build functional
- package sarama
- import (
- "context"
- "fmt"
- "io"
- "log"
- "net"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "testing"
- "time"
- toxiproxy "github.com/Shopify/toxiproxy/client"
- )
- const (
- uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
- )
- var (
- testTopicDetails = map[string]*TopicDetail{
- "test.1": {
- NumPartitions: 1,
- ReplicationFactor: 3,
- },
- "test.4": {
- NumPartitions: 4,
- ReplicationFactor: 3,
- },
- "test.64": {
- NumPartitions: 64,
- ReplicationFactor: 3,
- },
- "uncommitted-topic-test-4": {
- NumPartitions: 1,
- ReplicationFactor: 3,
- },
- }
- FunctionalTestEnv *testEnvironment
- )
- func TestMain(m *testing.M) {
- // Functional tests for Sarama
- //
- // You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
- // already set up with 21801-21805 bound to zookeeper and 29091-29095
- // bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
- // and use Docker to bring up a 5-node zookeeper cluster & 5-node kafka
- // cluster, with toxiproxy configured as above.
- //
- // In either case, the following topics will be deleted (if they exist) and
- // then created/pre-seeded with data for the functional test run:
- // * uncomitted-topic-test-4
- // * test.1
- // * test.4
- // * test.64
- os.Exit(testMain(m))
- }
- func testMain(m *testing.M) int {
- ctx := context.Background()
- var env testEnvironment
- if os.Getenv("DEBUG") == "true" {
- Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
- }
- usingExisting, err := existingEnvironment(ctx, &env)
- if err != nil {
- panic(err)
- }
- if !usingExisting {
- err := prepareDockerTestEnvironment(ctx, &env)
- if err != nil {
- _ = tearDownDockerTestEnvironment(ctx, &env)
- panic(err)
- }
- defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
- }
- if err := prepareTestTopics(ctx, &env); err != nil {
- panic(err)
- }
- FunctionalTestEnv = &env
- return m.Run()
- }
- type testEnvironment struct {
- ToxiproxyClient *toxiproxy.Client
- Proxies map[string]*toxiproxy.Proxy
- KafkaBrokerAddrs []string
- KafkaVersion string
- }
- func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
- Logger.Println("bringing up docker-based test environment")
- // Always (try to) tear down first.
- if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
- return fmt.Errorf("failed to tear down existing env: %w", err)
- }
- if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
- env.KafkaVersion = version
- } else {
- // We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0.
- env.KafkaVersion = "2.5.0"
- }
- // the mapping of confluent platform docker image versions -> kafka versions can be
- // found here: https://docs.confluent.io/current/installation/versions-interoperability.html
- var confluentPlatformVersion string
- switch env.KafkaVersion {
- case "2.6.0":
- confluentPlatformVersion = "5.5.0"
- case "2.4.1":
- confluentPlatformVersion = "5.4.2"
- default:
- return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
- }
- c := exec.Command("docker-compose", "up", "-d")
- c.Stdout = os.Stdout
- c.Stderr = os.Stderr
- c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion))
- err := c.Run()
- if err != nil {
- return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
- }
- // Set up toxiproxy Proxies
- env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
- env.Proxies = map[string]*toxiproxy.Proxy{}
- for i := 1; i <= 5; i++ {
- proxyName := fmt.Sprintf("kafka%d", i)
- proxy, err := env.ToxiproxyClient.CreateProxy(
- proxyName,
- fmt.Sprintf("0.0.0.0:%d", 29090+i),
- fmt.Sprintf("kafka-%d:%d", i, 29090+i),
- )
- if err != nil {
- return fmt.Errorf("failed to create toxiproxy: %w", err)
- }
- env.Proxies[proxyName] = proxy
- env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
- }
- // Wait for the kafka broker to come up
- allBrokersUp := false
- for i := 0; i < 45 && !allBrokersUp; i++ {
- Logger.Println("waiting for kafka brokers to come up")
- time.Sleep(1 * time.Second)
- config := NewConfig()
- config.Version, err = ParseKafkaVersion(env.KafkaVersion)
- if err != nil {
- return err
- }
- config.Net.DialTimeout = 1 * time.Second
- config.Net.ReadTimeout = 1 * time.Second
- config.Net.WriteTimeout = 1 * time.Second
- config.ClientID = "sarama-tests"
- brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
- retryLoop:
- for j, addr := range env.KafkaBrokerAddrs {
- client, err := NewClient([]string{addr}, config)
- if err != nil {
- continue
- }
- err = client.RefreshMetadata()
- if err != nil {
- continue
- }
- brokers := client.Brokers()
- if len(brokers) < 5 {
- continue
- }
- for _, broker := range brokers {
- err := broker.Open(client.Config())
- if err != nil {
- continue retryLoop
- }
- connected, err := broker.Connected()
- if err != nil || !connected {
- continue retryLoop
- }
- }
- brokersOk[j] = true
- }
- allBrokersUp = true
- for _, u := range brokersOk {
- allBrokersUp = allBrokersUp && u
- }
- }
- if !allBrokersUp {
- return fmt.Errorf("timed out waiting for broker to come up")
- }
- return nil
- }
- func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
- toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
- if !ok {
- return false, nil
- }
- toxiproxyURL, err := url.Parse(toxiproxyAddr)
- if err != nil {
- return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
- }
- toxiproxyHost := toxiproxyURL.Hostname()
- env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
- for i := 1; i <= 5; i++ {
- proxyName := fmt.Sprintf("kafka%d", i)
- proxy, err := env.ToxiproxyClient.Proxy(proxyName)
- if err != nil {
- return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
- }
- env.Proxies[proxyName] = proxy
- // get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
- _, proxyPort, err := net.SplitHostPort(proxy.Listen)
- if err != nil {
- return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
- }
- env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
- }
- env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
- if !ok {
- return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
- }
- return true, nil
- }
- func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
- c := exec.Command("docker-compose", "down", "--volumes")
- c.Stdout = os.Stdout
- c.Stderr = os.Stderr
- downErr := c.Run()
- c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
- c.Stdout = os.Stdout
- c.Stderr = os.Stderr
- rmErr := c.Run()
- if downErr != nil {
- return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
- }
- if rmErr != nil {
- return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
- }
- return nil
- }
- func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
- Logger.Println("creating test topics")
- var testTopicNames []string
- for topic := range testTopicDetails {
- testTopicNames = append(testTopicNames, topic)
- }
- Logger.Println("Creating topics")
- config := NewConfig()
- config.Metadata.Retry.Max = 5
- config.Metadata.Retry.Backoff = 10 * time.Second
- config.ClientID = "sarama-tests"
- var err error
- config.Version, err = ParseKafkaVersion(env.KafkaVersion)
- if err != nil {
- return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
- }
- client, err := NewClient(env.KafkaBrokerAddrs, config)
- if err != nil {
- return fmt.Errorf("failed to connect to kafka: %w", err)
- }
- defer client.Close()
- controller, err := client.Controller()
- if err != nil {
- return fmt.Errorf("failed to connect to kafka controller: %w", err)
- }
- defer controller.Close()
- // Start by deleting the test topics (if they already exist)
- deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
- Topics: testTopicNames,
- Timeout: 30 * time.Second,
- })
- if err != nil {
- return fmt.Errorf("failed to delete test topics: %w", err)
- }
- for topic, topicErr := range deleteRes.TopicErrorCodes {
- if !isTopicNotExistsErrorOrOk(topicErr) {
- return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
- }
- }
- // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
- // synchronously
- var topicsOk bool
- for i := 0; i < 20 && !topicsOk; i++ {
- time.Sleep(1 * time.Second)
- md, err := controller.GetMetadata(&MetadataRequest{
- Topics: testTopicNames,
- })
- if err != nil {
- return fmt.Errorf("failed to get metadata for test topics: %w", err)
- }
- topicsOk = true
- for _, topicsMd := range md.Topics {
- if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
- topicsOk = false
- }
- }
- }
- if !topicsOk {
- return fmt.Errorf("timed out waiting for test topics to be gone")
- }
- // now create the topics empty
- createRes, err := controller.CreateTopics(&CreateTopicsRequest{
- TopicDetails: testTopicDetails,
- Timeout: 30 * time.Second,
- })
- if err != nil {
- return fmt.Errorf("failed to create test topics: %w", err)
- }
- for topic, topicErr := range createRes.TopicErrors {
- if !isTopicExistsErrorOrOk(topicErr.Err) {
- return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
- }
- }
- // This is kind of gross, but we don't actually have support for doing transactional publishing
- // with sarama, so we need to use a java-based tool to publish uncomitted messages to
- // the uncommitted-topic-test-4 topic
- jarName := filepath.Base(uncomittedMsgJar)
- if _, err := os.Stat(jarName); err != nil {
- Logger.Printf("Downloading %s\n", uncomittedMsgJar)
- req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
- if err != nil {
- return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
- }
- res, err := http.DefaultClient.Do(req)
- if err != nil {
- return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
- }
- defer res.Body.Close()
- jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
- if err != nil {
- return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
- }
- defer jarFile.Close()
- _, err = io.Copy(jarFile, res.Body)
- if err != nil {
- return fmt.Errorf("failed writing the uncomitted msg jar: %w", err)
- }
- }
- c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
- c.Stdout = os.Stdout
- c.Stderr = os.Stderr
- err = c.Run()
- if err != nil {
- return fmt.Errorf("failed running uncomitted msg jar: %w", err)
- }
- return nil
- }
- func isTopicNotExistsErrorOrOk(err KError) bool {
- return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
- }
- func isTopicExistsErrorOrOk(err KError) bool {
- return err == ErrTopicAlreadyExists || err == ErrNoError
- }
- func checkKafkaVersion(t testing.TB, requiredVersion string) {
- kafkaVersion := FunctionalTestEnv.KafkaVersion
- if kafkaVersion == "" {
- t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
- } else {
- available := parseKafkaVersion(kafkaVersion)
- required := parseKafkaVersion(requiredVersion)
- if !available.satisfies(required) {
- t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
- }
- }
- }
- func resetProxies(t testing.TB) {
- if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
- t.Error(err)
- }
- }
- func SaveProxy(t *testing.T, px string) {
- if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
- t.Fatal(err)
- }
- }
- func setupFunctionalTest(t testing.TB) {
- resetProxies(t)
- }
- func teardownFunctionalTest(t testing.TB) {
- resetProxies(t)
- }
- type kafkaVersion []int
- func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
- var ov int
- for index, v := range kv {
- if len(other) <= index {
- ov = 0
- } else {
- ov = other[index]
- }
- if v < ov {
- return false
- } else if v > ov {
- return true
- }
- }
- return true
- }
- func parseKafkaVersion(version string) kafkaVersion {
- numbers := strings.Split(version, ".")
- result := make(kafkaVersion, 0, len(numbers))
- for _, number := range numbers {
- nr, _ := strconv.Atoi(number)
- result = append(result, nr)
- }
- return result
- }
|