Browse Source

functional test:
- rm manual test, add functional test
- add java producer to vagrant setup
- wget kafka from archive

FrancoisPoinsot 6 years ago
parent
commit
9722e739a4

+ 26 - 0
functional_consumer_test.go

@@ -8,6 +8,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/require"
 )
 
 func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
@@ -117,6 +119,30 @@ func TestVersionMatrixIdempotent(t *testing.T) {
 	consumeMsgs(t, testVersions, producedMessages)
 }
 
+func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
+	checkKafkaVersion(t, "0.11.0")
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	config := NewConfig()
+	config.Consumer.IsolationLevel = ReadCommitted
+	config.Version = V0_11_0_0
+
+	consumer, err := NewConsumer(kafkaBrokers, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
+	require.NoError(t, err)
+
+	msgChannel := pc.Messages()
+	for i := 1; i <= 6; i++ {
+		msg := <-msgChannel
+		require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
+	}
+}
+
 func prodMsg2Str(prodMsg *ProducerMessage) string {
 	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
 }

+ 2 - 0
go.mod

@@ -8,6 +8,8 @@ require (
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
 	github.com/eapache/queue v1.1.0
 	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
+	github.com/kisielk/errcheck v1.2.0 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
+	github.com/stretchr/testify v1.3.0
 )

+ 10 - 0
go.sum

@@ -2,6 +2,7 @@ github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
 github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
@@ -12,7 +13,16 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E=
+github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563 h1:NIou6eNFigscvKJmsbyez16S2cIS6idossORlFtSt2E=
+golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

+ 0 - 15
tools/uncomitted-messages/DockerFile_CustomerProducer.yaml

@@ -1,15 +0,0 @@
-FROM maven:3-jdk-8-slim
-
-RUN apt-get update && apt-get install -y git && \
-cd /opt &&  \
-git clone https://github.com/FrancoisPoinsot/simplest-uncommitted-msg &&  \
-cd ./simplest-uncommitted-msg &&  \
-mvn clean install
-
-  # There is still thing downloaded on `mvn exec`. I guess it download the `exec plugin`
-  # This still work, but it force the download at each run instead of using local docker cache of images layer
-  # TODO: add necessary config in pom.xml so that `mvn install` will install the `exec` plugin
-CMD  cd /opt/simplest-uncommitted-msg && mvn exec:java -Dexec.mainClass="CustomProducer.Main" && \
-echo ----------- && \
-echo ready to run && \
-echo -----------

+ 0 - 48
tools/uncomitted-messages/README.md

@@ -1,48 +0,0 @@
-# uncommitted-messages
-
-A setup using Java clients to test how to consomme uncommitted messages
-
-## Dependencies
-
-- docker
-- golang 
-
-## How to use it
-
-You need to add this line in your host file:
-`127.0.0.1	kafka`
-
-Run `docker-compose up` to load a kafka broker.
-Wait for "ready to run" message. There the brokers and messages are setup.
-
-Now you can tweak the lib and run this command as much as you want to see the output:
-`go run kafka-console-consumer.go --brokers kafka:9092 --topic topic-test`
-
-You should see something like
-```
-uncommitted
-uncommitted
-Committed 1
-Committed 2
-uncommitted
-uncommitted
-Committed 3
-Committed 4
-```
-
-or if you are consuming committed only:
-```
-Committed 1
-Committed 2
-Committed 3
-Committed 4
-```
-
-There is 5 topic `topic-test`,`topic-test-2`,`topic-test-5`,`topic-test-3`,`topic-test-4`.
-Each one, in their own way, are trying to mess with you !
-
-## Why add a line in host file?
-
-Because kafka has a `ADV_HOST` variable and he care a lot for that.
-Meaning you have to address him using the same host either the query come from your local computer or
-the inside of the docker-compose.

+ 0 - 25
tools/uncomitted-messages/docker-compose.yaml

