
Merge pull request #225 from relops/cluster-ring-discovery

Cluster ring discovery
Ben Hood 11 years ago
Parent
Commit
dd09ddf816
7 changed files with 253 additions and 40 deletions
  1. README.md (+1 -0)
  2. cassandra_test.go (+33 -3)
  3. cluster.go (+18 -20)
  4. connectionpool.go (+92 -14)
  5. host_source.go (+104 -0)
  6. integration.sh (+4 -2)
  7. wiki_test.go (+1 -1)

+ 1 - 0
README.md

@@ -43,6 +43,7 @@ Features
   * Round robin distribution of queries to different connections on a host
   * Each connection can execute up to 128 concurrent queries
   * Optional automatic discovery of nodes
+  * Optional support for periodic node discovery via system.peers
 * Iteration over paged results with configurable page size
 * Optional frame compression (using snappy)
 * Automatic query preparation
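
The new option is driven entirely from ClusterConfig. A minimal sketch of how a caller might enable periodic discovery from a single seed node (the host address and keyspace below are hypothetical, and error handling is abbreviated):

```go
// Sketch only: enable periodic node discovery from one seed host.
cluster := gocql.NewCluster("192.168.1.1") // hypothetical seed node
cluster.Keyspace = "example"               // hypothetical keyspace
cluster.DiscoverHosts = true               // turn on system.peers polling

session, err := cluster.CreateSession()
if err != nil {
	log.Fatal(err)
}
defer session.Close()
```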

+ 33 - 3
cassandra_test.go

@@ -27,8 +27,9 @@ var (
 	flagProto    = flag.Int("proto", 2, "protocol version")
 	flagCQL      = flag.String("cql", "3.0.0", "CQL version")
 	flagRF       = flag.Int("rf", 1, "replication factor for test keyspace")
+	clusterSize  = flag.Int("clusterSize", 1, "the expected size of the cluster")
 	flagRetry    = flag.Int("retries", 5, "number of times to retry queries")
-	clusterSize  = 1
+	flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
 	clusterHosts []string
 )
 
@@ -36,7 +37,6 @@ func init() {
 
 	flag.Parse()
 	clusterHosts = strings.Split(*flagCluster, ",")
-	clusterSize = len(clusterHosts)
 	log.SetFlags(log.Lshortfile | log.LstdFlags)
 }
 
@@ -44,7 +44,7 @@ var initOnce sync.Once
 
 func createTable(s *Session, table string) error {
 	err := s.Query(table).Consistency(All).Exec()
-	if clusterSize > 1 {
+	if *clusterSize > 1 {
 		// wait for table definition to propagate
 		time.Sleep(250 * time.Millisecond)
 	}
@@ -99,6 +99,36 @@ func createSession(tb testing.TB) *Session {
 	return session
 }
 
+// TestRingDiscovery makes sure that you can autodiscover other cluster members when you seed a cluster config with just one node
+func TestRingDiscovery(t *testing.T) {
+
+	cluster := NewCluster(clusterHosts[0])
+	cluster.ProtoVersion = *flagProto
+	cluster.CQLVersion = *flagCQL
+	cluster.Timeout = 5 * time.Second
+	cluster.Consistency = Quorum
+	cluster.RetryPolicy.NumRetries = *flagRetry
+	cluster.DiscoverHosts = true
+
+	session, err := cluster.CreateSession()
+	if err != nil {
+		t.Errorf("got error connecting to the cluster %v", err)
+	}
+
+	if *clusterSize > 1 {
+		// wait for autodiscovery to update the pool with the list of known hosts
+		time.Sleep(*flagAutoWait)
+	}
+
+	size := len(session.Pool.(*SimplePool).connPool)
+
+	if *clusterSize != size {
+		t.Fatalf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
+	}
+
+	session.Close()
+}
+
 func TestEmptyHosts(t *testing.T) {
 	cluster := NewCluster()
 	if session, err := cluster.CreateSession(); err == nil {

+ 18 - 20
cluster.go

@@ -6,9 +6,10 @@ package gocql
 
 import (
 	"errors"
-	"github.com/golang/groupcache/lru"
 	"sync"
 	"time"
+
+	"github.com/golang/groupcache/lru"
 )
 
 //Package global reference to Prepared Statements LRU
@@ -29,6 +30,16 @@ func (p *preparedLRU) Max(max int) {
 	p.lru.MaxEntries = max
 }
 
+// To enable periodic node discovery, enable DiscoverHosts in ClusterConfig
+type DiscoveryConfig struct {
+	// If not empty, will filter all discovered hosts to a single Data Centre (default: "")
+	DcFilter string
+	// If not empty, will filter all discovered hosts to a single Rack (default: "")
+	RackFilter string
+	// The interval to check for new hosts (default: 30s)
+	Sleep time.Duration
+}
+
 // ClusterConfig is a struct to configure the default cluster implementation
 // of gocql. It has a variety of attributes that can be used to modify the
 // behavior to fit the most common use cases. Applications that require a
@@ -50,6 +61,7 @@ type ClusterConfig struct {
 	ConnPoolType     NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
 	DiscoverHosts    bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
 	MaxPreparedStmts int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	Discovery        DiscoveryConfig
 }
 
 // NewCluster generates a new config for the default cluster implementation.
@@ -95,26 +107,13 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 		s.SetConsistency(cfg.Consistency)
 
 		if cfg.DiscoverHosts {
-			//Fill out cfg.Hosts
-			query := "SELECT peer FROM system.peers"
-			peers := s.Query(query).Iter()
-
-			var ip string
-			for peers.Scan(&ip) {
-				exists := false
-				for ii := 0; ii < len(cfg.Hosts); ii++ {
-					if cfg.Hosts[ii] == ip {
-						exists = true
-					}
-				}
-				if !exists {
-					cfg.Hosts = append(cfg.Hosts, ip)
-				}
+			hostSource := &ringDescriber{
+				session:    s,
+				dcFilter:   cfg.Discovery.DcFilter,
+				rackFilter: cfg.Discovery.RackFilter,
 			}
 
-			if err := peers.Close(); err != nil {
-				return s, ErrHostQueryFailed
-			}
+			go hostSource.run(cfg.Discovery.Sleep)
 		}
 
 		return s, nil
@@ -122,7 +121,6 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 
 	pool.Close()
 	return nil, ErrNoConnectionsStarted
-
 }
 
 var (
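
The filters and polling interval introduced above are carried in the new Discovery field of ClusterConfig. A sketch of how an external caller might set them (the data centre and rack names are hypothetical; Sleep falls back to 30s when left at zero):

```go
// Sketch only: restrict discovery to one data centre and rack, polling every minute.
cluster := gocql.NewCluster("192.168.1.1") // hypothetical seed node
cluster.DiscoverHosts = true
cluster.Discovery = gocql.DiscoveryConfig{
	DcFilter:   "dc1",            // hypothetical data centre name
	RackFilter: "rack1",          // hypothetical rack name
	Sleep:      60 * time.Second, // poll system.peers once a minute
}
```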

+ 92 - 14
connectionpool.go

@@ -91,6 +91,7 @@ type ConnectionPool interface {
 	Size() int
 	HandleError(*Conn, error, bool)
 	Close()
+	SetHosts(host []HostInfo)
 }
 
 //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
@@ -105,7 +106,13 @@ type SimplePool struct {
 	connPool map[string]*RoundRobin
 	conns    map[*Conn]struct{}
 	keyspace string
-	mu       sync.Mutex
+
+	hostMu sync.RWMutex
+	// this is the set of current hosts which the pool will attempt to connect to
+	hosts map[string]*HostInfo
+
+	// protects hostPool, connPool, conns, quit
+	mu sync.Mutex
 
 	cFillingPool chan int
 
@@ -117,7 +124,7 @@ type SimplePool struct {
 //NewSimplePool is the function used by gocql to create the simple connection pool.
 //This is the default if no other pool type is specified.
 func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
-	pool := SimplePool{
+	pool := &SimplePool{
 		cfg:          cfg,
 		hostPool:     NewRoundRobin(),
 		connPool:     make(map[string]*RoundRobin),
@@ -125,7 +132,15 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		quitWait:     make(chan bool),
 		cFillingPool: make(chan int, 1),
 		keyspace:     cfg.Keyspace,
+		hosts:        make(map[string]*HostInfo),
+	}
+
+	for _, host := range cfg.Hosts {
+		// seed hosts have unknown topology
+		// TODO: Handle populating this during SetHosts
+		pool.hosts[host] = &HostInfo{Peer: host}
 	}
+
 	//Walk through connecting to hosts. As soon as one host connects
 	//defer the remaining connections to cluster.fillPool()
 	for i := 0; i < len(cfg.Hosts); i++ {
@@ -133,17 +148,19 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
 		}
+
 		if pool.connect(addr) == nil {
 			pool.cFillingPool <- 1
 			go pool.fillPool()
 			break
 		}
-
 	}
-	return &pool
+
+	return pool
 }
 
 func (c *SimplePool) connect(addr string) error {
+
 	cfg := ConnConfig{
 		ProtoVersion:  c.cfg.ProtoVersion,
 		CQLVersion:    c.cfg.CQLVersion,
@@ -154,14 +171,13 @@ func (c *SimplePool) connect(addr string) error {
 		Keepalive:     c.cfg.SocketKeepalive,
 	}
 
-	for {
-		conn, err := Connect(addr, cfg, c)
-		if err != nil {
-			log.Printf("failed to connect to %q: %v", addr, err)
-			return err
-		}
-		return c.addConn(conn)
+	conn, err := Connect(addr, cfg, c)
+	if err != nil {
+		log.Printf("connect: failed to connect to %q: %v", addr, err)
+		return err
 	}
+
+	return c.addConn(conn)
 }
 
 func (c *SimplePool) addConn(conn *Conn) error {
@@ -171,6 +187,7 @@ func (c *SimplePool) addConn(conn *Conn) error {
 		conn.Close()
 		return nil
 	}
+
 	//Set the connection's keyspace if any before adding it to the pool
 	if c.keyspace != "" {
 		if err := conn.UseKeyspace(c.keyspace); err != nil {
@@ -179,14 +196,17 @@ func (c *SimplePool) addConn(conn *Conn) error {
 			return err
 		}
 	}
+
 	connPool := c.connPool[conn.Address()]
 	if connPool == nil {
 		connPool = NewRoundRobin()
 		c.connPool[conn.Address()] = connPool
 		c.hostPool.AddNode(connPool)
 	}
+
 	connPool.AddNode(conn)
 	c.conns[conn] = struct{}{}
+
 	return nil
 }
 
@@ -209,13 +229,17 @@ func (c *SimplePool) fillPool() {
 	if isClosed {
 		return
 	}
+
+	c.hostMu.RLock()
+
 	//Walk through list of defined hosts
-	for i := 0; i < len(c.cfg.Hosts); i++ {
-		addr := strings.TrimSpace(c.cfg.Hosts[i])
+	for host := range c.hosts {
+		addr := strings.TrimSpace(host)
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
 		}
-		var numConns int = 1
+
+		numConns := 1
 		//See if the host already has connections in the pool
 		c.mu.Lock()
 		conns, ok := c.connPool[addr]
@@ -233,6 +257,7 @@ func (c *SimplePool) fillPool() {
 				continue
 			}
 		}
+
 		//This is reached if the host is responsive and needs more connections
 		//Create connections for host synchronously to mitigate flooding the host.
 		go func(a string, conns int) {
@@ -241,6 +266,8 @@ func (c *SimplePool) fillPool() {
 			}
 		}(addr, numConns)
 	}
+
+	c.hostMu.RUnlock()
 }
 
 // Should only be called if c.mu is locked
@@ -313,3 +340,54 @@ func (c *SimplePool) Close() {
 		}
 	})
 }
+
+func (c *SimplePool) SetHosts(hosts []HostInfo) {
+
+	c.hostMu.Lock()
+	toRemove := make(map[string]struct{})
+	for k := range c.hosts {
+		toRemove[k] = struct{}{}
+	}
+
+	for _, host := range hosts {
+		host := host
+
+		delete(toRemove, host.Peer)
+		// we already have it
+		if _, ok := c.hosts[host.Peer]; ok {
+			// TODO: Check rack, dc, token range is consistent, trigger topology change
+			// update stored host
+			continue
+		}
+
+		c.hosts[host.Peer] = &host
+	}
+
+	// can we hold c.mu whilst iterating this loop?
+	for addr := range toRemove {
+		c.removeHostLocked(addr)
+	}
+	c.hostMu.Unlock()
+
+	c.fillPool()
+}
+
+func (c *SimplePool) removeHostLocked(addr string) {
+	if _, ok := c.hosts[addr]; !ok {
+		return
+	}
+	delete(c.hosts, addr)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.connPool[addr]; !ok {
+		return
+	}
+
+	for conn := range c.conns {
+		if conn.Address() == addr {
+			c.removeConnLocked(conn)
+		}
+	}
+}
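
Since SetHosts is now part of the ConnectionPool interface, any alternative pool supplied via ClusterConfig.ConnPoolType has to implement it as well. Below is a minimal, hypothetical sketch of such a method; the customPool type and its fields are invented for illustration, and the actual connect/teardown work is left as comments:

```go
// customPool is a hypothetical ConnectionPool implementation; only the newly
// required SetHosts method is sketched here.
type customPool struct {
	mu    sync.Mutex
	hosts map[string]HostInfo
}

// SetHosts reconciles the pool's view of the ring with the freshly
// discovered hosts: new peers are recorded, vanished peers are dropped.
func (p *customPool) SetHosts(hosts []HostInfo) {
	p.mu.Lock()
	defer p.mu.Unlock()

	seen := make(map[string]struct{}, len(hosts))
	for _, h := range hosts {
		seen[h.Peer] = struct{}{}
		if _, ok := p.hosts[h.Peer]; !ok {
			p.hosts[h.Peer] = h // new node: open connections to it here
		}
	}

	for peer := range p.hosts {
		if _, ok := seen[peer]; !ok {
			delete(p.hosts, peer) // node left the ring: close its connections here
		}
	}
}
```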

+ 104 - 0
host_source.go

@@ -0,0 +1,104 @@
+package gocql
+
+import (
+	"log"
+	"net"
+	"time"
+)
+
+type HostInfo struct {
+	Peer       string
+	DataCenter string
+	Rack       string
+	HostId     string
+	Tokens     []string
+}
+
+// Polls system.peers at a specific interval to find new hosts
+type ringDescriber struct {
+	dcFilter   string
+	rackFilter string
+	previous   []HostInfo
+	session    *Session
+}
+
+func (r *ringDescriber) GetHosts() ([]HostInfo, error) {
+	// we need conn to be the same because we need to query system.peers and system.local
+	// on the same node to get the whole cluster
+	conn := r.session.Pool.Pick(nil)
+	if conn == nil {
+		return r.previous, nil
+	}
+
+	query := r.session.Query("SELECT data_center, rack, host_id, tokens FROM system.local")
+	iter := conn.executeQuery(query)
+
+	host := &HostInfo{}
+	iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens)
+
+	if err := iter.Close(); err != nil {
+		return nil, err
+	}
+
+	addr, _, err := net.SplitHostPort(conn.Address())
+	if err != nil {
+		// this should not happen, ever, as this is the address that was dialed by conn;
+		// here a panic makes sense, so please report a bug if it occurs.
+		panic(err)
+	}
+
+	host.Peer = addr
+
+	hosts := []HostInfo{*host}
+
+	query = r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
+	iter = conn.executeQuery(query)
+
+	for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
+		if r.matchFilter(host) {
+			hosts = append(hosts, *host)
+		}
+	}
+
+	if err := iter.Close(); err != nil {
+		return nil, err
+	}
+
+	r.previous = hosts
+
+	return hosts, nil
+}
+
+func (r *ringDescriber) matchFilter(host *HostInfo) bool {
+
+	if r.dcFilter != "" && r.dcFilter != host.DataCenter {
+		return false
+	}
+
+	if r.rackFilter != "" && r.rackFilter != host.Rack {
+		return false
+	}
+
+	return true
+}
+
+func (h *ringDescriber) run(sleep time.Duration) {
+	if sleep == 0 {
+		sleep = 30 * time.Second
+	}
+
+	for {
+		// If we have 0 hosts this will return the previous list of hosts, so that we
+		// attempt to reconnect to the cluster; otherwise we would never find
+		// downed hosts again. This could possibly be optimised to only try to add
+		// new hosts if GetHosts didn't error and the hosts didn't change.
+		hosts, err := h.GetHosts()
+		if err != nil {
+			log.Println("RingDescriber: unable to get ring topology:", err)
+		} else {
+			h.session.Pool.SetHosts(hosts)
+		}
+
+		time.Sleep(sleep)
+	}
+}
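
For reference, the loop above gathers roughly the same information as the two queries below would if run by hand against any live node. This sketch uses the public Session API rather than the internal connection that ringDescriber picks, so it is an approximation, and the session variable is assumed to exist:

```go
// Sketch only: what ringDescriber collects, expressed with the public API.
var peer, dc, rack, hostID string
var tokens []string

// The contacted node describes itself in system.local...
iter := session.Query("SELECT data_center, rack, host_id, tokens FROM system.local").Iter()
iter.Scan(&dc, &rack, &hostID, &tokens)
if err := iter.Close(); err != nil {
	log.Println("system.local query failed:", err)
}

// ...and lists every other ring member in system.peers.
iter = session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers").Iter()
for iter.Scan(&peer, &dc, &rack, &hostID, &tokens) {
	log.Printf("peer %s in dc=%s rack=%s", peer, dc, rack)
}
if err := iter.Close(); err != nil {
	log.Println("system.peers query failed:", err)
}
```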

+ 4 - 2
integration.sh

@@ -3,8 +3,10 @@
 set -e
 
 function run_tests() {
+	local clusterSize=3
 	local version=$1
-	ccm create test -v binary:$version -n 3 -d --vnodes
+
+	ccm create test -v binary:$version -n $clusterSize -d --vnodes
 	ccm updateconf 'concurrent_reads: 8' 'concurrent_writes: 32' 'rpc_server_type: sync' 'rpc_min_threads: 2' 'rpc_max_threads: 8' 'write_request_timeout_in_ms: 5000' 'read_request_timeout_in_ms: 5000'
 	ccm start
 	ccm status
@@ -14,7 +16,7 @@ function run_tests() {
 		proto=1
 	fi
 
-	go test -v -proto=$proto -rf=3 -cluster=$(ccm liveset) ./...
+	go test -v -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=2000ms ./...
 
 	ccm clear
 }

+ 1 - 1
wiki_test.go

@@ -70,7 +70,7 @@ func (w *WikiTest) CreateSchema() {
 			attachments map<varchar, blob>,
 			PRIMARY KEY (title, revid)
 		)`)
-	if clusterSize > 1 {
+	if *clusterSize > 1 {
 		// wait for table definition to propagate
 		time.Sleep(250 * time.Millisecond)
 	}