|
|
@@ -3,6 +3,7 @@ package sarama
|
|
|
import (
|
|
|
"fmt"
|
|
|
"os"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
"time"
|
|
|
@@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
|
|
|
safeClose(t, producer)
|
|
|
}
|
|
|
|
|
|
+func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
|
|
|
+ setupFunctionalTest(t)
|
|
|
+ defer teardownFunctionalTest(t)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Producer.Flush.Frequency = 250 * time.Millisecond
|
|
|
+ config.Producer.Idempotent = true
|
|
|
+ config.Producer.Timeout = 500 * time.Millisecond
|
|
|
+ config.Producer.Retry.Max = 1
|
|
|
+ config.Producer.Retry.Backoff = 500 * time.Millisecond
|
|
|
+ config.Producer.Return.Successes = true
|
|
|
+ config.Producer.Return.Errors = true
|
|
|
+ config.Producer.RequiredAcks = WaitForAll
|
|
|
+ config.Net.MaxOpenRequests = 1
|
|
|
+ config.Version = V0_11_0_0
|
|
|
+
|
|
|
+ producer, err := NewSyncProducer(kafkaBrokers, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ defer safeClose(t, producer)
|
|
|
+
|
|
|
+ // Successfully publish a few messages
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ _, _, err = producer.SendMessage(&ProducerMessage{
|
|
|
+ Topic: "test.1",
|
|
|
+ Value: StringEncoder(fmt.Sprintf("%d message", i)),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // break the brokers.
|
|
|
+ for proxyName, proxy := range Proxies {
|
|
|
+ if !strings.Contains(proxyName, "kafka") {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if err := proxy.Disable(); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // This should fail hard now
|
|
|
+ for i := 10; i < 20; i++ {
|
|
|
+ _, _, err = producer.SendMessage(&ProducerMessage{
|
|
|
+ Topic: "test.1",
|
|
|
+ Value: StringEncoder(fmt.Sprintf("%d message", i)),
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now bring the proxy back up
|
|
|
+ for proxyName, proxy := range Proxies {
|
|
|
+ if !strings.Contains(proxyName, "kafka") {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if err := proxy.Enable(); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // We should be able to publish again (once everything calms down)
|
|
|
+ // (otherwise it times out)
|
|
|
+ for {
|
|
|
+ _, _, err = producer.SendMessage(&ProducerMessage{
|
|
|
+ Topic: "test.1",
|
|
|
+ Value: StringEncoder("comeback message"),
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func testProducingMessages(t *testing.T, config *Config) {
|
|
|
setupFunctionalTest(t)
|
|
|
defer teardownFunctionalTest(t)
|