//+build functional

package sarama

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"testing"
	"time"

	toxiproxy "github.com/Shopify/toxiproxy/client"
)

const (
	// uncomittedMsgJar is the download URL of the Java tool that
	// prepareTestTopics fetches (when no file with the same base name exists in
	// the working directory) and then runs with `java -jar` to publish
	// uncommitted messages for the functional tests.
	uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
)

var (
	// testTopicDetails maps each topic used by the functional tests to the
	// partition count and replication factor it is created with.
	// prepareTestTopics deletes and recreates every topic listed here.
	testTopicDetails = map[string]*TopicDetail{
		"test.1": {
			NumPartitions:     1,
			ReplicationFactor: 3,
		},
		"test.4": {
			NumPartitions:     4,
			ReplicationFactor: 3,
		},
		"test.64": {
			NumPartitions:     64,
			ReplicationFactor: 3,
		},
		"uncommitted-topic-test-4": {
			NumPartitions:     1,
			ReplicationFactor: 3,
		},
	}

	// FunctionalTestEnv is the environment shared by all functional tests. It
	// is assigned by testMain once the environment and test topics are ready,
	// immediately before the tests are run.
	FunctionalTestEnv *testEnvironment
)

// TestMain is the entry point for Sarama's functional tests.
//
// The environment is selected in one of two ways:
//
//   - TOXIPROXY_ADDR is set: it must point at a toxiproxy instance that is
//     already set up with 21801-21805 bound to zookeeper and 29091-29095 bound
//     to kafka.
//   - TOXIPROXY_ADDR is not set: Docker is used to bring up a 5-node zookeeper
//     cluster and a 5-node kafka cluster, with toxiproxy configured as above.
//
// In either case, the following topics are deleted (if they exist) and then
// created/pre-seeded with data for the functional test run:
//
//   - uncommitted-topic-test-4
//   - test.1
//   - test.4
//   - test.64
func TestMain(m *testing.M) {
	// The real work lives in testMain so that its deferred teardown has run
	// by the time os.Exit (which skips deferred calls) is invoked.
	exitCode := testMain(m)
	os.Exit(exitCode)
}

func testMain(m *testing.M) int {
	ctx := context.Background()
	var env testEnvironment

	if os.Getenv("DEBUG") == "true" {
		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	usingExisting, err := existingEnvironment(ctx, &env)
	if err != nil {
		panic(err)
	}
	if !usingExisting {
		err := prepareDockerTestEnvironment(ctx, &env)
		if err != nil {
			_ = tearDownDockerTestEnvironment(ctx, &env)
			panic(err)
		}
		defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
	}
	if err := prepareTestTopics(ctx, &env); err != nil {
		panic(err)
	}
	FunctionalTestEnv = &env
	return m.Run()
}

// testEnvironment describes the Kafka cluster and toxiproxy instance the
// functional tests run against.
type testEnvironment struct {
	// ToxiproxyClient talks to the toxiproxy instance fronting the brokers.
	ToxiproxyClient *toxiproxy.Client
	// Proxies holds the per-broker toxiproxy proxies, keyed "kafka1".."kafka5".
	Proxies map[string]*toxiproxy.Proxy
	// KafkaBrokerAddrs lists the host:port addresses (the toxiproxy listen
	// ports) through which the brokers are reached.
	KafkaBrokerAddrs []string
	// KafkaVersion is the Kafka version string, e.g. "2.5.0".
	KafkaVersion string
}

// prepareDockerTestEnvironment brings up the docker-compose based test
// environment and fills env with its toxiproxy client, per-broker proxies,
// broker addresses and Kafka version.
//
// The Kafka version is taken from $KAFKA_VERSION (default "2.5.0") and mapped
// to a Confluent Platform image version passed to docker-compose via
// CONFLUENT_PLATFORM_VERSION. After the containers are started, a toxiproxy
// proxy is created for each of the five brokers and the function then polls,
// once per second for up to 45 attempts, until every broker address serves
// metadata for the whole cluster.
//
// It returns an error if the previous environment cannot be torn down, the
// Kafka version is unsupported, docker-compose fails, a proxy cannot be
// created, or the brokers do not come up in time.
func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
	const (
		numBrokers       = 5
		readinessRetries = 45
	)

	Logger.Println("bringing up docker-based test environment")

	// Always (try to) tear down first.
	if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
		return fmt.Errorf("failed to tear down existing env: %w", err)
	}

	if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
		env.KafkaVersion = version
	} else {
		// We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0.
		env.KafkaVersion = "2.5.0"
	}

	// the mapping of confluent platform docker image versions -> kafka versions can be
	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
	var confluentPlatformVersion string
	switch env.KafkaVersion {
	case "2.5.0":
		confluentPlatformVersion = "5.5.0"
	case "2.4.1":
		confluentPlatformVersion = "5.4.2"
	default:
		return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
	}

	c := exec.Command("docker-compose", "up", "-d")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion))
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
	}

	// Set up toxiproxy Proxies
	env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
	env.Proxies = map[string]*toxiproxy.Proxy{}
	for i := 1; i <= numBrokers; i++ {
		proxyName := fmt.Sprintf("kafka%d", i)
		proxy, err := env.ToxiproxyClient.CreateProxy(
			proxyName,
			fmt.Sprintf("0.0.0.0:%d", 29090+i),
			fmt.Sprintf("kafka-%d:%d", i, 29090+i),
		)
		if err != nil {
			return fmt.Errorf("failed to create toxiproxy: %w", err)
		}
		env.Proxies[proxyName] = proxy
		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
	}

	// The probe configuration never changes between attempts, so build it once.
	kafkaVer, err := ParseKafkaVersion(env.KafkaVersion)
	if err != nil {
		return err
	}
	config := NewConfig()
	config.Version = kafkaVer
	config.Net.DialTimeout = 1 * time.Second
	config.Net.ReadTimeout = 1 * time.Second
	config.Net.WriteTimeout = 1 * time.Second
	config.ClientID = "sarama-tests"

	// Wait for the kafka brokers to come up
	for attempt := 0; attempt < readinessRetries; attempt++ {
		Logger.Println("waiting for kafka brokers to come up")
		time.Sleep(1 * time.Second)

		allBrokersUp := true
		for _, addr := range env.KafkaBrokerAddrs {
			if !brokerSeesWholeCluster(addr, config, numBrokers) {
				// One unhealthy broker is enough to need another attempt.
				allBrokersUp = false
				break
			}
		}
		if allBrokersUp {
			return nil
		}
	}

	return fmt.Errorf("timed out waiting for broker to come up")
}