@@ -1,25 +0,0 @@
-version: '3'
-services:
-  kafka:
-    image: landoop/fast-data-dev:1.1.1
-    ports:
-      - "2181:2181"
-      - "3030:3030"
-      - "8081-8083:8081-8083"
-      - "9581-9585:9581-9585"
-      - "9092:9092"
-    environment:
-      - ADV_HOST=kafka
-      - SAMPLEDATA=0
-      - RUNNING_SAMPLEDATA=0
-      - RUNTESTS=0
-      - FORWARDLOGS=0
-      - DISABLE_JMX=1
-      - DEBUG=1
-      - SUPERVISORWEB=0
-      - CONNECTORS=file
-
-  custom-producer:
-    build:
-      dockerfile: DockerFile_CustomerProducer.yaml
-      context: .

+ 0 - 79
tools/uncomitted-messages/kafka-console-consumer.go

@@ -1,79 +0,0 @@
-package main
-
-import (
-	"flag"
-	"log"
-	"os"
-	"strings"
-	"time"
-
-	// when testing locally, it is way faster to edit this to reference your local repo if different
-	// if you want to do it properly but slowly you may use some dependency manager magic
-	"github.com/FrancoisPoinsot/sarama"
-)
-
-var (
-	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
-	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
-)
-
-func main() {
-	flag.Parse()
-
-	if *brokerList == "" {
-		log.Fatal("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
-	}
-	if *topic == "" {
-		log.Fatal("-topic is required")
-	}
-
-	config := sarama.NewConfig()
-	config.Version = sarama.V1_1_1_0
-	config.Consumer.IsolationLevel = sarama.ReadCommitted
-	config.Consumer.MaxProcessingTime = 20 * 365 * 24 * time.Hour
-
-	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
-	if err != nil {
-		log.Fatalf("Failed to start consumer: %s", err)
-	}
-
-	partitionList, err := c.Partitions(*topic)
-	if err != nil {
-		log.Fatalf("Failed to get the list of partitions: %s", err)
-	}
-
-	for _, partition := range partitionList {
-		pc, err := c.ConsumePartition(*topic, partition, sarama.OffsetOldest)
-		if err != nil {
-			log.Fatalf("Failed to start consumer for partition %d: %s", partition, err)
-		}
-
-		go func() {
-			for err := range pc.Errors() {
-				log.Fatal(err)
-			}
-		}()
-
-		msgChannel := pc.Messages()
-	read1Partition:
-		for {
-			//timeout := time.NewTimer(1 * time.Second)
-			select {
-			case msg, open := <-msgChannel:
-				if !open {
-					log.Println("channel message is closed")
-					break read1Partition
-				}
-				log.Println(string(msg.Value))
-				//case <-timeout.C:
-				//	break read1Partition
-			}
-		}
-	}
-
-	log.Println("Done consuming topic", *topic)
-
-	if err := c.Close(); err != nil {
-		log.Println("Failed to close consumer: ", err)
-	}
-}

+ 1 - 0
vagrant/create_topics.sh

@@ -6,3 +6,4 @@ cd ${KAFKA_INSTALL_ROOT}/kafka-9092
 bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181
 bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181
 bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64  --zookeeper localhost:2181
+bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4  --zookeeper localhost:2181

+ 1 - 1
vagrant/install_cluster.sh

@@ -6,7 +6,7 @@ TOXIPROXY_VERSION=2.1.4
 
 mkdir -p ${KAFKA_INSTALL_ROOT}
 if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then
-    wget --quiet https://www-us.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
+    wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
 fi
 if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then
     wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}

+ 1 - 0
vagrant/provision.sh

@@ -14,3 +14,4 @@ export REPOSITORY_ROOT=/vagrant
 sh /vagrant/vagrant/install_cluster.sh
 sh /vagrant/vagrant/setup_services.sh
 sh /vagrant/vagrant/create_topics.sh
+sh /vagrant/vagrant/run_java_producer.sh

+ 6 - 0
vagrant/run_java_producer.sh

@@ -0,0 +1,6 @@
+#!/bin/sh
+
+set -ex
+
+wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
+java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4