| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- // +build ccm
- package gocql
- import (
- "github.com/gocql/gocql/ccm_test"
- "log"
- "testing"
- "time"
- )
- func TestEventDiscovery(t *testing.T) {
- if err := ccm.AllUp(); err != nil {
- t.Fatal(err)
- }
- session := createSession(t)
- defer session.Close()
- status, err := ccm.Status()
- if err != nil {
- t.Fatal(err)
- }
- t.Logf("status=%+v\n", status)
- session.pool.mu.RLock()
- poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
- t.Logf("poolhosts=%+v\n", poolHosts)
- // check we discovered all the nodes in the ring
- for _, host := range status {
- if _, ok := poolHosts[host.Addr]; !ok {
- t.Errorf("did not discover %q", host.Addr)
- }
- }
- session.pool.mu.RUnlock()
- if t.Failed() {
- t.FailNow()
- }
- }
- func TestEventNodeDownControl(t *testing.T) {
- const targetNode = "node1"
- t.Log("marking " + targetNode + " as down")
- if err := ccm.AllUp(); err != nil {
- t.Fatal(err)
- }
- session := createSession(t)
- defer session.Close()
- if err := ccm.NodeDown(targetNode); err != nil {
- t.Fatal(err)
- }
- status, err := ccm.Status()
- if err != nil {
- t.Fatal(err)
- }
- t.Logf("status=%+v\n", status)
- t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])
- time.Sleep(5 * time.Second)
- session.pool.mu.RLock()
- poolHosts := session.pool.hostConnPools
- node := status[targetNode]
- t.Logf("poolhosts=%+v\n", poolHosts)
- if _, ok := poolHosts[node.Addr]; ok {
- session.pool.mu.RUnlock()
- t.Fatal("node not removed after remove event")
- }
- session.pool.mu.RUnlock()
- }
- func TestEventNodeDown(t *testing.T) {
- const targetNode = "node3"
- if err := ccm.AllUp(); err != nil {
- t.Fatal(err)
- }
- session := createSession(t)
- defer session.Close()
- if err := ccm.NodeDown(targetNode); err != nil {
- t.Fatal(err)
- }
- status, err := ccm.Status()
- if err != nil {
- t.Fatal(err)
- }
- t.Logf("status=%+v\n", status)
- t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])
- time.Sleep(5 * time.Second)
- session.pool.mu.RLock()
- defer session.pool.mu.RUnlock()
- poolHosts := session.pool.hostConnPools
- node := status[targetNode]
- t.Logf("poolhosts=%+v\n", poolHosts)
- if _, ok := poolHosts[node.Addr]; ok {
- t.Fatal("node not removed after remove event")
- }
- }
- func TestEventNodeUp(t *testing.T) {
- if err := ccm.AllUp(); err != nil {
- t.Fatal(err)
- }
- status, err := ccm.Status()
- if err != nil {
- t.Fatal(err)
- }
- log.Printf("status=%+v\n", status)
- session := createSession(t)
- defer session.Close()
- poolHosts := session.pool.hostConnPools
- const targetNode = "node2"
- session.pool.mu.RLock()
- _, ok := poolHosts[status[targetNode].Addr]
- session.pool.mu.RUnlock()
- if !ok {
- session.pool.mu.RLock()
- t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, poolHosts)
- session.pool.mu.RUnlock()
- t.FailNow()
- }
- if err := ccm.NodeDown(targetNode); err != nil {
- t.Fatal(err)
- }
- time.Sleep(5 * time.Second)
- session.pool.mu.RLock()
- log.Printf("poolhosts=%+v\n", poolHosts)
- node := status[targetNode]
- if _, ok := poolHosts[node.Addr]; ok {
- session.pool.mu.RUnlock()
- t.Fatal("node not removed after remove event")
- }
- session.pool.mu.RUnlock()
- if err := ccm.NodeUp(targetNode); err != nil {
- t.Fatal(err)
- }
- time.Sleep(5 * time.Second)
- session.pool.mu.RLock()
- log.Printf("poolhosts=%+v\n", poolHosts)
- if _, ok := poolHosts[node.Addr]; !ok {
- session.pool.mu.RUnlock()
- t.Fatal("node not added after node added event")
- }
- session.pool.mu.RUnlock()
- }
|