events_ccm_test.go

// +build ccm

package gocql

import (
	"log"
	"testing"
	"time"

	"github.com/gocql/gocql/internal/ccm"
)

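// TestEventDiscovery brings the whole ccm cluster up and checks that the
// driver has a connection pool entry for every node ccm reports in the ring.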
func TestEventDiscovery(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
	t.Logf("poolhosts=%+v\n", poolHosts)

	// check we discovered all the nodes in the ring
	for _, host := range status {
		if _, ok := poolHosts[host.Addr]; !ok {
			t.Errorf("did not discover %q", host.Addr)
		}
	}
	session.pool.mu.RUnlock()

	if t.Failed() {
		t.FailNow()
	}
}
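
// TestEventNodeDownControl takes down node1 (assumed, per the test name, to be
// the host backing the control connection) and checks that the node is removed
// from the connection pool and marked as down in the ring metadata.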
func TestEventNodeDownControl(t *testing.T) {
	const targetNode = "node1"

	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	t.Log("marking " + targetNode + " as down")
	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marked node %q down: %v\n", targetNode, status[targetNode])

	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		session.pool.mu.RUnlock()
		t.Fatal("node not removed from pool after down event")
	}
	session.pool.mu.RUnlock()

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("host not marked as down after event in metadata: %v", host)
	}
}
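
// TestEventNodeDown takes down node3 and checks that the resulting down event
// removes it from the connection pool and marks it as down in the ring metadata.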
func TestEventNodeDown(t *testing.T) {
	const targetNode = "node3"

	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marked node %q down: %v\n", targetNode, status[targetNode])

	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	defer session.pool.mu.RUnlock()

	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		t.Fatal("node not removed from pool after down event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("host not marked as down after event in metadata: %v", host)
	}
}
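
// TestEventNodeUp cycles node2 down and back up, checking that the driver first
// drops the node from the connection pool and then re-adds it and marks it as
// up once the node rejoins the cluster.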
func TestEventNodeUp(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	session := createSession(t)
	defer session.Close()

	const targetNode = "node2"
	node := status[targetNode]

	if _, ok := session.pool.getPool(node.Addr); !ok {
		session.pool.mu.RLock()
		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
		session.pool.mu.RUnlock()
		t.FailNow()
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if _, ok := session.pool.getPool(node.Addr); ok {
		t.Fatal("node not removed from pool after down event")
	}

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// Cassandra < 2.2 needs 10 seconds to start up the binary service
	time.Sleep(15 * time.Second)

	if _, ok := session.pool.getPool(node.Addr); !ok {
		t.Fatal("node not added back to pool after up event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if !host.IsUp() {
		t.Fatalf("host not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host)
	}
}
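
// TestEventFilter whitelists node1 via a HostFilter and checks that node2 and
// node3 never enter the connection pool, even after node2 is restarted and
// generates down/up events.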
func TestEventFilter(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	cluster := createCluster()
	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
		t.Errorf("should have %v in pool but don't", "node1")
	}

	for _, host := range [...]string{"node2", "node3"} {
		if _, ok := session.pool.getPool(status[host].Addr); ok {
			t.Errorf("should not have %v in pool", host)
		}
	}
	if t.Failed() {
		t.FailNow()
	}

	if err := ccm.NodeDown("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	for _, host := range [...]string{"node2", "node3"} {
		if _, ok := session.pool.getPool(status[host].Addr); ok {
			t.Errorf("should not have %v in pool", host)
		}
	}
	if t.Failed() {
		t.FailNow()
	}
}