Browse source code

add test suite for events using ccm

Chris Bannister 10 years ago
parent
commit
8bfcce00d0
5 changed files with 469 additions and 115 deletions
  1. cassandra_test.go  + 0 - 115
  2. ccm_test/ccm.go  + 178 - 0
  3. ccm_test/ccm_test.go  + 46 - 0
  4. common_test.go  + 125 - 0
  5. events_test.go  + 120 - 0

+ 0 - 115
cassandra_test.go

@@ -4,10 +4,7 @@ package gocql
 
 import (
 	"bytes"
-	"flag"
-	"fmt"
 	"io"
-	"log"
 	"math"
 	"math/big"
 	"net"
@@ -22,118 +19,6 @@ import (
 	"gopkg.in/inf.v0"
 )
 
-var (
-	flagCluster      = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
-	flagProto        = flag.Int("proto", 2, "protcol version")
-	flagCQL          = flag.String("cql", "3.0.0", "CQL version")
-	flagRF           = flag.Int("rf", 1, "replication factor for test keyspace")
-	clusterSize      = flag.Int("clusterSize", 1, "the expected size of the cluster")
-	flagRetry        = flag.Int("retries", 5, "number of times to retry queries")
-	flagAutoWait     = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
-	flagRunSslTest   = flag.Bool("runssl", false, "Set to true to run ssl test")
-	flagRunAuthTest  = flag.Bool("runauth", false, "Set to true to run authentication test")
-	flagCompressTest = flag.String("compressor", "", "compressor to use")
-	flagTimeout      = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
-	clusterHosts     []string
-)
-
-func init() {
-	flag.Parse()
-	clusterHosts = strings.Split(*flagCluster, ",")
-	log.SetFlags(log.Lshortfile | log.LstdFlags)
-}
-
-func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
-	if *flagRunSslTest {
-		cluster.SslOpts = &SslOptions{
-			CertPath:               "testdata/pki/gocql.crt",
-			KeyPath:                "testdata/pki/gocql.key",
-			CaPath:                 "testdata/pki/ca.crt",
-			EnableHostVerification: false,
-		}
-	}
-	return cluster
-}
-
-var initOnce sync.Once
-
-func createTable(s *Session, table string) error {
-	if err := s.control.query(table).Close(); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func createCluster() *ClusterConfig {
-	cluster := NewCluster(clusterHosts...)
-	cluster.ProtoVersion = *flagProto
-	cluster.CQLVersion = *flagCQL
-	cluster.Timeout = *flagTimeout
-	cluster.Consistency = Quorum
-	cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
-	if *flagRetry > 0 {
-		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
-	}
-
-	switch *flagCompressTest {
-	case "snappy":
-		cluster.Compressor = &SnappyCompressor{}
-	case "":
-	default:
-		panic("invalid compressor: " + *flagCompressTest)
-	}
-
-	cluster = addSslOptions(cluster)
-	return cluster
-}
-
-func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
-	c := *cluster
-	c.Keyspace = "system"
-	c.Timeout = 20 * time.Second
-	session, err := c.CreateSession()
-	if err != nil {
-		tb.Fatal("createSession:", err)
-	}
-
-	err = session.control.query(`DROP KEYSPACE IF EXISTS ` + keyspace).Close()
-	if err != nil {
-		tb.Fatal(err)
-	}
-
-	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
-	WITH replication = {
-		'class' : 'SimpleStrategy',
-		'replication_factor' : %d
-	}`, keyspace, *flagRF)).Close()
-
-	if err != nil {
-		tb.Fatal(err)
-	}
-}
-
-func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
-	// Drop and re-create the keyspace once. Different tests should use their own
-	// individual tables, but can assume that the table does not exist before.
-	initOnce.Do(func() {
-		createKeyspace(tb, cluster, "gocql_test")
-	})
-
-	cluster.Keyspace = "gocql_test"
-	session, err := cluster.CreateSession()
-	if err != nil {
-		tb.Fatal("createSession:", err)
-	}
-
-	return session
-}
-
-func createSession(tb testing.TB) *Session {
-	cluster := createCluster()
-	return createSessionFromCluster(cluster, tb)
-}
-
 // TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
 func TestAuthentication(t *testing.T) {
 

+ 178 - 0
ccm_test/ccm.go

@@ -0,0 +1,178 @@
+package ccm
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"fmt"
+	"os/exec"
+	"strings"
+)
+
+func execCmd(args ...string) (*bytes.Buffer, error) {
+	cmd := exec.Command("ccm", args...)
+	stdout := &bytes.Buffer{}
+	cmd.Stdout = stdout
+	cmd.Stderr = &bytes.Buffer{}
+	if err := cmd.Run(); err != nil {
+		return nil, errors.New(cmd.Stderr.(*bytes.Buffer).String())
+	}
+
+	return stdout, nil
+}
+
+func AllUp() error {
+	status, err := Status()
+	if err != nil {
+		return err
+	}
+
+	for _, host := range status {
+		if !host.State.IsUp() {
+			if err := NodeUp(host.Name); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func NodeUp(node string) error {
+	_, err := execCmd(node, "start", "--wait-for-binary-proto", "--wait-other-notice")
+	return err
+}
+
+func NodeDown(node string) error {
+	_, err := execCmd(node, "stop")
+	return err
+}
+
+type Host struct {
+	State NodeState
+	Addr  string
+	Name  string
+}
+
+type NodeState int
+
+func (n NodeState) String() string {
+	if n == NodeStateUp {
+		return "UP"
+	} else if n == NodeStateDown {
+		return "DOWN"
+	} else {
+		return fmt.Sprintf("UNKNOWN_STATE_%d", n)
+	}
+}
+
+func (n NodeState) IsUp() bool {
+	return n == NodeStateUp
+}
+
+const (
+	NodeStateUp NodeState = iota
+	NodeStateDown
+)
+
+func Status() (map[string]Host, error) {
+	// TODO: parse into a struct to manipulate
+	out, err := execCmd("status", "-v")
+	if err != nil {
+		return nil, err
+	}
+
+	const (
+		stateCluster = iota
+		stateCommas
+		stateNode
+		stateOption
+	)
+
+	nodes := make(map[string]Host)
+	// didn't really want to write a full state-machine parser
+	state := stateCluster
+	sc := bufio.NewScanner(out)
+
+	var host Host
+
+	for sc.Scan() {
+		switch state {
+		case stateCluster:
+			text := sc.Text()
+			if !strings.HasPrefix(text, "Cluster:") {
+				return nil, fmt.Errorf("expected 'Cluster:' got %q", text)
+			}
+			state = stateCommas
+		case stateCommas:
+			text := sc.Text()
+			if text != "----------------" {
+				return nil, fmt.Errorf("expected '----------------' got %q", text)
+			}
+			state = stateNode
+		case stateNode:
+			// assume node names start with "node"
+			text := sc.Text()
+			if !strings.HasPrefix(text, "node") {
+				return nil, fmt.Errorf("expected 'node' got %q", text)
+			}
+			line := strings.Split(text, ":")
+			host.Name = line[0]
+
+			nodeState := strings.TrimSpace(line[1])
+			switch nodeState {
+			case "UP":
+				host.State = NodeStateUp
+			case "DOWN":
+				host.State = NodeStateDown
+			default:
+				return nil, fmt.Errorf("unknown node state from ccm: %q", nodeState)
+			}
+
+			state = stateOption
+		case stateOption:
+			text := sc.Text()
+			if text == "" {
+				state = stateNode
+				nodes[host.Name] = host
+				host = Host{}
+				continue
+			}
+
+			line := strings.SplitN(strings.TrimSpace(text), "=", 2)
+			if len(line) != 2 {
+				return nil, fmt.Errorf("expected 'key=value' got %q", text)
+			}
+			k, v := line[0], line[1]
+			if k == "binary" {
+				// the value looks like ('127.0.0.1', 9042): strip the
+				// leading "('" and read up to the closing quote
+				if len(v) < 2 {
+					return nil, fmt.Errorf("invalid binary v=%q", v)
+				}
+				v = v[2:]
+				if i := strings.IndexByte(v, '\''); i < 0 {
+					return nil, fmt.Errorf("invalid binary v=%q", v)
+				} else {
+					host.Addr = v[:i]
+					// the port is not needed, so the rest of the tuple is ignored
+				}
+			}
+		default:
+			return nil, fmt.Errorf("unexpected state: %q", state)
+		}
+	}
+
+	// commit the final host in case the output did not end with a blank line
+	if host.Name != "" {
+		nodes[host.Name] = host
+	}
+
+	if err := sc.Err(); err != nil {
+		return nil, fmt.Errorf("unable to parse ccm status: %v", err)
+	}
+
+	return nodes, nil
+}
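
For reference, a minimal sketch of how this package might be driven outside the test suite (assuming a ccm cluster has already been created; AllUp, Status, and the Host fields are the ones defined above):

	package main

	import (
		"fmt"
		"log"

		ccm "github.com/gocql/gocql/ccm_test"
	)

	func main() {
		// bring every node in the current ccm cluster up
		if err := ccm.AllUp(); err != nil {
			log.Fatal(err)
		}

		// print each node's name, address, and state parsed from `ccm status -v`
		status, err := ccm.Status()
		if err != nil {
			log.Fatal(err)
		}
		for name, host := range status {
			fmt.Printf("%s: addr=%s state=%s\n", name, host.Addr, host.State)
		}
	}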

+ 46 - 0
ccm_test/ccm_test.go

@@ -0,0 +1,46 @@
+package ccm
+
+import (
+	"testing"
+)
+
+func TestCCM(t *testing.T) {
+	if err := AllUp(); err != nil {
+		t.Fatal(err)
+	}
+
+	status, err := Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if host, ok := status["node1"]; !ok {
+		t.Fatal("node1 not in status list")
+	} else if !host.State.IsUp() {
+		t.Fatal("node1 is not up")
+	}
+
+	NodeDown("node1")
+	status, err = Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if host, ok := status["node1"]; !ok {
+		t.Fatal("node1 not in status list")
+	} else if host.State.IsUp() {
+		t.Fatal("node1 is not down")
+	}
+
+	NodeUp("node1")
+	status, err = Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if host, ok := status["node1"]; !ok {
+		t.Fatal("node1 not in status list")
+	} else if !host.State.IsUp() {
+		t.Fatal("node1 is not up")
+	}
+}

+ 125 - 0
common_test.go

@@ -0,0 +1,125 @@
+package gocql
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+var (
+	flagCluster      = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
+	flagProto        = flag.Int("proto", 2, "protocol version")
+	flagCQL          = flag.String("cql", "3.0.0", "CQL version")
+	flagRF           = flag.Int("rf", 1, "replication factor for test keyspace")
+	clusterSize      = flag.Int("clusterSize", 1, "the expected size of the cluster")
+	flagRetry        = flag.Int("retries", 5, "number of times to retry queries")
+	flagAutoWait     = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
+	flagRunSslTest   = flag.Bool("runssl", false, "Set to true to run ssl test")
+	flagRunAuthTest  = flag.Bool("runauth", false, "Set to true to run authentication test")
+	flagCompressTest = flag.String("compressor", "", "compressor to use")
+	flagTimeout      = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
+	clusterHosts     []string
+)
+
+func init() {
+	flag.Parse()
+	clusterHosts = strings.Split(*flagCluster, ",")
+	log.SetFlags(log.Lshortfile | log.LstdFlags)
+}
+
+func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
+	if *flagRunSslTest {
+		cluster.SslOpts = &SslOptions{
+			CertPath:               "testdata/pki/gocql.crt",
+			KeyPath:                "testdata/pki/gocql.key",
+			CaPath:                 "testdata/pki/ca.crt",
+			EnableHostVerification: false,
+		}
+	}
+	return cluster
+}
+
+var initOnce sync.Once
+
+func createTable(s *Session, table string) error {
+	if err := s.control.query(table).Close(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func createCluster() *ClusterConfig {
+	cluster := NewCluster(clusterHosts...)
+	cluster.ProtoVersion = *flagProto
+	cluster.CQLVersion = *flagCQL
+	cluster.Timeout = *flagTimeout
+	cluster.Consistency = Quorum
+	cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
+	if *flagRetry > 0 {
+		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+	}
+
+	switch *flagCompressTest {
+	case "snappy":
+		cluster.Compressor = &SnappyCompressor{}
+	case "":
+	default:
+		panic("invalid compressor: " + *flagCompressTest)
+	}
+
+	cluster = addSslOptions(cluster)
+	return cluster
+}
+
+func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
+	c := *cluster
+	c.Keyspace = "system"
+	c.Timeout = 20 * time.Second
+	session, err := c.CreateSession()
+	if err != nil {
+		tb.Fatal("createSession:", err)
+	}
+	defer session.Close()
+	defer log.Println("closing keyspace session")
+
+	err = session.control.query(`DROP KEYSPACE IF EXISTS ` + keyspace).Close()
+	if err != nil {
+		tb.Fatal(err)
+	}
+
+	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
+	WITH replication = {
+		'class' : 'SimpleStrategy',
+		'replication_factor' : %d
+	}`, keyspace, *flagRF)).Close()
+
+	if err != nil {
+		tb.Fatal(err)
+	}
+}
+
+func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
+	// Drop and re-create the keyspace once. Different tests should use their own
+	// individual tables, but can assume that the table does not exist before.
+	initOnce.Do(func() {
+		createKeyspace(tb, cluster, "gocql_test")
+	})
+
+	cluster.Keyspace = "gocql_test"
+	session, err := cluster.CreateSession()
+	if err != nil {
+		tb.Fatal("createSession:", err)
+	}
+
+	return session
+}
+
+func createSession(tb testing.TB) *Session {
+	cluster := createCluster()
+	return createSessionFromCluster(cluster, tb)
+}
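
As a usage sketch, a test built on these helpers might look like the following (the table name and schema are illustrative, not part of this commit):

	func TestExampleTable(t *testing.T) {
		session := createSession(t)
		defer session.Close()

		// createTable issues the DDL through the session's control connection
		if err := createTable(session, `CREATE TABLE gocql_test.example_table (
			id    int PRIMARY KEY,
			value text
		)`); err != nil {
			t.Fatal(err)
		}

		if err := session.Query(`INSERT INTO example_table (id, value) VALUES (?, ?)`,
			1, "hello").Exec(); err != nil {
			t.Fatal(err)
		}
	}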

+ 120 - 0
events_test.go

@@ -0,0 +1,120 @@
+// +build travis
+
+package gocql
+
+import (
+	"log"
+	"testing"
+	"time"
+
+	ccm "github.com/gocql/gocql/ccm_test"
+)
+
+func TestEventDiscovery(t *testing.T) {
+	if err := ccm.AllUp(); err != nil {
+		t.Fatal(err)
+	}
+
+	session := createSession(t)
+	defer session.Close()
+
+	status, err := ccm.Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+	log.Printf("status=%+v\n", status)
+
+	session.pool.mu.RLock()
+	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
+	log.Printf("poolhosts=%+v\n", poolHosts)
+	// check we discovered all the nodes in the ring
+	for _, host := range status {
+		if _, ok := poolHosts[host.Addr]; !ok {
+			t.Errorf("did not discover %q", host.Addr)
+		}
+	}
+	session.pool.mu.RUnlock()
+	if t.Failed() {
+		t.FailNow()
+	}
+}
+
+func TestEventNodeDown(t *testing.T) {
+	const targetNode = "node1"
+	if err := ccm.AllUp(); err != nil {
+		t.Fatal(err)
+	}
+
+	session := createSession(t)
+	defer session.Close()
+
+	if err := ccm.NodeDown(targetNode); err != nil {
+		t.Fatal(err)
+	}
+	log.Println("down")
+
+	status, err := ccm.Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+	log.Printf("status=%+v\n", status)
+
+	time.Sleep(5 * time.Second)
+
+	session.pool.mu.RLock()
+	defer session.pool.mu.RUnlock()
+
+	poolHosts := session.pool.hostConnPools
+	node := status[targetNode]
+	log.Printf("poolhosts=%+v\n", poolHosts)
+
+	if _, ok := poolHosts[node.Addr]; ok {
+		t.Fatal("node not removed after remove event")
+	}
+}
+
+func TestEventNodeUp(t *testing.T) {
+	if err := ccm.AllUp(); err != nil {
+		t.Fatal(err)
+	}
+
+	status, err := ccm.Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+	log.Printf("status=%+v\n", status)
+
+	session := createSession(t)
+	defer session.Close()
+
+	if err := ccm.NodeDown("node1"); err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Second)
+
+	session.pool.mu.RLock()
+
+	poolHosts := session.pool.hostConnPools
+	log.Printf("poolhosts=%+v\n", poolHosts)
+	node1 := status["node1"]
+
+	if _, ok := poolHosts[node1.Addr]; ok {
+		session.pool.mu.RUnlock()
+		t.Fatal("node1 not removed after remove event")
+	}
+	session.pool.mu.RUnlock()
+
+	if err := ccm.NodeUp("node1"); err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Second)
+
+	session.pool.mu.RLock()
+	log.Printf("poolhosts=%+v\n", poolHosts)
+	if _, ok := poolHosts[node1.Addr]; !ok {
+		session.pool.mu.RUnlock()
+		t.Fatal("node1 not added after node added event")
+	}
+	session.pool.mu.RUnlock()
+}
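
Note that these tests only build with the travis tag (go test -tags travis) and require a running ccm cluster. The fixed time.Sleep(5 * time.Second) waits above give the event handler time to react, but can be flaky on slow machines; one possible alternative (a sketch, not part of this commit) polls the pool until the host appears or disappears:

	// waitForHost polls session.pool.hostConnPools until addr is present
	// (or absent, when present is false) or the timeout expires.
	// Hypothetical helper, not part of this commit.
	func waitForHost(s *Session, addr string, present bool, timeout time.Duration) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			s.pool.mu.RLock()
			_, ok := s.pool.hostConnPools[addr]
			s.pool.mu.RUnlock()
			if ok == present {
				return true
			}
			time.Sleep(100 * time.Millisecond)
		}
		return false
	}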