|
|
@@ -7,16 +7,17 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-func TestSimpleConsumer(t *testing.T) {
|
|
|
- masterResponses := make(chan []byte, 1)
|
|
|
- extraResponses := make(chan []byte)
|
|
|
- mockBroker := NewMockBroker(t, masterResponses)
|
|
|
- mockExtra := NewMockBroker(t, extraResponses)
|
|
|
- defer mockBroker.Close()
|
|
|
- defer mockExtra.Close()
|
|
|
-
|
|
|
- // return the extra mock as another available broker
|
|
|
- response := []byte{
|
|
|
+var (
|
|
|
+ consumerStopper = []byte{
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ }
|
|
|
+ extraBrokerMetadata = []byte{
|
|
|
0x00, 0x00, 0x00, 0x01,
|
|
|
0x00, 0x00, 0x00, 0x01,
|
|
|
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
|
|
|
@@ -29,7 +30,21 @@ func TestSimpleConsumer(t *testing.T) {
|
|
|
0x00, 0x00, 0x00, 0x00,
|
|
|
0x00, 0x00, 0x00, 0x01,
|
|
|
0x00, 0x00, 0x00, 0x00,
|
|
|
- 0x00, 0x00, 0x00, 0x00}
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ }
|
|
|
+)
|
|
|
+
|
|
|
+func TestSimpleConsumer(t *testing.T) {
|
|
|
+ masterResponses := make(chan []byte, 1)
|
|
|
+ extraResponses := make(chan []byte)
|
|
|
+ mockBroker := NewMockBroker(t, masterResponses)
|
|
|
+ mockExtra := NewMockBroker(t, extraResponses)
|
|
|
+ defer mockBroker.Close()
|
|
|
+ defer mockExtra.Close()
|
|
|
+
|
|
|
+ // return the extra mock as another available broker
|
|
|
+ response := make([]byte, len(extraBrokerMetadata))
|
|
|
+ copy(response, extraBrokerMetadata)
|
|
|
binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
|
|
|
masterResponses <- response
|
|
|
go func() {
|
|
|
@@ -54,14 +69,7 @@ func TestSimpleConsumer(t *testing.T) {
|
|
|
binary.BigEndian.PutUint64(msg[36:], uint64(i))
|
|
|
extraResponses <- msg
|
|
|
}
|
|
|
- extraResponses <- []byte{
|
|
|
- 0x00, 0x00, 0x00, 0x01,
|
|
|
- 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
|
|
|
- 0x00, 0x00, 0x00, 0x01,
|
|
|
- 0x00, 0x00, 0x00, 0x00,
|
|
|
- 0x00, 0x00,
|
|
|
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
|
|
- 0x00, 0x00, 0x00, 0x00}
|
|
|
+ extraResponses <- consumerStopper
|
|
|
}()
|
|
|
|
|
|
client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
|
|
|
@@ -87,6 +95,79 @@ func TestSimpleConsumer(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestConsumerRawOffset(t *testing.T) {
|
|
|
+ masterResponses := make(chan []byte, 1)
|
|
|
+ extraResponses := make(chan []byte, 1)
|
|
|
+ mockBroker := NewMockBroker(t, masterResponses)
|
|
|
+ mockExtra := NewMockBroker(t, extraResponses)
|
|
|
+ defer mockBroker.Close()
|
|
|
+ defer mockExtra.Close()
|
|
|
+
|
|
|
+ // return the extra mock as another available broker
|
|
|
+ response := make([]byte, len(extraBrokerMetadata))
|
|
|
+ copy(response, extraBrokerMetadata)
|
|
|
+ binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
|
|
|
+ masterResponses <- response
|
|
|
+ extraResponses <- consumerStopper
|
|
|
+
|
|
|
+ client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ defer client.Close()
|
|
|
+
|
|
|
+ consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: RAW_OFFSET, OffsetValue: 1234})
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ defer consumer.Close()
|
|
|
+
|
|
|
+ if consumer.offset != 1234 {
|
|
|
+ t.Error("Raw offset not set correctly")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func TestConsumerLatestOffset(t *testing.T) {
|
|
|
+ masterResponses := make(chan []byte, 1)
|
|
|
+ extraResponses := make(chan []byte, 2)
|
|
|
+ mockBroker := NewMockBroker(t, masterResponses)
|
|
|
+ mockExtra := NewMockBroker(t, extraResponses)
|
|
|
+ defer mockBroker.Close()
|
|
|
+ defer mockExtra.Close()
|
|
|
+
|
|
|
+ // return the extra mock as another available broker
|
|
|
+ response := make([]byte, len(extraBrokerMetadata))
|
|
|
+ copy(response, extraBrokerMetadata)
|
|
|
+ binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
|
|
|
+ masterResponses <- response
|
|
|
+ extraResponses <- []byte{
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01,
|
|
|
+ }
|
|
|
+ extraResponses <- consumerStopper
|
|
|
+
|
|
|
+ client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ defer client.Close()
|
|
|
+
|
|
|
+ consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: LATEST_OFFSET})
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ defer consumer.Close()
|
|
|
+
|
|
|
+ if consumer.offset != 0x010101 {
|
|
|
+ t.Error("Latest offset not fetched correctly")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func ExampleConsumer() {
|
|
|
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
|
|
|
if err != nil {
|