// Copyright (c) 2015 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build conn_pool

package gocql

import (
	"flag"
	"fmt"
	"log"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"
)

// connection pool behavior test when nodes are removed from the cluster
// to run this test, see connectionpool_systems_test.sh
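//
// A direct invocation might look something like the sketch below (the flag
// values are illustrative only, and a ccm cluster is assumed to have been
// created already):
//
//	go test -v -tags conn_pool -run Pool -args -clusterSize=3 -nodesShut=1 -rf=3
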
var (
	flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
	flagProto = flag.Int("proto", 2, "protocol version")
	flagCQL     = flag.String("cql", "3.0.0", "CQL version")
	flagRF      = flag.Int("rf", 1, "replication factor for test keyspace")
	clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
	nodesShut   = flag.Int("nodesShut", 1, "the number of nodes to shutdown during the test")
	flagRetry   = flag.Int("retries", 5, "number of times to retry queries")
	flagRunSsl  = flag.Bool("runssl", false, "Set to true to run ssl test")

	clusterHosts []string
)

var initOnce sync.Once

func init() {
	flag.Parse()
	clusterHosts = strings.Split(*flagCluster, ",")
	log.SetFlags(log.Lshortfile | log.LstdFlags)
}

func createTable(s *Session, table string) error {
	err := s.Query(table).Consistency(All).Exec()
	if *clusterSize > 1 {
		// wait for the table definition to propagate
		time.Sleep(250 * time.Millisecond)
	}
	return err
}

func createCluster() *ClusterConfig {
	cluster := NewCluster(clusterHosts...)
	cluster.ProtoVersion = *flagProto
	cluster.CQLVersion = *flagCQL
	cluster.Timeout = 5 * time.Second
	cluster.Consistency = Quorum
	if *flagRetry > 0 {
		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
	}
	if *flagRunSsl {
		cluster.SslOpts = &SslOptions{
			CertPath:               "testdata/pki/gocql.crt",
			KeyPath:                "testdata/pki/gocql.key",
			CaPath:                 "testdata/pki/ca.crt",
			EnableHostVerification: false,
		}
	}
	return cluster
}

func createKeyspace(t *testing.T, cluster *ClusterConfig, keyspace string) {
	session, err := cluster.CreateSession()
	if err != nil {
		t.Fatal("createSession:", err)
	}
	if err = session.Query(`DROP KEYSPACE ` + keyspace).Exec(); err != nil {
		t.Log("drop keyspace:", err)
	}
	err = session.Query(
		fmt.Sprintf(
			`
			CREATE KEYSPACE %s
			WITH replication = {
				'class' : 'SimpleStrategy',
				'replication_factor' : %d
			}
			`,
			keyspace,
			*flagRF,
		),
	).Consistency(All).Exec()
	if err != nil {
		t.Fatalf("error creating keyspace %s: %v", keyspace, err)
	}
	t.Logf("Created keyspace %s", keyspace)
	session.Close()
}

func createSession(t *testing.T) *Session {
	cluster := createCluster()

	// Drop and re-create the keyspace once. Different tests should use their own
	// individual tables, but can assume that the table does not exist before.
	initOnce.Do(func() {
		createKeyspace(t, cluster, "gocql_test")
	})

	cluster.Keyspace = "gocql_test"
	session, err := cluster.CreateSession()
	if err != nil {
		t.Fatal("createSession:", err)
	}
	return session
}

func TestSimplePool(t *testing.T) {
	testConnPool(t, NewSimplePool)
}

func TestRRPolicyConnPool(t *testing.T) {
	testConnPool(t, NewRoundRobinConnPool)
}

func TestTAPolicyConnPool(t *testing.T) {
	testConnPool(t, NewTokenAwareConnPool)
}

func testConnPool(t *testing.T, connPoolType NewPoolFunc) {
	var out []byte
	var err error

	log.SetFlags(log.Ltime)

	// make sure the cluster is running
	out, err = exec.Command("ccm", "start").CombinedOutput()
	if err != nil {
		t.Fatalf("Error running ccm command: %v\nccm output:\n%s", err, out)
	}
	time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)

	// fire up a session (no discovery)
	cluster := createCluster()
	cluster.ConnPoolType = connPoolType
	cluster.DiscoverHosts = false
	session, err := cluster.CreateSession()
	if err != nil {
		t.Fatalf("Error connecting to cluster: %v", err)
	}
	defer session.Close()

	time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)

	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
		t.Logf(
			"WARN: Expected %d pool size, but was %d",
			(*clusterSize)*cluster.NumConns,
			session.Pool.Size(),
		)
	}
	// start some connection monitoring
	nilCheckStop := false
	nilCount := 0
	nilCheck := func() {
		// count nil connections returned by the pool; asserted on below
		for !nilCheckStop {
			actual := session.Pool.Pick(nil)
			if actual == nil {
				nilCount++
			}
		}
	}
	go nilCheck()

	// shut down some hosts
	log.Println("shutdown some hosts")
	for i := 0; i < *nodesShut; i++ {
		out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "stop").CombinedOutput()
		if err != nil {
			t.Fatalf("Error running ccm command: %v\nccm output:\n%s", err, out)
		}
		time.Sleep(1500 * time.Millisecond)
	}
	time.Sleep(500 * time.Millisecond)

	if session.Pool.Size() != ((*clusterSize)-(*nodesShut))*cluster.NumConns {
		t.Logf(
			"WARN: Expected %d pool size, but was %d",
			((*clusterSize)-(*nodesShut))*cluster.NumConns,
			session.Pool.Size(),
		)
	}

	// bring the stopped hosts back up
	log.Println("bring the stopped hosts back up")
	for i := 0; i < *nodesShut; i++ {
		out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "start").CombinedOutput()
		if err != nil {
			t.Fatalf("Error running ccm command: %v\nccm output:\n%s", err, out)
		}
		time.Sleep(1500 * time.Millisecond)
	}
	time.Sleep(500 * time.Millisecond)

	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
		t.Logf(
			"WARN: Expected %d pool size, but was %d",
			(*clusterSize)*cluster.NumConns,
			session.Pool.Size(),
		)
	}

	// assert that all connections returned by the pool are non-nil
	if nilCount > 0 {
		t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
	}

	// shut down the whole cluster
	log.Println("shutdown cluster")
	out, err = exec.Command("ccm", "stop").CombinedOutput()
	if err != nil {
		t.Fatalf("Error running ccm command: %v\nccm output:\n%s", err, out)
	}

	time.Sleep(2500 * time.Millisecond)

	if session.Pool.Size() != 0 {
		t.Logf(
			"WARN: Expected %d pool size, but was %d",
			0,
			session.Pool.Size(),
		)
	}

	// start the cluster again
	log.Println("start cluster")
	out, err = exec.Command("ccm", "start").CombinedOutput()
	if err != nil {
		t.Fatalf("Error running ccm command: %v\nccm output:\n%s", err, out)
	}
	time.Sleep(500 * time.Millisecond)

	// reset the count
	nilCount = 0

	time.Sleep(3000 * time.Millisecond)

	if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
		t.Logf(
			"WARN: Expected %d pool size, but was %d",
			(*clusterSize)*cluster.NumConns,
			session.Pool.Size(),
		)
	}
	// assert that all connections returned by the pool are non-nil
	if nilCount > 0 {
		t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
	}

	nilCheckStop = true
}