// brokerSeesWholeCluster reports whether the broker at addr serves metadata
// listing at least wantBrokers brokers, each of which accepts a connection.
// The probing client (and the broker connections it opened) is closed before
// returning so repeated probes do not accumulate connections.
func brokerSeesWholeCluster(addr string, config *Config, wantBrokers int) bool {
	client, err := NewClient([]string{addr}, config)
	if err != nil {
		return false
	}
	defer client.Close() // nolint:errcheck

	if err := client.RefreshMetadata(); err != nil {
		return false
	}
	brokers := client.Brokers()
	if len(brokers) < wantBrokers {
		return false
	}
	for _, broker := range brokers {
		if err := broker.Open(client.Config()); err != nil {
			return false
		}
		if connected, err := broker.Connected(); err != nil || !connected {
			return false
		}
	}
	return true
}

// existingEnvironment populates env from an already-running environment
// identified by $TOXIPROXY_ADDR.
//
// It returns (false, nil) when TOXIPROXY_ADDR is not set, meaning no existing
// environment was requested. Otherwise it looks up the proxies "kafka1"
// through "kafka5" on that toxiproxy instance, derives each broker address
// from the toxiproxy host and the proxy's listen port, and requires
// $KAFKA_VERSION to be set. It returns (true, nil) on success and
// (false, err) if the address cannot be parsed, a proxy is missing or
// malformed, or KAFKA_VERSION is absent.
func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
	toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
	if !ok {
		return false, nil
	}
	toxiproxyURL, err := url.Parse(toxiproxyAddr)
	if err != nil {
		return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url: %w", err)
	}
	toxiproxyHost := toxiproxyURL.Hostname()

	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
	// testMain hands in a zero-valued env; assigning into a nil map panics,
	// so make sure the map exists before filling it.
	if env.Proxies == nil {
		env.Proxies = map[string]*toxiproxy.Proxy{}
	}
	for i := 1; i <= 5; i++ {
		proxyName := fmt.Sprintf("kafka%d", i)
		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
		if err != nil {
			return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
		}
		env.Proxies[proxyName] = proxy
		// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
		_, proxyPort, err := net.SplitHostPort(proxy.Listen)
		if err != nil {
			return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
		}
		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
	}

	env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
	if !ok {
		return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
	}
	return true, nil
}

