functional_test.go

// +build functional

package sarama

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"testing"
	"time"

	toxiproxy "github.com/Shopify/toxiproxy/client"
)

const (
	uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
)

var (
	testTopicDetails = map[string]*TopicDetail{
		"test.1": {
			NumPartitions:     1,
			ReplicationFactor: 3,
		},
		"test.4": {
			NumPartitions:     4,
			ReplicationFactor: 3,
		},
		"test.64": {
			NumPartitions:     64,
			ReplicationFactor: 3,
		},
		"uncommitted-topic-test-4": {
			NumPartitions:     1,
			ReplicationFactor: 3,
		},
	}

	FunctionalTestEnv *testEnvironment
)

func TestMain(m *testing.M) {
	// Functional tests for Sarama
	//
	// You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
	// already set up with 21801-21805 bound to zookeeper and 29091-29095
	// bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
	// to use Docker to bring up a 5-node zookeeper cluster & 5-node kafka
	// cluster, with toxiproxy configured as above.
	//
	// In either case, the following topics will be deleted (if they exist) and
	// then created/pre-seeded with data for the functional test run:
	// * uncommitted-topic-test-4
	// * test.1
	// * test.4
	// * test.64
	os.Exit(testMain(m))
}
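
// testMain is where the environment described above is actually assembled.
// An illustrative invocation against an already-running toxiproxy/kafka
// setup might look like the following (the address and version are examples
// only, not values required by this file):
//
//	TOXIPROXY_ADDR=http://localhost:8474 KAFKA_VERSION=2.5.1 go test -tags functional ./...
//
// With TOXIPROXY_ADDR unset, `go test -tags functional ./...` falls back to
// the docker-compose based environment.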
func testMain(m *testing.M) int {
	ctx := context.Background()
	var env testEnvironment

	if os.Getenv("DEBUG") == "true" {
		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	usingExisting, err := existingEnvironment(ctx, &env)
	if err != nil {
		panic(err)
	}
	if !usingExisting {
		err := prepareDockerTestEnvironment(ctx, &env)
		if err != nil {
			_ = tearDownDockerTestEnvironment(ctx, &env)
			panic(err)
		}
		defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
	}

	if err := prepareTestTopics(ctx, &env); err != nil {
		panic(err)
	}

	FunctionalTestEnv = &env
	return m.Run()
}
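
// testEnvironment describes the cluster the functional tests run against:
// the toxiproxy client plus the proxies it manages (keyed "kafka1".."kafka5"),
// the proxied broker addresses to dial, and the Kafka version in use.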
type testEnvironment struct {
	ToxiproxyClient  *toxiproxy.Client
	Proxies          map[string]*toxiproxy.Proxy
	KafkaBrokerAddrs []string
	KafkaVersion     string
}
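
// prepareDockerTestEnvironment brings the docker-compose cluster up (tearing
// down any previous run first), registers one toxiproxy proxy per broker on
// ports 29091-29095, and then polls for up to 45 seconds until all five
// brokers accept connections.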
func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
	Logger.Println("bringing up docker-based test environment")

	// Always (try to) tear down first.
	if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
		return fmt.Errorf("failed to tear down existing env: %w", err)
	}

	if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
		env.KafkaVersion = version
	} else {
		// We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0.
		env.KafkaVersion = "2.5.0"
	}

	// The mapping of confluent platform docker image versions -> kafka versions can be
	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
	var confluentPlatformVersion string
	switch env.KafkaVersion {
	case "2.6.0":
		confluentPlatformVersion = "5.5.0"
	case "2.5.1":
		confluentPlatformVersion = "5.5.0"
	case "2.5.0":
		// matches the docker-compose default noted above
		confluentPlatformVersion = "5.5.0"
	default:
		return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
	}

	c := exec.Command("docker-compose", "up", "-d")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion))
	err := c.Run()
	if err != nil {
		return fmt.Errorf("failed to run docker-compose to start test environment: %w", err)
	}

	// Set up toxiproxy Proxies
	env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
	env.Proxies = map[string]*toxiproxy.Proxy{}
	for i := 1; i <= 5; i++ {
		proxyName := fmt.Sprintf("kafka%d", i)
		proxy, err := env.ToxiproxyClient.CreateProxy(
			proxyName,
			fmt.Sprintf("0.0.0.0:%d", 29090+i),
			fmt.Sprintf("kafka-%d:%d", i, 29090+i),
		)
		if err != nil {
			return fmt.Errorf("failed to create toxiproxy: %w", err)
		}
		env.Proxies[proxyName] = proxy
		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
	}

	// Wait for the kafka brokers to come up
	allBrokersUp := false
	for i := 0; i < 45 && !allBrokersUp; i++ {
		Logger.Println("waiting for kafka brokers to come up")
		time.Sleep(1 * time.Second)

		config := NewConfig()
		config.Version, err = ParseKafkaVersion(env.KafkaVersion)
		if err != nil {
			return err
		}
		config.Net.DialTimeout = 1 * time.Second
		config.Net.ReadTimeout = 1 * time.Second
		config.Net.WriteTimeout = 1 * time.Second
		config.ClientID = "sarama-tests"

		brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
	retryLoop:
		for j, addr := range env.KafkaBrokerAddrs {
			client, err := NewClient([]string{addr}, config)
			if err != nil {
				continue
			}
			err = client.RefreshMetadata()
			if err != nil {
				continue
			}
			brokers := client.Brokers()
			if len(brokers) < 5 {
				continue
			}
			for _, broker := range brokers {
				err := broker.Open(client.Config())
				if err != nil {
					continue retryLoop
				}
				connected, err := broker.Connected()
				if err != nil || !connected {
					continue retryLoop
				}
			}
			brokersOk[j] = true
		}

		allBrokersUp = true
		for _, u := range brokersOk {
			allBrokersUp = allBrokersUp && u
		}
	}

	if !allBrokersUp {
		return fmt.Errorf("timed out waiting for brokers to come up")
	}

	return nil
}
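
// existingEnvironment populates env from TOXIPROXY_ADDR and KAFKA_VERSION if
// they are set, discovering the kafka1..kafka5 proxies on the running
// toxiproxy instance. It returns false (and no error) when TOXIPROXY_ADDR is
// unset, signalling that the docker-compose environment should be used instead.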
func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
	toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
	if !ok {
		return false, nil
	}
	toxiproxyURL, err := url.Parse(toxiproxyAddr)
	if err != nil {
		return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url: %w", err)
	}
	toxiproxyHost := toxiproxyURL.Hostname()

	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
	env.Proxies = map[string]*toxiproxy.Proxy{} // must be initialised before the writes below
	for i := 1; i <= 5; i++ {
		proxyName := fmt.Sprintf("kafka%d", i)
		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
		if err != nil {
			return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
		}
		env.Proxies[proxyName] = proxy

		// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
		_, proxyPort, err := net.SplitHostPort(proxy.Listen)
		if err != nil {
			return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
		}
		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
	}

	env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
	if !ok {
		return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
	}
	return true, nil
}
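
// tearDownDockerTestEnvironment stops the docker-compose cluster and removes
// its containers and volumes. Both commands are always attempted; the first
// error encountered is the one returned.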
func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
	c := exec.Command("docker-compose", "down", "--volumes")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	downErr := c.Run()

	c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	rmErr := c.Run()

	if downErr != nil {
		return fmt.Errorf("failed to run docker-compose to stop test environment: %w", downErr)
	}
	if rmErr != nil {
		return fmt.Errorf("failed to run docker-compose to rm test environment: %w", rmErr)
	}
	return nil
}
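
// prepareTestTopics deletes any existing test topics, waits for the deletes to
// take effect, recreates the topics empty, and finally runs a small Java tool
// that publishes uncommitted transactional messages to uncommitted-topic-test-4
// (sarama itself has no transactional producer to do this with).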
func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
	Logger.Println("creating test topics")
	var testTopicNames []string
	for topic := range testTopicDetails {
		testTopicNames = append(testTopicNames, topic)
	}

	Logger.Println("Creating topics")
	config := NewConfig()
	config.Metadata.Retry.Max = 5
	config.Metadata.Retry.Backoff = 10 * time.Second
	config.ClientID = "sarama-tests"
	var err error
	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
	if err != nil {
		return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
	}

	client, err := NewClient(env.KafkaBrokerAddrs, config)
	if err != nil {
		return fmt.Errorf("failed to connect to kafka: %w", err)
	}
	defer client.Close()

	controller, err := client.Controller()
	if err != nil {
		return fmt.Errorf("failed to connect to kafka controller: %w", err)
	}
	defer controller.Close()

	// Start by deleting the test topics (if they already exist)
	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
		Topics:  testTopicNames,
		Timeout: 30 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("failed to delete test topics: %w", err)
	}
	for topic, topicErr := range deleteRes.TopicErrorCodes {
		if !isTopicNotExistsErrorOrOk(topicErr) {
			return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
		}
	}

	// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
	// synchronously
	var topicsOk bool
	for i := 0; i < 20 && !topicsOk; i++ {
		time.Sleep(1 * time.Second)
		md, err := controller.GetMetadata(&MetadataRequest{
			Topics: testTopicNames,
		})
		if err != nil {
			return fmt.Errorf("failed to get metadata for test topics: %w", err)
		}

		topicsOk = true
		for _, topicsMd := range md.Topics {
			if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
				topicsOk = false
			}
		}
	}
	if !topicsOk {
		return fmt.Errorf("timed out waiting for test topics to be gone")
	}

	// now create the topics empty
	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
		TopicDetails: testTopicDetails,
		Timeout:      30 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("failed to create test topics: %w", err)
	}
	for topic, topicErr := range createRes.TopicErrors {
		if !isTopicExistsErrorOrOk(topicErr.Err) {
			return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
		}
	}

	// This is kind of gross, but we don't actually have support for doing transactional publishing
	// with sarama, so we need to use a java-based tool to publish uncommitted messages to
	// the uncommitted-topic-test-4 topic
	jarName := filepath.Base(uncomittedMsgJar)
	if _, err := os.Stat(jarName); err != nil {
		Logger.Printf("Downloading %s\n", uncomittedMsgJar)
		req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
		if err != nil {
			return fmt.Errorf("failed creating request for uncommitted msg jar: %w", err)
		}

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
		}
		defer res.Body.Close()

		jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
		if err != nil {
			return fmt.Errorf("failed opening the uncommitted msg jar: %w", err)
		}
		defer jarFile.Close()

		_, err = io.Copy(jarFile, res.Body)
		if err != nil {
			return fmt.Errorf("failed writing the uncommitted msg jar: %w", err)
		}
	}

	c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	err = c.Run()
	if err != nil {
		return fmt.Errorf("failed running uncommitted msg jar: %w", err)
	}
	return nil
}

func isTopicNotExistsErrorOrOk(err KError) bool {
	return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
}

func isTopicExistsErrorOrOk(err KError) bool {
	return err == ErrTopicAlreadyExists || err == ErrNoError
}
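
// checkKafkaVersion skips the calling test unless the cluster's Kafka version
// is at least requiredVersion.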
func checkKafkaVersion(t testing.TB, requiredVersion string) {
	kafkaVersion := FunctionalTestEnv.KafkaVersion
	if kafkaVersion == "" {
		t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Skipping...", requiredVersion)
	} else {
		available := parseKafkaVersion(kafkaVersion)
		required := parseKafkaVersion(requiredVersion)
		if !available.satisfies(required) {
			t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
		}
	}
}

func resetProxies(t testing.TB) {
	if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
		t.Error(err)
	}
}

func SaveProxy(t *testing.T, px string) {
	if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
		t.Fatal(err)
	}
}

func setupFunctionalTest(t testing.TB) {
	resetProxies(t)
}

func teardownFunctionalTest(t testing.TB) {
	resetProxies(t)
}
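
// kafkaVersion is a parsed dotted version, e.g. "2.5.1" -> [2 5 1]. satisfies
// reports whether kv is at least other, comparing components left to right and
// treating missing components in other as 0, so for example
// parseKafkaVersion("2.5.1").satisfies(parseKafkaVersion("2.4")) is true.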
type kafkaVersion []int

func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
	var ov int
	for index, v := range kv {
		if len(other) <= index {
			ov = 0
		} else {
			ov = other[index]
		}
		if v < ov {
			return false
		} else if v > ov {
			return true
		}
	}
	return true
}
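
// parseKafkaVersion splits a dotted version string into its numeric
// components; any component that fails to parse is recorded as 0.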
func parseKafkaVersion(version string) kafkaVersion {
	numbers := strings.Split(version, ".")
	result := make(kafkaVersion, 0, len(numbers))
	for _, number := range numbers {
		nr, _ := strconv.Atoi(number)
		result = append(result, nr)
	}
	return result
}