functional_test.go 12 KB


  1. //+build functional
  2. package sarama
  3. import (
  4. "context"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "os/exec"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "testing"
  17. "time"
  18. toxiproxy "github.com/Shopify/toxiproxy/client"
  19. )
  20. const (
  21. uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
  22. )
  23. var (
  24. testTopicDetails = map[string]*TopicDetail{
  25. "test.1": {
  26. NumPartitions: 1,
  27. ReplicationFactor: 3,
  28. },
  29. "test.4": {
  30. NumPartitions: 4,
  31. ReplicationFactor: 3,
  32. },
  33. "test.64": {
  34. NumPartitions: 64,
  35. ReplicationFactor: 3,
  36. },
  37. "uncommitted-topic-test-4": {
  38. NumPartitions: 1,
  39. ReplicationFactor: 3,
  40. },
  41. }
  42. FunctionalTestEnv *testEnvironment
  43. )
  44. func TestMain(m *testing.M) {
  45. // Functional tests for Sarama
  46. //
  47. // You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
  48. // already set up with 21801-21805 bound to zookeeper and 29091-29095
  49. // bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
  50. // and use Docker to bring up a 5-node zookeeper cluster & 5-node kafka
  51. // cluster, with toxiproxy configured as above.
  52. //
  53. // In either case, the following topics will be deleted (if they exist) and
  54. // then created/pre-seeded with data for the functional test run:
  55. // * uncomitted-topic-test-4
  56. // * test.1
  57. // * test.4
  58. // * test.64
  59. os.Exit(testMain(m))
  60. }
  61. func testMain(m *testing.M) int {
  62. ctx := context.Background()
  63. var env testEnvironment
  64. if os.Getenv("DEBUG") == "true" {
  65. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  66. }
  67. usingExisting, err := existingEnvironment(ctx, &env)
  68. if err != nil {
  69. panic(err)
  70. }
  71. if !usingExisting {
  72. err := prepareDockerTestEnvironment(ctx, &env)
  73. if err != nil {
  74. _ = tearDownDockerTestEnvironment(ctx, &env)
  75. panic(err)
  76. }
  77. defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
  78. }
  79. if err := prepareTestTopics(ctx, &env); err != nil {
  80. panic(err)
  81. }
  82. FunctionalTestEnv = &env
  83. return m.Run()
  84. }
  85. type testEnvironment struct {
  86. ToxiproxyClient *toxiproxy.Client
  87. Proxies map[string]*toxiproxy.Proxy
  88. KafkaBrokerAddrs []string
  89. KafkaVersion string
  90. }
  91. func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
  92. Logger.Println("bringing up docker-based test environment")
  93. // Always (try to) tear down first.
  94. if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
  95. return fmt.Errorf("failed to tear down existing env: %w", err)
  96. }
  97. if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
  98. env.KafkaVersion = version
  99. } else {
  100. // We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0.
  101. env.KafkaVersion = "2.5.0"
  102. }
  103. // the mapping of confluent platform docker image versions -> kafka versions can be
  104. // found here: https://docs.confluent.io/current/installation/versions-interoperability.html
  105. var confluentPlatformVersion string
  106. switch env.KafkaVersion {
  107. case "2.5.0":
  108. confluentPlatformVersion = "5.5.0"
  109. case "2.4.1":
  110. confluentPlatformVersion = "5.4.2"
  111. default:
  112. return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
  113. }
  114. c := exec.Command("docker-compose", "up", "-d")
  115. c.Stdout = os.Stdout
  116. c.Stderr = os.Stderr
  117. c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion))
  118. err := c.Run()
  119. if err != nil {
  120. return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
  121. }
  122. // Set up toxiproxy Proxies
  123. env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
  124. env.Proxies = map[string]*toxiproxy.Proxy{}
  125. for i := 1; i <= 5; i++ {
  126. proxyName := fmt.Sprintf("kafka%d", i)
  127. proxy, err := env.ToxiproxyClient.CreateProxy(
  128. proxyName,
  129. fmt.Sprintf("0.0.0.0:%d", 29090+i),
  130. fmt.Sprintf("kafka-%d:%d", i, 29090+i),
  131. )
  132. if err != nil {
  133. return fmt.Errorf("failed to create toxiproxy: %w", err)
  134. }
  135. env.Proxies[proxyName] = proxy
  136. env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
  137. }
  138. // the mapping of confluent platform docker image vesions -> kafka versions can be
  139. // found here: https://docs.confluent.io/current/installation/versions-interoperability.html
  140. // We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0.
  141. env.KafkaVersion = "2.5.0"
  142. // Wait for the kafka broker to come up
  143. allBrokersUp := false
  144. for i := 0; i < 45 && !allBrokersUp; i++ {
  145. Logger.Println("waiting for kafka brokers to come up")
  146. time.Sleep(1 * time.Second)
  147. config := NewConfig()
  148. config.Version, err = ParseKafkaVersion(env.KafkaVersion)
  149. if err != nil {
  150. return err
  151. }
  152. config.Net.DialTimeout = 1 * time.Second
  153. config.Net.ReadTimeout = 1 * time.Second
  154. config.Net.WriteTimeout = 1 * time.Second
  155. config.ClientID = "sarama-tests"
  156. brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
  157. retryLoop:
  158. for j, addr := range env.KafkaBrokerAddrs {
  159. client, err := NewClient([]string{addr}, config)
  160. if err != nil {
  161. continue
  162. }
  163. err = client.RefreshMetadata()
  164. if err != nil {
  165. continue
  166. }
  167. brokers := client.Brokers()
  168. if len(brokers) < 5 {
  169. continue
  170. }
  171. for _, broker := range brokers {
  172. err := broker.Open(client.Config())
  173. if err != nil {
  174. continue retryLoop
  175. }
  176. connected, err := broker.Connected()
  177. if err != nil || !connected {
  178. continue retryLoop
  179. }
  180. }
  181. brokersOk[j] = true
  182. }
  183. allBrokersUp = true
  184. for _, u := range brokersOk {
  185. allBrokersUp = allBrokersUp && u
  186. }
  187. }
  188. if !allBrokersUp {
  189. return fmt.Errorf("timed out waiting for broker to come up")
  190. }
  191. return nil
  192. }
  193. func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
  194. toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
  195. if !ok {
  196. return false, nil
  197. }
  198. toxiproxyURL, err := url.Parse(toxiproxyAddr)
  199. if err != nil {
  200. return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
  201. }
  202. toxiproxyHost := toxiproxyURL.Hostname()
  203. env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
  204. for i := 1; i <= 5; i++ {
  205. proxyName := fmt.Sprintf("kafka%d", i)
  206. proxy, err := env.ToxiproxyClient.Proxy(proxyName)
  207. if err != nil {
  208. return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
  209. }
  210. env.Proxies[proxyName] = proxy
  211. // get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
  212. _, proxyPort, err := net.SplitHostPort(proxy.Listen)
  213. if err != nil {
  214. return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
  215. }
  216. env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
  217. }
  218. env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
  219. if !ok {
  220. return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
  221. }
  222. return true, nil
  223. }
  224. func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
  225. c := exec.Command("docker-compose", "down", "--volumes")
  226. c.Stdout = os.Stdout
  227. c.Stderr = os.Stderr
  228. downErr := c.Run()
  229. c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
  230. c.Stdout = os.Stdout
  231. c.Stderr = os.Stderr
  232. rmErr := c.Run()
  233. if downErr != nil {
  234. return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
  235. }
  236. if rmErr != nil {
  237. return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
  238. }
  239. return nil
  240. }
  241. func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
  242. Logger.Println("creating test topics")
  243. var testTopicNames []string
  244. for topic := range testTopicDetails {
  245. testTopicNames = append(testTopicNames, topic)
  246. }
  247. Logger.Println("Creating topics")
  248. config := NewConfig()
  249. config.Metadata.Retry.Max = 5
  250. config.Metadata.Retry.Backoff = 10 * time.Second
  251. config.ClientID = "sarama-tests"
  252. var err error
  253. config.Version, err = ParseKafkaVersion(env.KafkaVersion)
  254. if err != nil {
  255. return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
  256. }
  257. client, err := NewClient(env.KafkaBrokerAddrs, config)
  258. if err != nil {
  259. return fmt.Errorf("failed to connect to kafka: %w", err)
  260. }
  261. defer client.Close()
  262. controller, err := client.Controller()
  263. if err != nil {
  264. return fmt.Errorf("failed to connect to kafka controller: %w", err)
  265. }
  266. defer controller.Close()
  267. // Start by deleting the test topics (if they already exist)
  268. deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
  269. Topics: testTopicNames,
  270. Timeout: 30 * time.Second,
  271. })
  272. if err != nil {
  273. return fmt.Errorf("failed to delete test topics: %w", err)
  274. }
  275. for topic, topicErr := range deleteRes.TopicErrorCodes {
  276. if !isTopicNotExistsErrorOrOk(topicErr) {
  277. return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
  278. }
  279. }
  280. // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
  281. // synchronously
  282. var topicsOk bool
  283. for i := 0; i < 20 && !topicsOk; i++ {
  284. time.Sleep(1 * time.Second)
  285. md, err := controller.GetMetadata(&MetadataRequest{
  286. Topics: testTopicNames,
  287. })
  288. if err != nil {
  289. return fmt.Errorf("failed to get metadata for test topics: %w", err)
  290. }
  291. topicsOk = true
  292. for _, topicsMd := range md.Topics {
  293. if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
  294. topicsOk = false
  295. }
  296. }
  297. }
  298. if !topicsOk {
  299. return fmt.Errorf("timed out waiting for test topics to be gone")
  300. }
  301. // now create the topics empty
  302. createRes, err := controller.CreateTopics(&CreateTopicsRequest{
  303. TopicDetails: testTopicDetails,
  304. Timeout: 30 * time.Second,
  305. })
  306. if err != nil {
  307. return fmt.Errorf("failed to create test topics: %w", err)
  308. }
  309. for topic, topicErr := range createRes.TopicErrors {
  310. if !isTopicExistsErrorOrOk(topicErr.Err) {
  311. return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
  312. }
  313. }
  314. // This is kind of gross, but we don't actually have support for doing transactional publishing
  315. // with sarama, so we need to use a java-based tool to publish uncomitted messages to
  316. // the uncommitted-topic-test-4 topic
  317. jarName := filepath.Base(uncomittedMsgJar)
  318. if _, err := os.Stat(jarName); err != nil {
  319. Logger.Printf("Downloading %s\n", uncomittedMsgJar)
  320. req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
  321. if err != nil {
  322. return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
  323. }
  324. res, err := http.DefaultClient.Do(req)
  325. if err != nil {
  326. return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
  327. }
  328. defer res.Body.Close()
  329. jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
  330. if err != nil {
  331. return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
  332. }
  333. defer jarFile.Close()
  334. _, err = io.Copy(jarFile, res.Body)
  335. if err != nil {
  336. return fmt.Errorf("failed writing the uncomitted msg jar: %w", err)
  337. }
  338. }
  339. c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
  340. c.Stdout = os.Stdout
  341. c.Stderr = os.Stderr
  342. err = c.Run()
  343. if err != nil {
  344. return fmt.Errorf("failed running uncomitted msg jar: %w", err)
  345. }
  346. return nil
  347. }
  348. func isTopicNotExistsErrorOrOk(err KError) bool {
  349. return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
  350. }
  351. func isTopicExistsErrorOrOk(err KError) bool {
  352. return err == ErrTopicAlreadyExists || err == ErrNoError
  353. }
  354. func checkKafkaVersion(t testing.TB, requiredVersion string) {
  355. kafkaVersion := FunctionalTestEnv.KafkaVersion
  356. if kafkaVersion == "" {
  357. t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
  358. } else {
  359. available := parseKafkaVersion(kafkaVersion)
  360. required := parseKafkaVersion(requiredVersion)
  361. if !available.satisfies(required) {
  362. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  363. }
  364. }
  365. }
  366. func resetProxies(t testing.TB) {
  367. if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
  368. t.Error(err)
  369. }
  370. }
  371. func SaveProxy(t *testing.T, px string) {
  372. if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
  373. t.Fatal(err)
  374. }
  375. }
  376. func setupFunctionalTest(t testing.TB) {
  377. resetProxies(t)
  378. }
  379. func teardownFunctionalTest(t testing.TB) {
  380. resetProxies(t)
  381. }
  382. type kafkaVersion []int
  383. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  384. var ov int
  385. for index, v := range kv {
  386. if len(other) <= index {
  387. ov = 0
  388. } else {
  389. ov = other[index]
  390. }
  391. if v < ov {
  392. return false
  393. } else if v > ov {
  394. return true
  395. }
  396. }
  397. return true
  398. }
  399. func parseKafkaVersion(version string) kafkaVersion {
  400. numbers := strings.Split(version, ".")
  401. result := make(kafkaVersion, 0, len(numbers))
  402. for _, number := range numbers {
  403. nr, _ := strconv.Atoi(number)
  404. result = append(result, nr)
  405. }
  406. return result
  407. }