فهرست منبع

Merge pull request #160 from Shopify/constant-partitioner

Implement ConstantPartitioner
Evan Huus 11 سال پیش
والد
کامیت
9027fcbe69
3فایلهای تغییر یافته به همراه23 افزوده شده و 1 حذف شده
  1. 3 1
      functional_test.go
  2. 9 0
      partitioner.go
  3. 11 0
      partitioner_test.go

+ 3 - 1
functional_test.go

@@ -60,7 +60,9 @@ func TestProducingMessages(t *testing.T) {
 	}
 	defer consumer.Close()
 
-	producer, err := NewProducer(client, nil)
+	producerConfig := NewProducerConfig()
+	producerConfig.Partitioner = &ConstantPartitioner{Constant: 0}
+	producer, err := NewProducer(client, producerConfig)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 9 - 0
partitioner.go

@@ -84,3 +84,12 @@ func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 	}
 	return hash % numPartitions
 }
+
+// ConstantPartitioner implements the Partitioner interface by just returning a constant value.
+type ConstantPartitioner struct {
+	Constant int32
+}
+
+func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	return p.Constant
+}

+ 11 - 0
partitioner_test.go

@@ -72,3 +72,14 @@ func TestHashPartitioner(t *testing.T) {
 		assertPartitioningConsistent(t, partitioner, ByteEncoder(buf), 50)
 	}
 }
+
+func TestConstantPartitioner(t *testing.T) {
+	partitioner := &ConstantPartitioner{Constant: 0}
+
+	for i := 1; i < 50; i++ {
+		choice := partitioner.Partition(nil, 50)
+		if choice != 0 {
+			t.Error("Returned partition", choice, "instead of 0.")
+		}
+	}
+}