
Merge remote-tracking branch 'Zariel/cluster-ring-discovery' into cluster-ring-discovery

Ben Hood 11 years ago
Parent
Current commit
1f037cf044
3 changed files with 214 additions and 34 deletions
  1. cluster.go (+18 -20)
  2. connectionpool.go (+89 -14)
  3. host_source.go (+107 -0)

+ 18 - 20
cluster.go

@@ -6,9 +6,10 @@ package gocql
 
 import (
 	"errors"
-	"github.com/golang/groupcache/lru"
 	"sync"
 	"time"
+
+	"github.com/golang/groupcache/lru"
 )
 
 //Package global reference to Prepared Statements LRU
@@ -29,6 +30,16 @@ func (p *preparedLRU) Max(max int) {
 	p.lru.MaxEntries = max
 }
 
+// To enable periodic node discovery, set DiscoverHosts in ClusterConfig
+type DiscoveryConfig struct {
+	// If not empty, filters all discovered hosts to a single data centre (default: "")
+	DcFilter string
+	// If not empty, filters all discovered hosts to a single rack (default: "")
+	RackFilter string
+	// The interval to check for new hosts (default: 30s)
+	Sleep time.Duration
+}
+
 // ClusterConfig is a struct to configure the default cluster implementation
 // of gocql. It has a variety of attributes that can be used to modify the
 // behavior to fit the most common use cases. Applications that require a
@@ -50,6 +61,7 @@ type ClusterConfig struct {
 	ConnPoolType     NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
 	DiscoverHosts    bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
 	MaxPreparedStmts int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	Discovery        DiscoveryConfig
 }
 
 // NewCluster generates a new config for the default cluster implementation.
@@ -95,26 +107,13 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 		s.SetConsistency(cfg.Consistency)
 
 		if cfg.DiscoverHosts {
-			//Fill out cfg.Hosts
-			query := "SELECT peer FROM system.peers"
-			peers := s.Query(query).Iter()
-
-			var ip string
-			for peers.Scan(&ip) {
-				exists := false
-				for ii := 0; ii < len(cfg.Hosts); ii++ {
-					if cfg.Hosts[ii] == ip {
-						exists = true
-					}
-				}
-				if !exists {
-					cfg.Hosts = append(cfg.Hosts, ip)
-				}
+			hostSource := &ringDescriber{
+				session:    s,
+				dcFilter:   cfg.Discovery.DcFilter,
+				rackFilter: cfg.Discovery.RackFilter,
 			}
 
-			if err := peers.Close(); err != nil {
-				return s, ErrHostQueryFailed
-			}
+			go hostSource.run(cfg.Discovery.Sleep)
 		}
 
 		return s, nil
@@ -122,7 +121,6 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 
 	pool.Close()
 	return nil, ErrNoConnectionsStarted
-
 }
 
 var (
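The net effect of the cluster.go changes: instead of a one-shot scan of system.peers inside CreateSession, discovery is owned by a ringDescriber polled on a background goroutine. A minimal sketch of how a caller might opt in, assuming only the fields added above (the seed address, DC name, and interval are placeholder values):

cluster := NewCluster("192.0.2.1") // placeholder seed host
cluster.DiscoverHosts = true
cluster.Discovery = DiscoveryConfig{
	DcFilter:   "DC1",            // hypothetical data centre name
	RackFilter: "",               // empty: accept hosts in any rack
	Sleep:      60 * time.Second, // poll interval; zero falls back to 30s
}
session, err := cluster.CreateSession()
if err != nil {
	log.Fatal(err)
}
defer session.Close()

Because CreateSession starts hostSource.run in a goroutine, the poll loop keeps feeding SetHosts on the session's pool for the session's lifetime.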

+ 89 - 14
connectionpool.go

@@ -91,6 +91,7 @@ type ConnectionPool interface {
 	Size() int
 	HandleError(*Conn, error, bool)
 	Close()
+	SetHosts(hosts []HostInfo)
 }
 
 //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
@@ -105,7 +106,13 @@ type SimplePool struct {
 	connPool map[string]*RoundRobin
 	conns    map[*Conn]struct{}
 	keyspace string
-	mu       sync.Mutex
+
+	hostMu sync.RWMutex
+	// this is the set of current hosts which the pool will attempt to connect to
+	hosts map[string]*HostInfo
+
+	// protects hostPool, connPool, conns, quit
+	mu sync.Mutex
 
 	cFillingPool chan int
 
@@ -117,7 +124,7 @@ type SimplePool struct {
 //NewSimplePool is the function used by gocql to create the simple connection pool.
 //This is the default if no other pool type is specified.
 func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
-	pool := SimplePool{
+	pool := &SimplePool{
 		cfg:          cfg,
 		hostPool:     NewRoundRobin(),
 		connPool:     make(map[string]*RoundRobin),
@@ -125,7 +132,15 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		quitWait:     make(chan bool),
 		cFillingPool: make(chan int, 1),
 		keyspace:     cfg.Keyspace,
+		hosts:        make(map[string]*HostInfo),
+	}
+
+	for _, host := range cfg.Hosts {
+		// seed hosts have unknown topology
+		// TODO: Handle populating this during SetHosts
+		pool.hosts[host] = &HostInfo{Peer: host}
 	}
+
 	//Walk through connecting to hosts. As soon as one host connects
 	//defer the remaining connections to cluster.fillPool()
 	for i := 0; i < len(cfg.Hosts); i++ {
@@ -133,17 +148,19 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
 		}
+
 		if pool.connect(addr) == nil {
 			pool.cFillingPool <- 1
 			go pool.fillPool()
 			break
 		}
-
 	}
-	return &pool
+
+	return pool
 }
 
 func (c *SimplePool) connect(addr string) error {
+
 	cfg := ConnConfig{
 		ProtoVersion:  c.cfg.ProtoVersion,
 		CQLVersion:    c.cfg.CQLVersion,
@@ -154,14 +171,13 @@ func (c *SimplePool) connect(addr string) error {
 		Keepalive:     c.cfg.SocketKeepalive,
 	}
 
-	for {
-		conn, err := Connect(addr, cfg, c)
-		if err != nil {
-			log.Printf("failed to connect to %q: %v", addr, err)
-			return err
-		}
-		return c.addConn(conn)
+	conn, err := Connect(addr, cfg, c)
+	if err != nil {
+		log.Printf("connect: failed to connect to %q: %v", addr, err)
+		return err
 	}
+
+	return c.addConn(conn)
 }
 
 func (c *SimplePool) addConn(conn *Conn) error {
@@ -171,6 +187,7 @@ func (c *SimplePool) addConn(conn *Conn) error {
 		conn.Close()
 		return nil
 	}
+
 	//Set the connection's keyspace if any before adding it to the pool
 	if c.keyspace != "" {
 		if err := conn.UseKeyspace(c.keyspace); err != nil {
@@ -179,14 +196,17 @@ func (c *SimplePool) addConn(conn *Conn) error {
 			return err
 		}
 	}
+
 	connPool := c.connPool[conn.Address()]
 	if connPool == nil {
 		connPool = NewRoundRobin()
 		c.connPool[conn.Address()] = connPool
 		c.hostPool.AddNode(connPool)
 	}
+
 	connPool.AddNode(conn)
 	c.conns[conn] = struct{}{}
+
 	return nil
 }
 
@@ -209,13 +229,17 @@ func (c *SimplePool) fillPool() {
 	if isClosed {
 		return
 	}
+
+	c.hostMu.RLock()
+
 	//Walk through list of defined hosts
-	for i := 0; i < len(c.cfg.Hosts); i++ {
-		addr := strings.TrimSpace(c.cfg.Hosts[i])
+	for host := range c.hosts {
+		addr := strings.TrimSpace(host)
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
 		}
-		var numConns int = 1
+
+		numConns := 1
 		//See if the host already has connections in the pool
 		c.mu.Lock()
 		conns, ok := c.connPool[addr]
@@ -233,6 +257,7 @@ func (c *SimplePool) fillPool() {
 				continue
 			}
 		}
+
 		//This is reached if the host is responsive and needs more connections
 		//Create connections for host synchronously to mitigate flooding the host.
 		go func(a string, conns int) {
@@ -241,6 +266,8 @@ func (c *SimplePool) fillPool() {
 			}
 		}(addr, numConns)
 	}
+
+	c.hostMu.RUnlock()
 }
 
 // Should only be called if c.mu is locked
@@ -313,3 +340,51 @@ func (c *SimplePool) Close() {
 		}
 	})
 }
+
+func (c *SimplePool) SetHosts(hosts []HostInfo) {
+	c.hostMu.Lock()
+	toRemove := make(map[string]struct{})
+	for k := range c.hosts {
+		toRemove[k] = struct{}{}
+	}
+
+	for _, host := range hosts {
+		host := host // take a copy so &host below does not alias the loop variable
+
+		delete(toRemove, host.Peer)
+		// we already have it
+		if _, ok := c.hosts[host.Peer]; ok {
+			// TODO: Check rack, dc, token range is consistent, trigger topology change
+			// update stored host
+			continue
+		}
+
+		c.hosts[host.Peer] = &host
+	}
+
+	// can we hold c.mu whilst iterating this loop?
+	for addr := range toRemove {
+		c.removeHostLocked(addr)
+	}
+	c.hostMu.Unlock()
+}
+
+func (c *SimplePool) removeHostLocked(addr string) {
+	if _, ok := c.hosts[addr]; !ok {
+		return
+	}
+	delete(c.hosts, addr)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.connPool[addr]; !ok {
+		return
+	}
+
+	for conn := range c.conns {
+		if conn.Address() == addr {
+			c.removeConnLocked(conn)
+		}
+	}
+}
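SetHosts reconciles the pool's host set against the latest ring view with a mark-and-sweep set difference: every known host starts out marked for removal, hosts still present in the update are un-marked (new ones are added), and whatever stays marked is torn down via removeHostLocked. A self-contained sketch of the same idiom, with made-up addresses and a hypothetical reconcile helper that is not part of this patch:

package main

import "fmt"

// reconcile applies the mark-and-sweep idiom SetHosts uses: assume every
// known key is stale, un-mark the ones that reappear in the update, then
// sweep whatever is still marked.
func reconcile(current map[string]bool, update []string) (added, removed []string) {
	toRemove := make(map[string]struct{}, len(current))
	for k := range current {
		toRemove[k] = struct{}{}
	}
	for _, k := range update {
		delete(toRemove, k)
		if !current[k] {
			current[k] = true
			added = append(added, k)
		}
	}
	for k := range toRemove {
		delete(current, k)
		removed = append(removed, k)
	}
	return added, removed
}

func main() {
	hosts := map[string]bool{"10.0.0.1": true, "10.0.0.2": true}
	added, removed := reconcile(hosts, []string{"10.0.0.2", "10.0.0.3"})
	fmt.Println(added, removed) // [10.0.0.3] [10.0.0.1]
}

In the patch itself, both fillPool and SetHosts take hostMu before mu, so the lock ordering stays consistent across the two paths.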

+ 107 - 0
host_source.go

@@ -0,0 +1,107 @@
+package gocql
+
+import (
+	"log"
+	"net"
+	"time"
+)
+
+type HostInfo struct {
+	Peer       string
+	DataCenter string
+	Rack       string
+	HostId     string
+	Tokens     []string
+}
+
+// Polls system.local and system.peers at a regular interval to find new hosts
+type ringDescriber struct {
+	dcFilter   string
+	rackFilter string
+	previous   []HostInfo
+	session    *Session
+}
+
+func (r *ringDescriber) GetHosts() ([]HostInfo, error) {
+	// both queries must go through the same conn: system.local and system.peers
+	// have to be read from the same node to get a view of the whole cluster
+	conn := r.session.Pool.Pick(nil)
+	if conn == nil {
+		return r.previous, nil
+	}
+
+	query := r.session.Query("SELECT data_center, rack, host_id, tokens FROM system.local")
+	iter := conn.executeQuery(query)
+
+	host := &HostInfo{}
+	iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens)
+
+	if err := iter.Close(); err != nil {
+		return nil, err
+	}
+
+	addr, _, err := net.SplitHostPort(conn.Address())
+	if err != nil {
+		// this should never happen, as this is the address that conn dialed;
+		// a panic makes sense here, please report a bug if it occurs.
+		panic(err)
+	}
+
+	host.Peer = addr
+
+	hosts := []HostInfo{*host}
+
+	query = r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
+	iter = conn.executeQuery(query)
+
+	for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
+		if r.matchFilter(host) {
+			hosts = append(hosts, *host)
+		}
+	}
+
+	if err := iter.Close(); err != nil {
+		return nil, err
+	}
+
+	r.previous = hosts
+
+	return hosts, nil
+}
+
+func (r *ringDescriber) matchFilter(host *HostInfo) bool {
+	if r.dcFilter == "" && r.rackFilter == "" {
+		return true
+	}
+
+	if r.dcFilter != "" && r.dcFilter != host.DataCenter {
+		return false
+	}
+
+	if r.rackFilter != "" && r.rackFilter != host.Rack {
+		return false
+	}
+
+	return true
+}
+
+func (r *ringDescriber) run(sleep time.Duration) {
+	if sleep == 0 {
+		sleep = 30 * time.Second
+	}
+
+	for {
+		// if we have 0 hosts this will return the previous list of hosts to
+		// attempt to reconnect to the cluster, otherwise we would never find
+		// downed hosts again. A possible optimisation is to only add new
+		// hosts if GetHosts didn't error and the hosts didn't change.
+		hosts, err := r.GetHosts()
+		if err != nil {
+			log.Println("RingDescriber: unable to get ring topology:", err)
+		} else {
+			r.session.Pool.SetHosts(hosts)
+		}
+
+		time.Sleep(sleep)
+	}
+}
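run is fire-and-forget: apply the 30s default, poll forever, log failures, and keep the last good host list. A stripped-down sketch of the same pattern, where refresh and apply are hypothetical stand-ins for GetHosts and Pool.SetHosts:

// poll mirrors the loop in run(); it is a sketch, not part of the patch.
func poll(interval time.Duration, refresh func() ([]HostInfo, error), apply func([]HostInfo)) {
	if interval == 0 {
		interval = 30 * time.Second // same default run() applies
	}
	for {
		hosts, err := refresh()
		if err != nil {
			log.Println("poll: unable to refresh hosts:", err)
		} else {
			apply(hosts)
		}
		time.Sleep(interval)
	}
}

One design consequence: the loop has no quit channel, so the discovery goroutine outlives a closed session; stopping it cleanly would need a select on a done channel alongside the timer.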