|
|
@@ -47,7 +47,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
|
|
|
config.Producer.Flush.Frequency = 50 * time.Millisecond
|
|
|
config.Producer.Flush.Messages = 200
|
|
|
config.Producer.Return.Successes = true
|
|
|
- producer, err := NewAsyncProducer(kafkaBrokers, config)
|
|
|
+ producer, err := NewSyncProducer(kafkaBrokers, config)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
@@ -56,17 +56,13 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
|
|
|
wg.Add(TestBatchSize)
|
|
|
|
|
|
for i := 1; i <= TestBatchSize; i++ {
|
|
|
-
|
|
|
- go func(i int, w *sync.WaitGroup) {
|
|
|
- defer w.Done()
|
|
|
+ go func(i int) {
|
|
|
+ defer wg.Done()
|
|
|
msg := &ProducerMessage{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
|
|
|
- producer.Input() <- msg
|
|
|
- select {
|
|
|
- case ret := <-producer.Errors():
|
|
|
- t.Fatal(ret.Err)
|
|
|
- case <-producer.Successes():
|
|
|
+ if _, _, err := producer.SendMessage(msg); err != nil {
|
|
|
+ t.Error(i, err)
|
|
|
}
|
|
|
- }(i, &wg)
|
|
|
+ }(i)
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|