// tearDownDockerTestEnvironment stops and removes the docker-compose test
// environment. Both `docker-compose down --volumes` and
// `docker-compose rm -v --force --stop` are always run; if either fails, the
// error of the first one that failed is returned.
func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
	// compose runs one docker-compose subcommand with output passed through.
	compose := func(args ...string) error {
		cmd := exec.Command("docker-compose", args...)
		cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
		return cmd.Run()
	}

	// Run both steps unconditionally so `rm` still happens when `down` fails.
	downErr := compose("down", "--volumes")
	rmErr := compose("rm", "-v", "--force", "--stop")

	switch {
	case downErr != nil:
		return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
	case rmErr != nil:
		return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
	default:
		return nil
	}
}

// prepareTestTopics resets every topic in testTopicDetails on the cluster
// described by env and then seeds uncommitted messages using an external
// Java tool.
//
// The steps are: delete the test topics, poll metadata once per second (up to
// 20 times) until the deletion is reflected, recreate the topics, download the
// uncommitted-message jar if it is not already in the working directory, and
// run it against the first broker address. Any failure is returned wrapped
// with the step that failed.
func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
	Logger.Println("creating test topics")
	testTopicNames := make([]string, 0, len(testTopicDetails))
	for topic := range testTopicDetails {
		testTopicNames = append(testTopicNames, topic)
	}

	config := NewConfig()
	config.Metadata.Retry.Max = 5
	config.Metadata.Retry.Backoff = 10 * time.Second
	config.ClientID = "sarama-tests"
	var err error
	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
	if err != nil {
		return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
	}

	client, err := NewClient(env.KafkaBrokerAddrs, config)
	if err != nil {
		return fmt.Errorf("failed to connect to kafka: %w", err)
	}
	defer client.Close()

	controller, err := client.Controller()
	if err != nil {
		return fmt.Errorf("failed to connect to kafka controller: %w", err)
	}
	defer controller.Close()

	// Start by deleting the test topics (if they already exist)
	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
		Topics:  testTopicNames,
		Timeout: 30 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("failed to delete test topics: %w", err)
	}
	for topic, topicErr := range deleteRes.TopicErrorCodes {
		if !isTopicNotExistsErrorOrOk(topicErr) {
			return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
		}
	}

	// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
	// synchronously
	// NOTE(review): ErrNoError is accepted by isTopicNotExistsErrorOrOk, so a
	// topic whose metadata carries no error also counts as "gone" here —
	// confirm this is intended.
	var topicsOk bool
	for i := 0; i < 20 && !topicsOk; i++ {
		time.Sleep(1 * time.Second)
		md, err := controller.GetMetadata(&MetadataRequest{
			Topics: testTopicNames,
		})
		if err != nil {
			return fmt.Errorf("failed to get metadata for test topics: %w", err)
		}

		topicsOk = true
		for _, topicsMd := range md.Topics {
			if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
				topicsOk = false
			}
		}
	}
	if !topicsOk {
		return fmt.Errorf("timed out waiting for test topics to be gone")
	}

	// now create the topics empty
	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
		TopicDetails: testTopicDetails,
		Timeout:      30 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("failed to create test topics: %w", err)
	}
	for topic, topicErr := range createRes.TopicErrors {
		if !isTopicExistsErrorOrOk(topicErr.Err) {
			return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
		}
	}

	// This is kind of gross, but we don't actually have support for doing transactional publishing
	// with sarama, so we need to use a java-based tool to publish uncomitted messages to
	// the uncommitted-topic-test-4 topic
	jarName := filepath.Base(uncomittedMsgJar)
	if _, err := os.Stat(jarName); err != nil {
		if err := fetchUncommittedMsgJar(ctx, jarName); err != nil {
			return err
		}
	}

	c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed running uncomitted msg jar: %w", err)
	}
	return nil
}

