Преглед изворни кода

Add HostPoolHostPolicy policy

Daniel Cannon пре 10 година
родитељ
комит
8585d308f7
2 измењених фајлова са 113 додато и 1 уклоњено
  1. 68 0
      policies.go
  2. 45 1
      policies_test.go

+ 68 - 0
policies.go

@@ -8,6 +8,8 @@ import (
 	"log"
 	"sync"
 	"sync/atomic"
+
+	"github.com/hailocab/go-hostpool"
 )
 
 // RetryableQuery is an interface that represents a query or batch statement that
@@ -252,6 +254,72 @@ func (host selectedTokenAwareHost) Mark(err error) {
 	// noop
 }
 
+// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
+// to distribute queries between hosts and prevent sending queries to
+// unresponsive hosts.
+func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
+	return &hostPoolHostPolicy{hostMap: map[string]HostInfo{}, hp: hp}
+}
+
+type hostPoolHostPolicy struct {
+	hp      hostpool.HostPool
+	hostMap map[string]HostInfo
+	mu      sync.RWMutex
+}
+
+func (r *hostPoolHostPolicy) SetHosts(hosts []HostInfo) {
+	peers := make([]string, len(hosts))
+	hostMap := make(map[string]HostInfo, len(hosts))
+
+	for i, host := range hosts {
+		peers[i] = host.Peer
+		hostMap[host.Peer] = host
+	}
+
+	r.mu.Lock()
+	r.hp.SetHosts(peers)
+	r.hostMap = hostMap
+	r.mu.Unlock()
+}
+
+func (r *hostPoolHostPolicy) SetPartitioner(partitioner string) {
+	// noop
+}
+
+func (r *hostPoolHostPolicy) Pick(qry *Query) NextHost {
+	return func() SelectedHost {
+		r.mu.RLock()
+		if len(r.hostMap) == 0 {
+			r.mu.RUnlock()
+			return nil
+		}
+
+		hostR := r.hp.Get()
+		host, ok := r.hostMap[hostR.Host()]
+		if !ok {
+			r.mu.RUnlock()
+			return nil
+		}
+
+		return selectedHostPoolHost{&host, hostR}
+	}
+}
+
+// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
+// implements the SelectedHost interface
+type selectedHostPoolHost struct {
+	info  *HostInfo
+	hostR hostpool.HostPoolResponse
+}
+
+func (host selectedHostPoolHost) Info() *HostInfo {
+	return host.info
+}
+
+func (host selectedHostPoolHost) Mark(err error) {
+	host.hostR.Mark(err)
+}
+
 //ConnSelectionPolicy is an interface for selecting an
 //appropriate connection for executing a query
 type ConnSelectionPolicy interface {

+ 45 - 1
policies_test.go

@@ -4,7 +4,12 @@
 
 package gocql
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+
+	"github.com/hailocab/go-hostpool"
+)
 
 // Tests of the round-robin host selection policy implementation
 func TestRoundRobinHostPolicy(t *testing.T) {
@@ -99,6 +104,45 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 	}
 }
 
+// Tests of the host pool host selection policy implementation
+func TestHostPoolHostPolicy(t *testing.T) {
+	policy := HostPoolHostPolicy(hostpool.New([]string{}))
+
+	hosts := []HostInfo{
+		HostInfo{HostId: "0", Peer: "0"},
+		HostInfo{HostId: "1", Peer: "1"},
+	}
+
+	policy.SetHosts(hosts)
+
+	// the first host selected is actually at [1], but this is ok for RR
+	// interleaved iteration should always increment the host
+	iter := policy.Pick(nil)
+	actualA := iter()
+	if actualA.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostId)
+	}
+	actualA.Mark(nil)
+
+	actualB := iter()
+	if actualB.Info().HostId != "1" {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostId)
+	}
+	actualB.Mark(fmt.Errorf("error"))
+
+	actualC := iter()
+	if actualC.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostId)
+	}
+	actualC.Mark(nil)
+
+	actualD := iter()
+	if actualD.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostId)
+	}
+	actualD.Mark(nil)
+}
+
 // Tests of the round-robin connection selection policy implementation
 func TestRoundRobinConnPolicy(t *testing.T) {
 	policy := RoundRobinConnPolicy()()