events_ccm_test.go

// +build ccm

package gocql

import (
	"log"
	"testing"
	"time"

	"github.com/gocql/gocql/internal/ccm"
)
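
// TestEventDiscovery verifies that a freshly created session discovers every
// node reported by ccm and has a connection pool entry for each of them.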
func TestEventDiscovery(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
	t.Logf("poolhosts=%+v\n", poolHosts)

	// check we discovered all the nodes in the ring
	for _, host := range status {
		if _, ok := poolHosts[host.Addr]; !ok {
			t.Errorf("did not discover %q", host.Addr)
		}
	}
	session.pool.mu.RUnlock()

	if t.Failed() {
		t.FailNow()
	}
}
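
// TestEventNodeDownControl takes down node1, the only contact point of the
// session, and checks that the down event removes it from the connection pool
// and marks it down in the metadata ring.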
func TestEventNodeDownControl(t *testing.T) {
	const targetNode = "node1"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}

	cluster := createCluster()
	cluster.Hosts = []string{status[targetNode].Addr}
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	t.Log("marking " + targetNode + " as down")
	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the event some time to propagate to the driver
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		session.pool.mu.RUnlock()
		t.Fatal("node not removed from pool after down event")
	}
	session.pool.mu.RUnlock()

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
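
// TestEventNodeDown takes node3 down and checks that the down event removes
// it from the connection pool and marks it down in the metadata ring.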
func TestEventNodeDown(t *testing.T) {
	const targetNode = "node3"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the event some time to propagate to the driver
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	defer session.pool.mu.RUnlock()

	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		t.Fatal("node not removed from pool after down event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
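
// TestEventNodeUp cycles node2 down and back up and checks that the
// connection pool and metadata ring track both transitions.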
func TestEventNodeUp(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	session := createSession(t)
	defer session.Close()

	const targetNode = "node2"
	node := status[targetNode]

	_, ok := session.pool.getPool(node.Addr)
	if !ok {
		session.pool.mu.RLock()
		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
		session.pool.mu.RUnlock()
		t.FailNow()
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if ok {
		t.Fatal("node not removed from pool after down event")
	}

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// cassandra < 2.2 needs 10 seconds to start up the binary service
	time.Sleep(15 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if !ok {
		t.Fatal("node not added back to pool after up event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if !host.IsUp() {
		t.Fatalf("node not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host)
	}
}
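
// TestEventFilter whitelists node1 and checks that node2 and node3 never
// enter the connection pool, even after they generate down/up events.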
func TestEventFilter(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	cluster := createCluster()
	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
		t.Errorf("should have %v in pool but don't", "node1")
	}

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}

	if err := ccm.NodeDown("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}
}
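
// TestEventDownQueryable restarts the only whitelisted node and checks that
// the session can still run queries against it afterwards.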
func TestEventDownQueryable(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	const targetNode = "node1"

	addr := status[targetNode].Addr

	cluster := createCluster()
	cluster.Hosts = []string{addr}
	cluster.HostFilter = WhiteListHostFilter(addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	var rows int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil {
		t.Fatal(err)
	} else if rows != 1 {
		t.Fatalf("expected to get 1 row got %d", rows)
	}
}