Add periodic node discovery with optional filters

Add support for periodic node discovery via system.peers.
Chris Bannister 11 years ago
parent
commit
33220784c3
3 changed files with 188 additions and 35 deletions
  1. + 20 - 21 cluster.go
  2. + 69 - 14 connectionpool.go
  3. + 99 - 0 host_source.go
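
A minimal usage sketch of the feature this commit adds. The seed address and keyspace are hypothetical and the github.com/gocql/gocql import path is assumed; DiscoverHosts, DiscoveryConfig, and CreateSession all appear in the diffs below.

package main

import (
	"log"
	"time"

	"github.com/gocql/gocql" // assumed import path
)

func main() {
	cluster := gocql.NewCluster("192.168.1.1") // hypothetical seed node
	cluster.Keyspace = "example"               // hypothetical keyspace
	cluster.DiscoverHosts = true               // turn on periodic discovery
	cluster.Discovery = gocql.DiscoveryConfig{
		DcFilter:   "DC1",            // keep only hosts in this data centre
		RackFilter: "",               // empty: no rack filtering
		Sleep:      60 * time.Second, // poll system.peers every minute (0 means the 30s default)
	}

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}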

+ 20 - 21
cluster.go

@@ -6,9 +6,10 @@ package gocql
 
 import (
 	"errors"
-	"github.com/golang/groupcache/lru"
 	"sync"
 	"time"
+
+	"github.com/golang/groupcache/lru"
 )
 
 //Package global reference to Prepared Statements LRU
@@ -29,6 +30,16 @@ func (p *preparedLRU) Max(max int) {
 	p.lru.MaxEntries = max
 }
 
+// To enable periodic node discovery, set DiscoverHosts in ClusterConfig
+type DiscoveryConfig struct {
+	// If not empty, filters all discovered hosts to a single data centre (default: "")
+	DcFilter string
+	// If not empty, filters all discovered hosts to a single rack (default: "")
+	RackFilter string
+	// The interval to check for new hosts (default: 30s)
+	Sleep time.Duration
+}
+
 // ClusterConfig is a struct to configure the default cluster implementation
 // of gocql. It has a variety of attributes that can be used to modify the
 // behavior to fit the most common use cases. Applications that require a
@@ -50,6 +61,7 @@ type ClusterConfig struct {
 	ConnPoolType     NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
 	DiscoverHosts    bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
 	MaxPreparedStmts int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	Discovery        DiscoveryConfig
 }
 
 // NewCluster generates a new config for the default cluster implementation.
@@ -90,31 +102,19 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 	stmtsLRU.mu.Unlock()
 
 	//See if there are any connections in the pool
-	if pool.Size() > 0 {
+	if pool.Size() > 0 || cfg.DiscoverHosts {
 		s := NewSession(pool, *cfg)
 		s.SetConsistency(cfg.Consistency)
 
 		if cfg.DiscoverHosts {
-			//Fill out cfg.Hosts
-			query := "SELECT peer FROM system.peers"
-			peers := s.Query(query).Iter()
-
-			var ip string
-			for peers.Scan(&ip) {
-				exists := false
-				for ii := 0; ii < len(cfg.Hosts); ii++ {
-					if cfg.Hosts[ii] == ip {
-						exists = true
-					}
-				}
-				if !exists {
-					cfg.Hosts = append(cfg.Hosts, ip)
-				}
+			hostSource := &ringDescriber{
+				session:    s,
+				dcFilter:   cfg.Discovery.DcFilter,
+				rackFilter: cfg.Discovery.RackFilter,
+				previous:   cfg.Hosts,
 			}
 
-			if err := peers.Close(); err != nil {
-				return s, ErrHostQueryFailed
-			}
+			go hostSource.run(cfg.Discovery.Sleep)
 		}
 
 		return s, nil
@@ -122,7 +122,6 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 
 	pool.Close()
 	return nil, ErrNoConnectionsStarted
-
 }
 
 var (

+ 69 - 14
connectionpool.go

@@ -91,6 +91,8 @@ type ConnectionPool interface {
 	Size() int
 	HandleError(*Conn, error, bool)
 	Close()
+	AddHost(addr string)
+	RemoveHost(addr string)
 }
 
 //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
@@ -105,7 +107,12 @@ type SimplePool struct {
 	connPool map[string]*RoundRobin
 	conns    map[*Conn]struct{}
 	keyspace string
-	mu       sync.Mutex
+	// current hosts
+	hostMu sync.RWMutex
+	hosts  map[string]struct{}
+
+	// protects hostPool, connPool, conns, quit
+	mu sync.Mutex
 
 	cFillingPool chan int
 
@@ -117,7 +124,7 @@ type SimplePool struct {
 //NewSimplePool is the function used by gocql to create the simple connection pool.
 //This is the default if no other pool type is specified.
 func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
-	pool := SimplePool{
+	pool := &SimplePool{
 		cfg:          cfg,
 		hostPool:     NewRoundRobin(),
 		connPool:     make(map[string]*RoundRobin),
@@ -125,7 +132,13 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		quitWait:     make(chan bool),
 		cFillingPool: make(chan int, 1),
 		keyspace:     cfg.Keyspace,
+		hosts:        make(map[string]struct{}),
+	}
+
+	for _, host := range cfg.Hosts {
+		pool.hosts[host] = struct{}{}
 	}
+
 	//Walk through connecting to hosts. As soon as one host connects
 	//defer the remaining connections to cluster.fillPool()
 	for i := 0; i < len(cfg.Hosts); i++ {
@@ -133,14 +146,15 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
 		}
+
 		if pool.connect(addr) == nil {
 			pool.cFillingPool <- 1
 			go pool.fillPool()
 			break
 		}
-
 	}
-	return &pool
+
+	return pool
 }
 
 func (c *SimplePool) connect(addr string) error {
@@ -154,14 +168,12 @@ func (c *SimplePool) connect(addr string) error {
 		Keepalive:     c.cfg.SocketKeepalive,
 	}
 
-	for {
-		conn, err := Connect(addr, cfg, c)
-		if err != nil {
-			log.Printf("failed to connect to %q: %v", addr, err)
-			return err
-		}
-		return c.addConn(conn)
+	conn, err := Connect(addr, cfg, c)
+	if err != nil {
+		log.Printf("failed to connect to %q: %v", addr, err)
+		return err
 	}
+	return c.addConn(conn)
 }
 
 func (c *SimplePool) addConn(conn *Conn) error {
@@ -171,6 +183,7 @@ func (c *SimplePool) addConn(conn *Conn) error {
 		conn.Close()
 		return nil
 	}
+
 	//Set the connection's keyspace if any before adding it to the pool
 	if c.keyspace != "" {
 		if err := conn.UseKeyspace(c.keyspace); err != nil {
@@ -179,14 +192,17 @@ func (c *SimplePool) addConn(conn *Conn) error {
 			return err
 		}
 	}
+
 	connPool := c.connPool[conn.Address()]
 	if connPool == nil {
 		connPool = NewRoundRobin()
 		c.connPool[conn.Address()] = connPool
 		c.hostPool.AddNode(connPool)
 	}
+
 	connPool.AddNode(conn)
 	c.conns[conn] = struct{}{}
+
 	return nil
 }
 
@@ -209,13 +225,17 @@ func (c *SimplePool) fillPool() {
 	if isClosed {
 		return
 	}
+
+	c.hostMu.RLock()
+
 	//Walk through list of defined hosts
-	for i := 0; i < len(c.cfg.Hosts); i++ {
-		addr := strings.TrimSpace(c.cfg.Hosts[i])
+	for host := range c.hosts {
+		addr := strings.TrimSpace(host)
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
 		}
-		var numConns int = 1
+
+		numConns := 1
 		//See if the host already has connections in the pool
 		c.mu.Lock()
 		conns, ok := c.connPool[addr]
@@ -233,6 +253,7 @@ func (c *SimplePool) fillPool() {
 				continue
 			}
 		}
+
 		//This is reached if the host is responsive and needs more connections
 		//Create connections for host synchronously to mitigate flooding the host.
 		go func(a string, conns int) {
@@ -241,6 +262,8 @@ func (c *SimplePool) fillPool() {
 			}
 		}(addr, numConns)
 	}
+
+	c.hostMu.RUnlock()
 }
 
 // Should only be called if c.mu is locked
@@ -313,3 +336,35 @@ func (c *SimplePool) Close() {
 		}
 	})
 }
+
+func (c *SimplePool) AddHost(addr string) {
+	c.hostMu.Lock()
+	if _, ok := c.hosts[addr]; !ok {
+		c.hosts[addr] = struct{}{}
+		go c.fillPool()
+	}
+	c.hostMu.Unlock()
+}
+
+func (c *SimplePool) RemoveHost(addr string) {
+	c.hostMu.Lock()
+	if _, ok := c.hosts[addr]; !ok {
+		c.hostMu.Unlock()
+		return
+	}
+	delete(c.hosts, addr)
+	c.hostMu.Unlock()
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.connPool[addr]; !ok {
+		return
+	}
+
+	for conn := range c.conns {
+		if conn.Address() == addr {
+			c.removeConnLocked(conn)
+		}
+	}
+}
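
Because ConnectionPool now requires AddHost and RemoveHost, any custom pool supplied via ClusterConfig.ConnPoolType must implement them as well. A hedged sketch follows: the staticPool name is hypothetical and the rest of the interface is elided; a pool that ignores discovery can stub both methods out.

// staticPool is a hypothetical custom pool used with ClusterConfig.ConnPoolType.
type staticPool struct {
	// Pick, Size, HandleError, and Close from ConnectionPool elided here.
}

// AddHost satisfies the extended interface; a pool that ignores
// discovery can treat newly discovered hosts as a no-op.
func (p *staticPool) AddHost(addr string) {}

// RemoveHost likewise; existing connections to addr are left untouched.
func (p *staticPool) RemoveHost(addr string) {}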

+ 99 - 0
host_source.go

@@ -0,0 +1,99 @@
+package gocql
+
+import "time"
+
+type hostInfo struct {
+	peer       string
+	dataCenter string
+	rack       string
+	hostId     string
+	tokens     []string
+}
+
+// ringDescriber polls system.peers at a regular interval to find new hosts
+type ringDescriber struct {
+	dcFilter   string
+	rackFilter string
+	previous   []string
+	session    *Session
+}
+
+func (r *ringDescriber) GetHosts() []string {
+	// we need conn to be the same because we need to query system.peers and system.local
+	// on the same node to get the whole cluster
+	conn := r.session.Pool.Pick(nil)
+	if conn == nil {
+		return r.previous
+	}
+
+	// TODO: Get conn's tokens from system.local
+	query := r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
+	iter := conn.executeQuery(query)
+
+	hosts := []string{conn.Address()}
+	host := hostInfo{}
+
+	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
+		if r.matchFilter(&host) {
+			// TODO: Capture tokens
+			hosts = append(hosts, host.peer)
+		}
+	}
+
+	if err := iter.Close(); err != nil {
+		return r.previous
+	}
+
+	r.previous = hosts
+
+	return hosts
+}
+
+func (r *ringDescriber) matchFilter(host *hostInfo) bool {
+	if r.dcFilter == "" && r.rackFilter == "" {
+		return true
+	}
+
+	if r.dcFilter != "" && r.dcFilter != host.dataCenter {
+		return false
+	}
+
+	if r.rackFilter != "" && r.rackFilter != host.rack {
+		return false
+	}
+
+	return true
+}
+
+func (r *ringDescriber) run(sleep time.Duration) {
+	if sleep == 0 {
+		sleep = 30 * time.Second
+	}
+
+	prev := make(map[string]struct{})
+	for {
+		// If we have 0 hosts, GetHosts returns the previous list so that we
+		// attempt to reconnect to the cluster; otherwise we would never find
+		// downed hosts again. A possible optimisation: only try to add new
+		// hosts if GetHosts didn't error and the hosts didn't change.
+		hosts := r.GetHosts()
+		current := make(map[string]struct{})
+		for _, host := range hosts {
+			if _, ok := prev[host]; !ok {
+				r.session.Pool.AddHost(host)
+			} else {
+				delete(prev, host)
+			}
+
+			current[host] = struct{}{}
+		}
+
+		for host := range prev {
+			r.session.Pool.RemoveHost(host)
+		}
+
+		prev = current
+
+		time.Sleep(sleep)
+	}
+}
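
To make the filter semantics concrete, a small sketch with hypothetical data centre and rack names; matchFilter keeps a peer only when it passes every non-empty filter.

r := &ringDescriber{dcFilter: "DC1", rackFilter: ""}

r.matchFilter(&hostInfo{dataCenter: "DC1", rack: "r1"}) // true: DC matches, rack filter disabled
r.matchFilter(&hostInfo{dataCenter: "DC2", rack: "r1"}) // false: wrong data centre

r.rackFilter = "r2"
r.matchFilter(&hostInfo{dataCenter: "DC1", rack: "r1"}) // false: r1 fails the rack filter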