
First round of code review fixes

- Changed roundRobinHostPolicy to increment the policy's shared round-robin
position on each host iteration instead of tracking the position locally in
the iterator
- Added documentation to NewPolicyConnPool
- Added "dial" operation errors to the set of errors that are not logged
while hostConnPool is connecting to a node
- Only rebuild the token ring of a token aware host selection policy when
the partitioner class changes
- Simplified the code in the token aware selection policy's host iterator
- Removed the unnecessary allocation of an empty *Conn slice in
NewRoundRobinConnPolicy; a nil slice serves the same purpose
- The tokenRing functions now handle a nil tokenRing pointer internally
- Removed the connectionpool_systems_test files until their tests can be successfully integrated into integration.sh and cassandra_test.go
- Introduced a SetHosts interface to hold the common definition of the SetHosts method shared by ConnectionPool and HostSelectionPolicy; ConnectionPool now embeds SetHosts, and HostSelectionPolicy embeds SetHosts and SetPartitioner (a standalone sketch of this embedding pattern follows the changed-files list below)
- Changed the locking in hostConnPool to a read-write lock, which allows concurrent calls to Pick on the same pool
- Added random variance to the sleep time after attempting to fill a hostConnPool, so that multiple gocql clients do not retry in lockstep against a node that has become unreachable (a short sketch of this jitter pattern follows this list)
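
As a reviewer's note on the last item: adding a random component to the retry
delay is the usual way to keep independent clients from waking up in lockstep.
Below is a minimal, standalone sketch of the idea; jitteredBackoff is an
illustrative name rather than a gocql function, and only the 31ms to 130ms
range mirrors the rand.Int31n(100)+31 expression in the diff below.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredBackoff returns a delay between 31ms and 130ms. The random spread
// keeps many clients from retrying against a recovering node at the same
// instant.
func jitteredBackoff() time.Duration {
	return time.Duration(rand.Int31n(100)+31) * time.Millisecond
}

func main() {
	for attempt := 1; attempt <= 3; attempt++ {
		d := jitteredBackoff()
		fmt.Printf("attempt %d: sleeping %v before refilling\n", attempt, d)
		time.Sleep(d)
	}
}
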
Justin Corpron, 10 years ago
parent
commit
b6bbb37b4c
6 changed files with 99 additions and 370 deletions
  1. connectionpool.go (+51, -21)
  2. connectionpool_systems_test.go (+0, -266)
  3. connectionpool_systems_test.sh (+0, -54)
  4. policies.go (+26, -17)
  5. policies_test.go (+13, -11)
  6. token.go (+9, -1)
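
Reviewer's note on the new SetHosts interface: Go interface embedding lets
ConnectionPool and HostSelectionPolicy share the same one-method interface, so
any code that only needs to push host updates can accept either. The condensed
sketch below shows the pattern under simplified assumptions: HostInfo is
stubbed to a single field, the ConnectionPool interface is trimmed, and
fakePool and broadcastHosts are illustrative helpers, not gocql API. The real
definitions are in the connectionpool.go diff that follows.

package main

import "fmt"

// HostInfo is stubbed here; gocql's real type carries more fields.
type HostInfo struct{ Peer string }

// SetHosts is the shared interface: implement it to receive host updates.
type SetHosts interface {
	SetHosts(hosts []HostInfo)
}

// ConnectionPool embeds SetHosts alongside its own methods (Pick and Size are
// omitted from this sketch).
type ConnectionPool interface {
	SetHosts
	Close()
}

// fakePool is a stand-in implementation used only for the demo.
type fakePool struct{ hosts []HostInfo }

func (p *fakePool) SetHosts(hosts []HostInfo) { p.hosts = hosts }
func (p *fakePool) Close()                    {}

// broadcastHosts needs only the SetHosts part, so it accepts anything that
// embeds it: a connection pool, a host selection policy, and so on.
func broadcastHosts(targets []SetHosts, hosts []HostInfo) {
	for _, t := range targets {
		t.SetHosts(hosts)
	}
}

func main() {
	pool := &fakePool{}
	// *fakePool satisfies both ConnectionPool and SetHosts.
	var _ ConnectionPool = pool
	broadcastHosts([]SetHosts{pool}, []HostInfo{{Peer: "10.0.0.1"}})
	fmt.Println("pool now tracks", len(pool.hosts), "host(s)")
}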

+ 51 - 21
connectionpool.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"math/rand"
 	"net"
 	"sync"
 	"time"
@@ -89,9 +90,14 @@ this type as the connection pool to use you would assign it to the ClusterConfig
 To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
 */
 type ConnectionPool interface {
+	SetHosts
 	Pick(*Query) *Conn
 	Size() int
 	Close()
+}
+
+	// SetHosts is the interface implemented by types that receive the
+	// current host information for the cluster
+type SetHosts interface {
 	SetHosts(hosts []HostInfo)
 }
 
@@ -473,6 +479,11 @@ type policyConnPool struct {
 	hostConnPools map[string]*hostConnPool
 }
 
+// NewPolicyConnPool creates a policy-based connection pool. This func isn't
+// meant to be used directly as a NewPoolFunc in ClusterConfig; instead, create
+// a func that satisfies the NewPoolFunc type and calls this func with the
+// desired hostPolicy and connPolicy. See NewRoundRobinConnPool or
+// NewTokenAwareConnPool for examples.
 func NewPolicyConnPool(
 	cfg *ClusterConfig,
 	hostPolicy HostSelectionPolicy,
@@ -617,7 +628,7 @@ type hostConnPool struct {
 	keyspace string
 	policy   ConnSelectionPolicy
 	// protection for conns, closed, filling
-	mu      sync.Mutex
+	mu      sync.RWMutex
 	conns   []*Conn
 	closed  bool
 	filling bool
@@ -653,14 +664,14 @@ func newHostConnPool(
 
 // Pick a connection from this connection pool for the given query.
 func (pool *hostConnPool) Pick(qry *Query) *Conn {
-	pool.mu.Lock()
+	pool.mu.RLock()
 	if pool.closed {
-		pool.mu.Unlock()
+		pool.mu.RUnlock()
 		return nil
 	}
 
 	empty := len(pool.conns) == 0
-	pool.mu.Unlock()
+	pool.mu.RUnlock()
 
 	if empty {
 		// try to fill the empty pool
@@ -672,8 +683,8 @@ func (pool *hostConnPool) Pick(qry *Query) *Conn {
 
 //Size returns the number of connections currently active in the pool
 func (pool *hostConnPool) Size() int {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	pool.mu.RLock()
+	defer pool.mu.RUnlock()
 
 	return len(pool.conns)
 }
@@ -694,10 +705,10 @@ func (pool *hostConnPool) Close() {
 
 // Fill the connection pool
 func (pool *hostConnPool) fill() {
-	pool.mu.Lock()
+	pool.mu.RLock()
 	// avoid filling a closed pool, or concurrent filling
 	if pool.closed || pool.filling {
-		pool.mu.Unlock()
+		pool.mu.RUnlock()
 		return
 	}
 
@@ -707,6 +718,20 @@ func (pool *hostConnPool) fill() {
 
 	// avoid filling a full (or overfull) pool
 	if fillCount <= 0 {
+		pool.mu.RUnlock()
+		return
+	}
+
+	// switch from read to write lock
+	pool.mu.RUnlock()
+	pool.mu.Lock()
+
+	// double check everything since the lock was released
+	startCount = len(pool.conns)
+	fillCount = pool.size - startCount
+	if pool.closed || pool.filling || fillCount <= 0 {
+		// looks like another goroutine already beat this
+		// goroutine to the filling
 		pool.mu.Unlock()
 		return
 	}
@@ -716,16 +741,14 @@ func (pool *hostConnPool) fill() {
 
 	// allow others to access the pool while filling
 	pool.mu.Unlock()
+	// only this goroutine should make calls to fill/empty the pool at this
+	// point until after this routine or its subordinates call
+	// fillingStopped
 
 	// fill only the first connection synchronously
 	if startCount == 0 {
 		err := pool.connect()
-		if opErr, ok := err.(*net.OpError); ok && opErr.Op == "read" {
-			// connection refused
-			// these are typical during a node outage so avoid log spam.
-		} else if err != nil {
-			log.Printf("error: failed to connect to %s - %v", pool.addr, err)
-		}
+		pool.logConnectErr(err)
 
 		if err != nil {
 			// probably unreachable host
@@ -741,12 +764,7 @@ func (pool *hostConnPool) fill() {
 	go func() {
 		for fillCount > 0 {
 			err := pool.connect()
-			if opErr, ok := err.(*net.OpError); ok && opErr.Op == "read" {
-				// connection refused
-				// these are typical during a node outage so avoid log spam.
-			} else if err != nil {
-				log.Printf("error: failed to connect to %s - %v", pool.addr, err)
-			}
+			pool.logConnectErr(err)
 
 			// decrement, even on error
 			fillCount--
@@ -757,10 +775,22 @@ func (pool *hostConnPool) fill() {
 	}()
 }
 
+func (pool *hostConnPool) logConnectErr(err error) {
+	if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
+		// connection refused
+		// these are typical during a node outage so avoid log spam.
+	} else if err != nil {
+		// unexpected error
+		log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
+	}
+}
+
 // transition back to a not-filling state.
 func (pool *hostConnPool) fillingStopped() {
 	// wait for some time to avoid back-to-back filling
-	time.Sleep(100 * time.Millisecond)
+	// this gives the host some time to recover
+	// between failed attempts to fill the pool
+	time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
 
 	pool.mu.Lock()
 	pool.filling = false
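
Reviewer's note on the fill() hunk above: sync.RWMutex cannot upgrade a read
lock to a write lock atomically, so the code releases the read lock, takes the
write lock, and then re-checks its preconditions, because another goroutine may
have started filling or closed the pool in between. The toy below shows only
that check/release/re-check shape; toyPool and its fields are simplified
stand-ins for gocql's hostConnPool, not its actual implementation.

package main

import (
	"fmt"
	"sync"
)

// toyPool keeps just enough state to demonstrate the locking pattern.
type toyPool struct {
	mu      sync.RWMutex
	conns   int
	size    int
	filling bool
	closed  bool
}

// fill performs cheap checks under the read lock, then "upgrades" to the write
// lock and repeats the checks, since the pool may have changed while no lock
// was held.
func (p *toyPool) fill() {
	p.mu.RLock()
	if p.closed || p.filling || p.size-p.conns <= 0 {
		p.mu.RUnlock()
		return
	}
	// Release the read lock and take the write lock...
	p.mu.RUnlock()
	p.mu.Lock()
	// ...then double check everything, because another goroutine may have
	// beaten this one to the filling.
	fillCount := p.size - p.conns
	if p.closed || p.filling || fillCount <= 0 {
		p.mu.Unlock()
		return
	}
	p.filling = true
	p.mu.Unlock()

	// The real pool dials connections here without holding the lock; the toy
	// just records the result and clears the filling flag.
	p.mu.Lock()
	p.conns += fillCount
	p.filling = false
	p.mu.Unlock()
}

func main() {
	p := &toyPool{size: 2}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); p.fill() }()
	}
	wg.Wait()
	fmt.Println("connections after concurrent fills:", p.conns) // always 2
}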

+ 0 - 266
connectionpool_systems_test.go

@@ -1,266 +0,0 @@
-// Copyright (c) 2015 The gocql Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-// +build conn_pool
-
-package gocql
-
-import (
-	"flag"
-	"fmt"
-	"log"
-	"os/exec"
-	"strconv"
-	"strings"
-	"sync"
-	"testing"
-	"time"
-)
-
-// connection pool behavior test when nodes are removed from the cluster
-// to run this test, see connectionpool_systems_test.sh
-
-var (
-	flagCluster  = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
-	flagProto    = flag.Int("proto", 2, "protcol version")
-	flagCQL      = flag.String("cql", "3.0.0", "CQL version")
-	flagRF       = flag.Int("rf", 1, "replication factor for test keyspace")
-	clusterSize  = flag.Int("clusterSize", 1, "the expected size of the cluster")
-	nodesShut    = flag.Int("nodesShut", 1, "the number of nodes to shutdown during the test")
-	flagRetry    = flag.Int("retries", 5, "number of times to retry queries")
-	flagRunSsl   = flag.Bool("runssl", false, "Set to true to run ssl test")
-	clusterHosts []string
-)
-var initOnce sync.Once
-
-func init() {
-	flag.Parse()
-	clusterHosts = strings.Split(*flagCluster, ",")
-	log.SetFlags(log.Lshortfile | log.LstdFlags)
-}
-
-func createTable(s *Session, table string) error {
-	err := s.Query(table).Consistency(All).Exec()
-	if *clusterSize > 1 {
-		// wait for table definition to propogate
-		time.Sleep(250 * time.Millisecond)
-	}
-	return err
-}
-
-func createCluster() *ClusterConfig {
-	cluster := NewCluster(clusterHosts...)
-	cluster.ProtoVersion = *flagProto
-	cluster.CQLVersion = *flagCQL
-	cluster.Timeout = 5 * time.Second
-	cluster.Consistency = Quorum
-	if *flagRetry > 0 {
-		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
-	}
-	if *flagRunSsl {
-		cluster.SslOpts = &SslOptions{
-			CertPath:               "testdata/pki/gocql.crt",
-			KeyPath:                "testdata/pki/gocql.key",
-			CaPath:                 "testdata/pki/ca.crt",
-			EnableHostVerification: false,
-		}
-	}
-	return cluster
-}
-
-func createKeyspace(t testing.T, cluster *ClusterConfig, keyspace string) {
-	session, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal("createSession:", err)
-	}
-	if err = session.Query(`DROP KEYSPACE ` + keyspace).Exec(); err != nil {
-		t.Log("drop keyspace:", err)
-	}
-	err = session.Query(
-		fmt.Sprintf(
-			`
-			CREATE KEYSPACE %s
-			WITH replication = {
-				'class' : 'SimpleStrategy',
-				'replication_factor' : %d
-			}
-			`,
-			keyspace,
-			*flagRF,
-		),
-	).Consistency(All).Exec()
-	if err != nil {
-		t.Fatalf("error creating keyspace %s: %v", keyspace, err)
-	}
-	t.Logf("Created keyspace %s", keyspace)
-	session.Close()
-}
-
-func createSession(t testing.T) *Session {
-	cluster := createCluster()
-
-	// Drop and re-create the keyspace once. Different tests should use their own
-	// individual tables, but can assume that the table does not exist before.
-	initOnce.Do(func() {
-		createKeyspace(t, cluster, "gocql_test")
-	})
-
-	cluster.Keyspace = "gocql_test"
-	session, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal("createSession:", err)
-	}
-
-	return session
-}
-
-func TestSimplePool(t *testing.T) {
-	testConnPool(t, NewSimplePool)
-}
-
-func TestRRPolicyConnPool(t *testing.T) {
-	testConnPool(t, NewRoundRobinConnPool)
-}
-
-func TestTAPolicyConnPool(t *testing.T) {
-	testConnPool(t, NewTokenAwareConnPool)
-}
-
-func testConnPool(t *testing.T, connPoolType NewPoolFunc) {
-	var out []byte
-	var err error
-	log.SetFlags(log.Ltime)
-
-	// make sure the cluster is running
-	out, err = exec.Command("ccm", "start").CombinedOutput()
-	if err != nil {
-		t.Fatalf("Error running ccm command: %v", err)
-		fmt.Printf("ccm output:\n%s", string(out))
-	}
-
-	time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)
-
-	// fire up a session (no discovery)
-	cluster := createCluster()
-	cluster.ConnPoolType = connPoolType
-	cluster.DiscoverHosts = false
-	session, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatalf("Error connecting to cluster: %v", err)
-	}
-	defer session.Close()
-
-	time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)
-
-	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
-		t.Logf(
-			"WARN: Expected %d pool size, but was %d",
-			(*clusterSize)*cluster.NumConns,
-			session.Pool.Size(),
-		)
-	}
-
-	// start some connection monitoring
-	nilCheckStop := false
-	nilCount := 0
-	nilCheck := func() {
-		// assert that all connections returned by the pool are non-nil
-		for !nilCheckStop {
-			actual := session.Pool.Pick(nil)
-			if actual == nil {
-				nilCount++
-			}
-		}
-	}
-	go nilCheck()
-
-	// shutdown some hosts
-	log.Println("shutdown some hosts")
-	for i := 0; i < *nodesShut; i++ {
-		out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "stop").CombinedOutput()
-		if err != nil {
-			t.Fatalf("Error running ccm command: %v", err)
-			fmt.Printf("ccm output:\n%s", string(out))
-		}
-		time.Sleep(1500 * time.Millisecond)
-	}
-	time.Sleep(500 * time.Millisecond)
-
-	if session.Pool.Size() != ((*clusterSize)-(*nodesShut))*cluster.NumConns {
-		t.Logf(
-			"WARN: Expected %d pool size, but was %d",
-			((*clusterSize)-(*nodesShut))*cluster.NumConns,
-			session.Pool.Size(),
-		)
-	}
-
-	// bringup the shutdown hosts
-	log.Println("bringup the shutdown hosts")
-	for i := 0; i < *nodesShut; i++ {
-		out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "start").CombinedOutput()
-		if err != nil {
-			t.Fatalf("Error running ccm command: %v", err)
-			fmt.Printf("ccm output:\n%s", string(out))
-		}
-		time.Sleep(1500 * time.Millisecond)
-	}
-	time.Sleep(500 * time.Millisecond)
-
-	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
-		t.Logf(
-			"WARN: Expected %d pool size, but was %d",
-			(*clusterSize)*cluster.NumConns,
-			session.Pool.Size(),
-		)
-	}
-
-	// assert that all connections returned by the pool are non-nil
-	if nilCount > 0 {
-		t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
-	}
-
-	// shutdown cluster
-	log.Println("shutdown cluster")
-	out, err = exec.Command("ccm", "stop").CombinedOutput()
-	if err != nil {
-		t.Fatalf("Error running ccm command: %v", err)
-		fmt.Printf("ccm output:\n%s", string(out))
-	}
-	time.Sleep(2500 * time.Millisecond)
-
-	if session.Pool.Size() != 0 {
-		t.Logf(
-			"WARN: Expected %d pool size, but was %d",
-			0,
-			session.Pool.Size(),
-		)
-	}
-
-	// start cluster
-	log.Println("start cluster")
-	out, err = exec.Command("ccm", "start").CombinedOutput()
-	if err != nil {
-		t.Fatalf("Error running ccm command: %v", err)
-		fmt.Printf("ccm output:\n%s", string(out))
-	}
-	time.Sleep(500 * time.Millisecond)
-
-	// reset the count
-	nilCount = 0
-
-	time.Sleep(3000 * time.Millisecond)
-
-	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
-		t.Logf(
-			"WARN: Expected %d pool size, but was %d",
-			(*clusterSize)*cluster.NumConns,
-			session.Pool.Size(),
-		)
-	}
-
-	// assert that all connections returned by the pool are non-nil
-	if nilCount > 0 {
-		t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
-	}
-	nilCheckStop = true
-}

+ 0 - 54
connectionpool_systems_test.sh

@@ -1,54 +0,0 @@
-#!/bin/bash
-
-set -e
-
-function run_tests() {
-	local clusterSize=5
-	local nodesShut=2
-	local version=$1
-
-	ccm remove test || true
-
-	local keypath="$(pwd)/testdata/pki"
-
-	local conf=(
-	    "client_encryption_options.enabled: true"
-	    "client_encryption_options.keystore: $keypath/.keystore"
-	    "client_encryption_options.keystore_password: cassandra"
-	    "client_encryption_options.require_client_auth: true"
-	    "client_encryption_options.truststore: $keypath/.truststore"
-	    "client_encryption_options.truststore_password: cassandra"
-	    "concurrent_reads: 2"
-	    "concurrent_writes: 2"
-	    "rpc_server_type: sync"
-	    "rpc_min_threads: 2"
-	    "rpc_max_threads: 2"
-	    "write_request_timeout_in_ms: 5000"
-	    "read_request_timeout_in_ms: 5000"
-	)
-
-	ccm create test -v binary:$version -n $clusterSize -d --vnodes --jvm_arg="-Xmx256m"
-    ccm updateconf "${conf[@]}"
-	ccm start -v
-	ccm status
-	ccm node1 nodetool status
-
-	local proto=2
-	if [[ $version == 1.2.* ]]; then
-		proto=1
-	fi
-
-	go test -timeout 15m -tags conn_pool -v -runssl -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -nodesShut=$nodesShut ./... | tee results.txt
-
-	if [ ${PIPESTATUS[0]} -ne 0 ]; then
-		echo "--- FAIL: ccm status follows:"
-		ccm status
-		ccm node1 nodetool status
-		ccm node1 showlog > status.log
-		cat status.log
-		echo "--- FAIL: Received a non-zero exit code from the go test execution, please investigate this"
-		exit 1
-	fi
-	ccm remove
-}
-run_tests $1

+ 26 - 17
policies.go

@@ -52,10 +52,8 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
 //HostSelectionPolicy is an interface for selecting
 //the most appropriate host to execute a given query.
 type HostSelectionPolicy interface {
-	//SetHosts notifies this policy of the current hosts in the cluster
-	SetHosts(hosts []HostInfo)
-	//SetPartitioner notifies this policy of the current token partitioner
-	SetPartitioner(partitioner string)
+	SetHosts
+	SetPartitioner
 	//Pick returns an iteration function over selected hosts
 	Pick(*Query) NextHost
 }
@@ -85,13 +83,17 @@ func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {
 }
 
 func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
-	pos := atomic.AddUint32(&r.pos, 1)
+	// i is used to limit the number of attempts to find a host
+	// to the number of hosts known to this policy
 	var i uint32 = 0
 	return func() *HostInfo {
 		var host *HostInfo
 		r.mu.RLock()
+		// always increment pos to evenly distribute traffic in case of
+		// failures
+		pos := atomic.AddUint32(&r.pos, 1)
 		if len(r.hosts) > 0 && int(i) < len(r.hosts) {
-			host = &r.hosts[(pos+i)%uint32(len(r.hosts))]
+			host = &r.hosts[(pos)%uint32(len(r.hosts))]
 			i++
 		}
 		r.mu.RUnlock()
@@ -127,10 +129,12 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
-	t.fallback.SetPartitioner(partitioner)
-	t.partitioner = partitioner
+	if t.partitioner != partitioner {
+		t.fallback.SetPartitioner(partitioner)
+		t.partitioner = partitioner
 
-	t.resetTokenRing()
+		t.resetTokenRing()
+	}
 }
 
 func (t *tokenAwareHostPolicy) resetTokenRing() {
@@ -166,18 +170,19 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
 	var host *HostInfo
 
 	t.mu.RLock()
-	if t.tokenRing != nil {
-		host = t.tokenRing.GetHostForPartitionKey(routingKey)
-	}
+	// TODO retrieve a list of hosts based on the replication strategy
+	host = t.tokenRing.GetHostForPartitionKey(routingKey)
 	t.mu.RUnlock()
 
 	if host == nil {
 		return t.fallback.Pick(qry)
 	}
 
-	var hostReturned bool = false
-	var once sync.Once
-	var fallbackIter NextHost
+	// scope these variables for the same lifetime as the iterator function
+	var (
+		hostReturned bool
+		fallbackIter NextHost
+	)
 	return func() *HostInfo {
 		if !hostReturned {
 			hostReturned = true
@@ -185,9 +190,13 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
 		}
 
 		// fallback
-		once.Do(func() { fallbackIter = t.fallback.Pick(qry) })
+		if fallbackIter == nil {
+			fallbackIter = t.fallback.Pick(qry)
+		}
 
 		fallbackHost := fallbackIter()
+
+		// skip the host that the token aware selection already returned
 		if fallbackHost == host {
 			fallbackHost = fallbackIter()
 		}
@@ -210,7 +219,7 @@ type roundRobinConnPolicy struct {
 }
 
 func NewRoundRobinConnPolicy() ConnSelectionPolicy {
-	return &roundRobinConnPolicy{conns: []*Conn{}}
+	return &roundRobinConnPolicy{}
 }
 
 func (r *roundRobinConnPolicy) SetConns(conns []*Conn) {
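
Reviewer's note on the roundRobinHostPolicy change earlier in this diff: the
iterator now advances one shared atomic counter on every call, so interleaved
iterators keep distributing load evenly instead of each starting from its own
snapshot of the position. The sketch below shows that shape with a plain string
slice standing in for []HostInfo; rrPolicy and pick are illustrative names, not
the gocql types.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// rrPolicy holds a position counter shared by every iterator it hands out.
type rrPolicy struct {
	mu    sync.RWMutex
	pos   uint32
	hosts []string
}

// pick returns an iterator that yields at most len(hosts) entries and advances
// the shared counter on every call, mirroring the pattern in the hunk above.
func (r *rrPolicy) pick() func() string {
	var served int
	return func() string {
		r.mu.RLock()
		defer r.mu.RUnlock()
		if len(r.hosts) == 0 || served >= len(r.hosts) {
			return ""
		}
		served++
		pos := atomic.AddUint32(&r.pos, 1)
		return r.hosts[pos%uint32(len(r.hosts))]
	}
}

func main() {
	p := &rrPolicy{hosts: []string{"a", "b", "c"}}
	iterA, iterB := p.pick(), p.pick()
	// Interleaving two iterators still walks the ring evenly: b c a b.
	fmt.Println(iterA(), iterB(), iterA(), iterB())
}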

+ 13 - 11
policies_test.go

@@ -17,27 +17,29 @@ func TestRoundRobinHostPolicy(t *testing.T) {
 	policy.SetHosts(hosts)
 
 	// the first host selected is actually at [1], but this is ok for RR
-	iter := policy.Pick(nil)
-	if actual := iter(); actual != &hosts[1] {
-		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
-	}
-	if actual := iter(); actual != &hosts[0] {
+	// interleaved iteration should still advance to the next host
+	iterA := policy.Pick(nil)
+	if actual := iterA(); actual != &hosts[1] {
 		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
 	}
-	iter = policy.Pick(nil)
-	if actual := iter(); actual != &hosts[0] {
+	iterB := policy.Pick(nil)
+	if actual := iterB(); actual != &hosts[0] {
 		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
 	}
-	if actual := iter(); actual != &hosts[1] {
+	if actual := iterB(); actual != &hosts[1] {
 		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
 	}
-	iter = policy.Pick(nil)
-	if actual := iter(); actual != &hosts[1] {
+	if actual := iterA(); actual != &hosts[0] {
 		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
 	}
-	if actual := iter(); actual != &hosts[0] {
+
+	iterC := policy.Pick(nil)
+	if actual := iterC(); actual != &hosts[1] {
 		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
 	}
+	if actual := iterC(); actual != &hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
+	}
 }
 
 func TestTokenAwareHostPolicy(t *testing.T) {

+ 9 - 1
token.go

@@ -301,12 +301,20 @@ func (t *tokenRing) String() string {
 }
 
 func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) *HostInfo {
+	if t == nil {
+		return nil
+	}
+
 	token := t.partitioner.Hash(partitionKey)
 	return t.GetHostForToken(token)
 }
 
 func (t *tokenRing) GetHostForToken(token token) *HostInfo {
-	// find the primary repica
+	if t == nil {
+		return nil
+	}
+
+	// find the primary replica
 	ringIndex := sort.Search(
 		len(t.tokens),
 		func(i int) bool {