// fetchUncommittedMsgJar downloads uncomittedMsgJar into the file jarName.
// The file is fully written and closed before returning, and it is removed
// again on any write failure so that a truncated jar is never mistaken for a
// finished download by a later run.
func fetchUncommittedMsgJar(ctx context.Context, jarName string) error {
	Logger.Printf("Downloading %s\n", uncomittedMsgJar)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, uncomittedMsgJar, nil)
	if err != nil {
		return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
	}
	defer res.Body.Close()
	// Without this check an error page would be saved and later run as a jar.
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("failed fetching the uncommitted msg jar: unexpected HTTP status %s", res.Status)
	}

	jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
	if err != nil {
		return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
	}

	_, copyErr := io.Copy(jarFile, res.Body)
	// Close explicitly (not via defer) so the jar is complete on disk before
	// the caller executes it, and so a failed close is reported.
	closeErr := jarFile.Close()
	if copyErr != nil || closeErr != nil {
		// Best-effort cleanup; the write error below is the one worth reporting.
		_ = os.Remove(jarName)
		if copyErr != nil {
			return fmt.Errorf("failed writing the uncomitted msg jar: %w", copyErr)
		}
		return fmt.Errorf("failed closing the uncomitted msg jar: %w", closeErr)
	}
	return nil
}

// isTopicNotExistsErrorOrOk reports whether err is ErrNoError,
// ErrUnknownTopicOrPartition or ErrInvalidTopic.
func isTopicNotExistsErrorOrOk(err KError) bool {
	switch err {
	case ErrNoError, ErrUnknownTopicOrPartition, ErrInvalidTopic:
		return true
	default:
		return false
	}
}

// isTopicExistsErrorOrOk reports whether err is ErrNoError or
// ErrTopicAlreadyExists.
func isTopicExistsErrorOrOk(err KError) bool {
	switch err {
	case ErrNoError, ErrTopicAlreadyExists:
		return true
	default:
		return false
	}
}

// checkKafkaVersion skips the calling test unless the Kafka version of the
// functional test environment is at least requiredVersion. The test is also
// skipped when the environment has no Kafka version recorded.
func checkKafkaVersion(t testing.TB, requiredVersion string) {
	have := FunctionalTestEnv.KafkaVersion
	if have == "" {
		t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
		return
	}

	if parseKafkaVersion(have).satisfies(parseKafkaVersion(requiredVersion)) {
		return
	}
	t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, have)
}

// resetProxies calls ResetState on the environment's toxiproxy client and
// records any failure on t without aborting the test.
func resetProxies(t testing.TB) {
	err := FunctionalTestEnv.ToxiproxyClient.ResetState()
	if err == nil {
		return
	}
	t.Error(err)
}

// SaveProxy calls Save on the toxiproxy proxy registered under px (e.g.
// "kafka1") in the functional test environment. The test is failed
// immediately if no proxy is registered under that name or if saving fails.
func SaveProxy(t *testing.T, px string) {
	t.Helper()
	// A missing key yields a nil *toxiproxy.Proxy; fail with a clear message
	// instead of calling Save on it.
	proxy, ok := FunctionalTestEnv.Proxies[px]
	if !ok || proxy == nil {
		t.Fatalf("no toxiproxy proxy named %q in the functional test environment", px)
	}
	if err := proxy.Save(); err != nil {
		t.Fatal(err)
	}
}

// setupFunctionalTest prepares a functional test by resetting the toxiproxy
// state via resetProxies.
func setupFunctionalTest(t testing.TB) {
	resetProxies(t)
}

// teardownFunctionalTest cleans up after a functional test by resetting the
// toxiproxy state via resetProxies.
func teardownFunctionalTest(t testing.TB) {
	resetProxies(t)
}

// kafkaVersion is a dotted version number split into its numeric components,
// most significant first (e.g. "2.4.1" is kafkaVersion{2, 4, 1}).
type kafkaVersion []int

// satisfies reports whether kv is greater than or equal to other.
//
// Components are compared left to right and the first difference decides.
// A component missing from the shorter version is treated as 0, so "2.5"
// equals "2.5.0" and is lower than "2.5.1".
func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
	// Walk the longer of the two so trailing components on either side count.
	n := len(kv)
	if len(other) > n {
		n = len(other)
	}

	for i := 0; i < n; i++ {
		var have, want int
		if i < len(kv) {
			have = kv[i]
		}
		if i < len(other) {
			want = other[i]
		}
		if have != want {
			return have > want
		}
	}
	return true
}

// parseKafkaVersion splits version on "." and converts each part to an int.
// A part that is not a valid integer becomes 0.
func parseKafkaVersion(version string) kafkaVersion {
	numbers := strings.Split(version, ".")
	result := make(kafkaVersion, 0, len(numbers))
	for _, number := range numbers {
		// The error is deliberately ignored: Atoi returns 0 on failure, and
		// the signature has no way to report a malformed component.
		nr, _ := strconv.Atoi(number)
		result = append(result, nr)
	}

	return result
}