ソースを参照

events: sleep after node up / new node events

After 2.2 we dont need to sleep for new nodes, but before then we should
sleep for around 10 seconds to give enough time for the binary server to
come up.
Chris Bannister 10 年 前
コミット
afd9ba0177
3 ファイル変更21 行追加4 行削除
  1. 8 0
      events.go
  2. 2 1
      events_ccm_test.go
  3. 11 3
      host_source.go

+ 8 - 0
events.go

@@ -169,6 +169,10 @@ func (s *Session) handleNewNode(host net.IP, port int) {
 		hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
 		hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
 	}
 	}
 
 
+	if t := hostInfo.Version().nodeUpDelay(); t > 0 {
+		time.Sleep(t)
+	}
+
 	// should this handle token moving?
 	// should this handle token moving?
 	if existing, ok := s.ring.addHostIfMissing(hostInfo); !ok {
 	if existing, ok := s.ring.addHostIfMissing(hostInfo); !ok {
 		existing.update(hostInfo)
 		existing.update(hostInfo)
@@ -195,6 +199,10 @@ func (s *Session) handleNodeUp(ip net.IP, port int) {
 	addr := ip.String()
 	addr := ip.String()
 	host := s.ring.getHost(addr)
 	host := s.ring.getHost(addr)
 	if host != nil {
 	if host != nil {
+		if t := host.Version().nodeUpDelay(); t > 0 {
+			time.Sleep(t)
+		}
+
 		host.setState(NodeUp)
 		host.setState(NodeUp)
 		s.pool.hostUp(host)
 		s.pool.hostUp(host)
 		return
 		return

+ 2 - 1
events_ccm_test.go

@@ -155,7 +155,8 @@ func TestEventNodeUp(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	time.Sleep(5 * time.Second)
+	// cassandra < 2.2 needs 10 seconds to start up the binary service
+	time.Sleep(10 * time.Second)
 
 
 	session.pool.mu.RLock()
 	session.pool.mu.RLock()
 	log.Printf("poolhosts=%+v\n", poolHosts)
 	log.Printf("poolhosts=%+v\n", poolHosts)

+ 11 - 3
host_source.go

@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
+	"time"
 )
 )
 
 
 type nodeState int32
 type nodeState int32
@@ -57,6 +58,15 @@ func (c cassVersion) String() string {
 	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
 	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
 }
 }
 
 
+func (c cassVersion) nodeUpDelay() time.Duration {
+	if c.Major >= 2 && c.Minor >= 2 {
+		// CASSANDRA-8236
+		return 0
+	}
+
+	return 10 * time.Second
+}
+
 type HostInfo struct {
 type HostInfo struct {
 	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
 	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
 	// that we are thread safe use a mutex to access all fields.
 	// that we are thread safe use a mutex to access all fields.
@@ -141,9 +151,7 @@ func (h *HostInfo) Version() cassVersion {
 func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
 func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
 	h.mu.Lock()
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	defer h.mu.Unlock()
-	h.version.Major = major
-	h.version.Minor = minor
-	h.version.Patch = patch
+	h.version = cassVersion{major, minor, patch}
 	return h
 	return h
 }
 }