Browse Source

Merge pull request #107 from Shopify/functional_test

Add functional test
Willem van Bergen 11 years ago
parent
commit
9b9e8e36a3
3 changed files with 62 additions and 2 deletions
  1. 6 1
      .travis.yml
  2. 1 1
      README.md
  3. 55 0
      functional_test.go

+ 6 - 1
.travis.yml

@@ -2,9 +2,14 @@ language: go
 go:
 - 1.1
 - 1.2
+- 1.3
 
 before_install:
-- sudo apt-get install zookeeper 2>&1
 - wget http://apache.mirror.nexicom.net/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz -O kafka.tgz
 - mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
+- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
+- sleep 5
 - nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &"
+- sleep 5
+- kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic single_partition --zookeeper localhost:2181
+- kafka/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic multi_partition --zookeeper localhost:2181

+ 1 - 1
README.md

@@ -9,7 +9,7 @@ There is a google group for discussion
 * web: https://groups.google.com/forum/#!forum/sarama-users
 * email: sarama-users@googlegroups.com
 
-It is compatible with Go 1.1 and 1.2 (which means `go vet` on 1.2 may return
+It is compatible with Go 1.1, 1.2, and 1.3 (which means `go vet` on 1.2 or 1.3 may return
 some suggestions that we are ignoring for the sake of compatibility with 1.1).
 
 A word of warning: the API is not 100% stable yet. It won't change much (in particular the low-level

+ 55 - 0
functional_test.go

@@ -0,0 +1,55 @@
+package sarama
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+const (
+	TestBatchSize = 1000
+)
+
+func TestProducingMessages(t *testing.T) {
+	client, err := NewClient("functional_test", []string{"localhost:9092"}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumerConfig := NewConsumerConfig()
+	consumerConfig.OffsetMethod = OffsetMethodNewest
+
+	consumer, err := NewConsumer(client, "single_partition", 0, "functional_test", consumerConfig)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	producer, err := NewProducer(client, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	for i := 1; i <= TestBatchSize; i++ {
+		err = producer.SendMessage("single_partition", nil, StringEncoder(fmt.Sprintf("testing %d", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	events := consumer.Events()
+	for i := 1; i <= TestBatchSize; i++ {
+		select {
+		case <-time.After(10 * time.Second):
+			t.Fatal("Not received any more events in the last 10 seconds.")
+
+		case event := <-events:
+			if string(event.Value) != fmt.Sprintf("testing %d", i) {
+				t.Fatal("Unexpected message with index %d: %s", i, event.Value)
+			}
+		}
+
+	}
+}