
Merge pull request #505 from dancannon/hostpool

Update host policies to support host pools, fixes #499
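This change introduces a SelectedHost interface, letting a selection policy learn whether each attempt against the host it picked succeeded, and a HostPoolHostPolicy that feeds those results into a go-hostpool pool so queries steer away from unresponsive hosts. A minimal configuration sketch (the contact points are placeholders; PoolConfig.HostSelectionPolicy is the field used in the new doc comment in policies.go):

    package main

    import (
        "log"

        "github.com/gocql/gocql"
        "github.com/hailocab/go-hostpool"
    )

    func main() {
        // placeholder contact points; replace with real cluster addresses
        cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2")

        // pass an empty host list to hostpool.New: gocql populates the
        // pool itself through the policy's SetHosts hook
        cluster.PoolConfig.HostSelectionPolicy = gocql.HostPoolHostPolicy(hostpool.New(nil))

        session, err := cluster.CreateSession()
        if err != nil {
            log.Fatal(err)
        }
        defer session.Close()
    }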
Chris Bannister committed 10 years ago (commit a64caf392c)
7 changed files with 217 additions and 44 deletions

  1. AUTHORS (+1 -0)
  2. cassandra_test.go (+5 -5)
  3. connectionpool.go (+4 -4)
  4. control.go (+3 -1)
  5. policies.go (+120 -6)
  6. policies_test.go (+69 -25)
  7. session.go (+15 -3)

AUTHORS (+1 -0)

@@ -56,3 +56,4 @@ John Weldon <johnweldon4@gmail.com>
 Adrien Bustany <adrien@bustany.org>
 Andrey Smirnov <smirnov.andrey@gmail.com>
 Adam Weiner <adamsweiner@gmail.com>
+Daniel Cannon <daniel@danielcannon.co.uk>

cassandra_test.go (+5 -5)

@@ -1137,7 +1137,7 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
 		t.Fatal("create:", err)
 	}
 	stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
-	conn := session.pool.Pick(nil)
+	_, conn := session.pool.Pick(nil)
 	flight := new(inflightPrepare)
 	stmtsLRU.Lock()
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
@@ -1160,7 +1160,7 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
 
 func TestMissingSchemaPrepare(t *testing.T) {
 	s := createSession(t)
-	conn := s.pool.Pick(nil)
+	_, conn := s.pool.Pick(nil)
 	defer s.Close()
 
 	insertQry := &Query{stmt: "INSERT INTO invalidschemaprep (val) VALUES (?)", values: []interface{}{5}, cons: s.cons,
@@ -1209,7 +1209,7 @@ func TestQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	conn := session.pool.Pick(nil)
+	_, conn := session.pool.Pick(nil)
 	info, err := conn.prepareStatement("SELECT release_version, host_id FROM system.local WHERE key = ?", nil)
 
 	if err != nil {
@@ -2054,7 +2054,7 @@ func TestStream0(t *testing.T) {
 			break
 		}
 
-		conn = session.pool.Pick(nil)
+		_, conn = session.pool.Pick(nil)
 	}
 
 	if conn == nil {
@@ -2093,7 +2093,7 @@ func TestNegativeStream(t *testing.T) {
 			break
 		}
 
-		conn = session.pool.Pick(nil)
+		_, conn = session.pool.Pick(nil)
 	}
 
 	if conn == nil {

connectionpool.go (+4 -4)

@@ -171,11 +171,11 @@ func (p *policyConnPool) Size() int {
 	return count
 }
 
-func (p *policyConnPool) Pick(qry *Query) *Conn {
+func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
 	nextHost := p.hostPolicy.Pick(qry)
 
 	var (
-		host *HostInfo
+		host SelectedHost
 		conn *Conn
 	)
 
@@ -185,10 +185,10 @@ func (p *policyConnPool) Pick(qry *Query) *Conn {
 		if host == nil {
 			break
 		}
-		conn = p.hostConnPools[host.Peer].Pick(qry)
+		conn = p.hostConnPools[host.Info().Peer].Pick(qry)
 	}
 	p.mu.RUnlock()
-	return conn
+	return host, conn
 }
 
 func (p *policyConnPool) Close() {

control.go (+3 -1)

@@ -83,17 +83,19 @@ func (c *controlConn) reconnect(refreshring bool) {
 
 	// TODO: should have our own round robin for hosts so that we can try each
 	// in succession and guarantee that we get a different host each time.
-	conn := c.session.pool.Pick(nil)
+	host, conn := c.session.pool.Pick(nil)
 	if conn == nil {
 		return
 	}
 
 	newConn, err := Connect(conn.addr, conn.cfg, c)
 	if err != nil {
+		host.Mark(err)
 		// TODO: add log handler for things like this
 		return
 	}
 
+	host.Mark(nil)
 	c.conn.Store(newConn)
 	success = true
 

policies.go (+120 -6)

@@ -8,6 +8,8 @@ import (
 	"log"
 	"sync"
 	"sync/atomic"
+
+	"github.com/hailocab/go-hostpool"
 )
 
 // RetryableQuery is an interface that represents a query or batch statement that
@@ -57,8 +59,15 @@ type HostSelectionPolicy interface {
 	Pick(*Query) NextHost
 }
 
+// SelectedHost is an interface returned when picking a host from a host
+// selection policy.
+type SelectedHost interface {
+	Info() *HostInfo
+	Mark(error)
+}
+
 // NextHost is an iteration function over picked hosts
-type NextHost func() *HostInfo
+type NextHost func() SelectedHost
 
 // RoundRobinHostPolicy is a round-robin load balancing policy, where each host
 // is tried sequentially for each query.
@@ -86,7 +95,7 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 	// i is used to limit the number of attempts to find a host
 	// to the number of hosts known to this policy
 	var i uint32 = 0
-	return func() *HostInfo {
+	return func() SelectedHost {
 		r.mu.RLock()
 		if len(r.hosts) == 0 {
 			r.mu.RUnlock()
@@ -102,10 +111,24 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 			i++
 		}
 		r.mu.RUnlock()
-		return host
+		return selectedRoundRobinHost{host}
 	}
 }
 
+// selectedRoundRobinHost is a host returned by the roundRobinHostPolicy and
+// implements the SelectedHost interface
+type selectedRoundRobinHost struct {
+	info *HostInfo
+}
+
+func (host selectedRoundRobinHost) Info() *HostInfo {
+	return host.info
+}
+
+func (host selectedRoundRobinHost) Mark(err error) {
+	// noop
+}
+
 // TokenAwareHostPolicy is a token aware host selection policy, where hosts are
 // selected based on the partition key, so queries are sent to the host which
 // owns the partition. Fallback is used when routing information is not available.
@@ -195,10 +218,10 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
 		hostReturned bool
 		fallbackIter NextHost
 	)
-	return func() *HostInfo {
+	return func() SelectedHost {
 		if !hostReturned {
 			hostReturned = true
-			return host
+			return selectedTokenAwareHost{host}
 		}
 
 		// fallback
@@ -209,7 +232,7 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
 		fallbackHost := fallbackIter()
 
 		// filter the token aware selected hosts from the fallback hosts
-		if fallbackHost == host {
+		if fallbackHost.Info() == host {
 			fallbackHost = fallbackIter()
 		}
 
@@ -217,6 +240,97 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
 	}
 }
 
+// selectedTokenAwareHost is a host returned by the tokenAwareHostPolicy and
+// implements the SelectedHost interface
+type selectedTokenAwareHost struct {
+	info *HostInfo
+}
+
+func (host selectedTokenAwareHost) Info() *HostInfo {
+	return host.info
+}
+
+func (host selectedTokenAwareHost) Mark(err error) {
+	// noop
+}
+
+// HostPoolHostPolicy is a host policy which uses the hailocab/go-hostpool
+// library to distribute queries between hosts and to avoid sending queries to
+// unresponsive hosts. When creating the host pool that is passed to the
+// policy, use an empty slice of hosts, as the host pool will be populated
+// later by gocql. See below for examples of usage:
+//
+//     // Create host selection policy using a simple host pool
+//     cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
+//
+//     // Create host selection policy using an epsilon greedy pool
+//     cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
+//         hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
+//     )
+//
+func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
+	return &hostPoolHostPolicy{hostMap: map[string]HostInfo{}, hp: hp}
+}
+
+type hostPoolHostPolicy struct {
+	hp      hostpool.HostPool
+	hostMap map[string]HostInfo
+	mu      sync.RWMutex
+}
+
+func (r *hostPoolHostPolicy) SetHosts(hosts []HostInfo) {
+	peers := make([]string, len(hosts))
+	hostMap := make(map[string]HostInfo, len(hosts))
+
+	for i, host := range hosts {
+		peers[i] = host.Peer
+		hostMap[host.Peer] = host
+	}
+
+	r.mu.Lock()
+	r.hp.SetHosts(peers)
+	r.hostMap = hostMap
+	r.mu.Unlock()
+}
+
+func (r *hostPoolHostPolicy) SetPartitioner(partitioner string) {
+	// noop
+}
+
+func (r *hostPoolHostPolicy) Pick(qry *Query) NextHost {
+	return func() SelectedHost {
+		// hold the read lock for the whole pick; defer covers every return
+		r.mu.RLock()
+		defer r.mu.RUnlock()
+
+		if len(r.hostMap) == 0 {
+			return nil
+		}
+
+		hostR := r.hp.Get()
+		host, ok := r.hostMap[hostR.Host()]
+		if !ok {
+			return nil
+		}
+		return selectedHostPoolHost{&host, hostR}
+	}
+}
+
+// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
+// implements the SelectedHost interface
+type selectedHostPoolHost struct {
+	info  *HostInfo
+	hostR hostpool.HostPoolResponse
+}
+
+func (host selectedHostPoolHost) Info() *HostInfo {
+	return host.info
+}
+
+func (host selectedHostPoolHost) Mark(err error) {
+	host.hostR.Mark(err)
+}
+
 //ConnSelectionPolicy is an interface for selecting an
 //appropriate connection for executing a query
 type ConnSelectionPolicy interface {
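
Taken together, the consumer contract is: Pick returns a NextHost iterator yielding SelectedHost values, and the caller reports each attempt's outcome through Mark. A condensed, package-internal sketch of that caller side, with a hypothetical runAttempt callback standing in for the real connection-pool work done in session.go:

    package gocql

    // queryWithPolicy is a hypothetical illustration of how a caller drives
    // a HostSelectionPolicy; executeQuery and executeBatch follow the same
    // Pick/Mark shape, with retries gated by the query's RetryPolicy.
    func queryWithPolicy(policy HostSelectionPolicy, qry *Query, runAttempt func(*HostInfo) error) error {
        next := policy.Pick(qry)
        for {
            host := next()
            if host == nil {
                // iterator exhausted, no host could be selected
                return ErrNoConnections
            }
            err := runAttempt(host.Info())
            // report the outcome: HostPoolHostPolicy backs off failing
            // hosts, while the round-robin and token-aware policies no-op
            host.Mark(err)
            if err == nil {
                return nil
            }
        }
    }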

policies_test.go (+69 -25)

@@ -4,7 +4,12 @@
 
 package gocql
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+
+	"github.com/hailocab/go-hostpool"
+)
 
 // Tests of the round-robin host selection policy implementation
 func TestRoundRobinHostPolicy(t *testing.T) {
@@ -20,26 +25,26 @@ func TestRoundRobinHostPolicy(t *testing.T) {
 	// the first host selected is actually at [1], but this is ok for RR
 	// interleaved iteration should always increment the host
 	iterA := policy.Pick(nil)
-	if actual := iterA(); actual != &hosts[1] {
-		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
+	if actual := iterA(); actual.Info() != &hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
 	}
 	iterB := policy.Pick(nil)
-	if actual := iterB(); actual != &hosts[0] {
-		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
+	if actual := iterB(); actual.Info() != &hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
 	}
-	if actual := iterB(); actual != &hosts[1] {
-		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
+	if actual := iterB(); actual.Info() != &hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
 	}
-	if actual := iterA(); actual != &hosts[0] {
-		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
+	if actual := iterA(); actual.Info() != &hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
 	}
 
 	iterC := policy.Pick(nil)
-	if actual := iterC(); actual != &hosts[1] {
-		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
+	if actual := iterC(); actual.Info() != &hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
 	}
-	if actual := iterC(); actual != &hosts[0] {
-		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
+	if actual := iterC(); actual.Info() != &hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
 	}
 }
 
@@ -70,13 +75,13 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
 	// the token ring is not setup without the partitioner, but the fallback
 	// should work
-	if actual := policy.Pick(nil)(); actual.Peer != "1" {
-		t.Errorf("Expected peer 1 but was %s", actual.Peer)
+	if actual := policy.Pick(nil)(); actual.Info().Peer != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer)
 	}
 
 	query.RoutingKey([]byte("30"))
-	if actual := policy.Pick(query)(); actual.Peer != "2" {
-		t.Errorf("Expected peer 2 but was %s", actual.Peer)
+	if actual := policy.Pick(query)(); actual.Info().Peer != "2" {
+		t.Errorf("Expected peer 2 but was %s", actual.Info().Peer)
 	}
 
 	policy.SetPartitioner("OrderedPartitioner")
@@ -84,19 +89,58 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 	// now the token ring is configured
 	query.RoutingKey([]byte("20"))
 	iter = policy.Pick(query)
-	if actual := iter(); actual.Peer != "1" {
-		t.Errorf("Expected peer 1 but was %s", actual.Peer)
+	if actual := iter(); actual.Info().Peer != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer)
 	}
 	// rest are round robin
-	if actual := iter(); actual.Peer != "3" {
-		t.Errorf("Expected peer 3 but was %s", actual.Peer)
+	if actual := iter(); actual.Info().Peer != "3" {
+		t.Errorf("Expected peer 3 but was %s", actual.Info().Peer)
+	}
+	if actual := iter(); actual.Info().Peer != "0" {
+		t.Errorf("Expected peer 0 but was %s", actual.Info().Peer)
+	}
+	if actual := iter(); actual.Info().Peer != "2" {
+		t.Errorf("Expected peer 2 but was %s", actual.Info().Peer)
 	}
-	if actual := iter(); actual.Peer != "0" {
-		t.Errorf("Expected peer 0 but was %s", actual.Peer)
+}
+
+// Tests of the host pool host selection policy implementation
+func TestHostPoolHostPolicy(t *testing.T) {
+	policy := HostPoolHostPolicy(hostpool.New(nil))
+
+	hosts := []HostInfo{
+		HostInfo{HostId: "0", Peer: "0"},
+		HostInfo{HostId: "1", Peer: "1"},
 	}
-	if actual := iter(); actual.Peer != "2" {
-		t.Errorf("Expected peer 2 but was %s", actual.Peer)
+
+	policy.SetHosts(hosts)
+
+	// hosts are returned in round-robin order until one is marked with an
+	// error, after which the pool backs it off and keeps picking the healthy host
+	iter := policy.Pick(nil)
+	actualA := iter()
+	if actualA.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostId)
+	}
+	actualA.Mark(nil)
+
+	actualB := iter()
+	if actualB.Info().HostId != "1" {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostId)
+	}
+	actualB.Mark(fmt.Errorf("error"))
+
+	actualC := iter()
+	if actualC.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostId)
+	}
+	actualC.Mark(nil)
+
+	actualD := iter()
+	if actualD.Info().HostId != "0" {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostId)
 	}
+	actualD.Mark(nil)
 }
 
 // Tests of the round-robin connection selection policy implementation

session.go (+15 -3)

@@ -235,7 +235,7 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	qry.attempts = 0
 	qry.totalLatency = 0
 	for {
-		conn := s.pool.Pick(qry)
+		host, conn := s.pool.Pick(qry)
 
 		//Assign the error unavailable to the iterator
 		if conn == nil {
@@ -254,9 +254,13 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 
 		//Exit for loop if the query was successful
 		if iter.err == nil {
+			host.Mark(iter.err)
 			break
 		}
 
+		// Mark host as OK
+		host.Mark(nil)
+
 		if qry.rt == nil || !qry.rt.Attempt(qry) {
 			break
 		}
@@ -323,7 +327,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	)
 
 	// get the query info for the statement
-	conn := s.pool.Pick(nil)
+	host, conn := s.pool.Pick(nil)
 	if conn == nil {
 		// no connections
 		inflight.err = ErrNoConnections
@@ -336,9 +340,13 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	if inflight.err != nil {
 		// don't cache this error
 		s.routingKeyInfoCache.Remove(stmt)
+		host.Mark(inflight.err)
 		return nil, inflight.err
 	}
 
+	// Mark host as OK
+	host.Mark(nil)
+
 	if len(info.Args) == 0 {
 		// no arguments, no routing key, and no error
 		return nil, nil
@@ -418,7 +426,7 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) {
 	batch.attempts = 0
 	batch.totalLatency = 0
 	for {
-		conn := s.pool.Pick(nil)
+		host, conn := s.pool.Pick(nil)
 
 		//Assign the error unavailable and break loop
 		if conn == nil {
@@ -431,9 +439,13 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) {
 		batch.attempts++
 		//Exit loop if operation executed correctly
 		if err == nil {
+			host.Mark(err)
 			return iter, err
 		}
 
+		// Mark host as OK
+		host.Mark(nil)
+
 		if batch.rt == nil || !batch.rt.Attempt(batch) {
 			break
 		}