Burke Libbey 12 tahun lalu
induk
melakukan
f98386096e
2 mengubah file dengan 79 tambahan dan 7 penghapusan
  1. 13 4
      producer.go
  2. 66 3
      producer_test.go

+ 13 - 4
producer.go

@@ -1,7 +1,6 @@
 package sarama
 
 import (
-	"fmt"
 	"sync"
 	"time"
 )
@@ -176,7 +175,7 @@ func (p *Producer) QueueMessage(topic string, key, value Encoder) (err error) {
 //
 // If you care about message ordering, you should not call QueueMessage and
 // SendMessage on the same Producer.
-func (p *Producer) SendMessage(topic string, key, value Encoder) error {
+func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
 	var keyBytes, valBytes []byte
 
 	if key != nil {
@@ -202,6 +201,18 @@ func (p *Producer) SendMessage(topic string, key, value Encoder) error {
 		failures: 0,
 	}
 
+	bp, err := p.brokerProducerFor(msg.tp)
+	if err != nil {
+		return err
+	}
+
+	// TODO: don't pass through QueueMessage pipeline if failed.
+	var prb produceRequestBuilder = []*produceMessage{msg}
+	errs := make(chan error, 1)
+	bp.flushRequest(p, prb, func(err error) {
+		errs <- err
+	})
+	return <-errs
 }
 
 func (p *Producer) addMessage(msg *produceMessage) error {
@@ -253,7 +264,6 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		timer := time.NewTimer(maxBufferTime)
 		wg.Done()
 		for {
-			println("SEL")
 			select {
 			case <-bp.flushNow:
 				bp.flush(p)
@@ -316,7 +326,6 @@ func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
 
 func (bp *brokerProducer) flush(p *Producer) {
 	var prb produceRequestBuilder
-	fmt.Println("FLUSHING")
 
 	// only deliver messages for topic-partitions that are not currently being delivered.
 	bp.mapM.Lock()

+ 66 - 3
producer_test.go

@@ -64,8 +64,65 @@ func TestSimpleProducer(t *testing.T) {
 	}
 }
 
+func TestSimpleSyncProducer(t *testing.T) {
+	responses := make(chan []byte, 1)
+	extraResponses := make(chan []byte)
+	mockBroker := NewMockBroker(t, responses)
+	mockExtra := NewMockBroker(t, extraResponses)
+	defer mockBroker.Close()
+	defer mockExtra.Close()
+
+	// return the extra mock as another available broker
+	response := []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00}
+	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
+	responses <- response
+	go func() {
+		msg := []byte{
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x00, 0x00, 0x00,
+			0x00, 0x00,
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+		binary.BigEndian.PutUint64(msg[23:], 0)
+		for i := 0; i < 10; i++ {
+			extraResponses <- msg
+		}
+	}()
+
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+	})
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		sendSyncMessage(t, producer, "my_topic", "ABC THE MESSAGE")
+	}
+}
+
 func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
-	err := producer.SendMessage(topic, nil, StringEncoder(key))
+	err := producer.QueueMessage(topic, nil, StringEncoder(key))
 	if err != nil {
 		t.Error(err)
 	}
@@ -75,7 +132,14 @@ func sendMessage(t *testing.T, producer *Producer, topic string, key string, exp
 	assertNoMessages(t, producer.Errors())
 }
 
-/*
+func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
+	err := producer.SendMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	assertNoMessages(t, producer.Errors())
+}
+
 func TestMultipleFlushes(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
@@ -262,7 +326,6 @@ func TestMultipleProducer(t *testing.T) {
 	}
 }
 
-*/
 func readMessage(t *testing.T, ch chan error) {
 	select {
 	case err := <-ch: