// +build ccm, ignore

package gocql

import (
	"log"
	"testing"
	"time"

	"github.com/gocql/gocql/internal/ccm"
)
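
// TestEventDiscovery brings the whole CCM cluster up and checks that the
// driver's connection pool contains an entry for every host reported by
// `ccm status`, i.e. that topology discovery found the entire ring.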
func TestEventDiscovery(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
	t.Logf("poolhosts=%+v\n", poolHosts)
	// check we discovered all the nodes in the ring
	for _, host := range status {
		if _, ok := poolHosts[host.Addr]; !ok {
			t.Errorf("did not discover %q", host.Addr)
		}
	}
	session.pool.mu.RUnlock()

	if t.Failed() {
		t.FailNow()
	}
}
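
// TestEventNodeDownControl connects only to node1 (the control-connection
// host), takes that node down via CCM, and verifies that the driver drops
// the node's connection pool and marks the host down in the metadata ring.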
func TestEventNodeDownControl(t *testing.T) {
	t.Skip("FLAKE skipping")
	const targetNode = "node1"

	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}

	cluster := createCluster()
	cluster.Hosts = []string{status[targetNode].Addr}
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	t.Log("marking " + targetNode + " as down")
	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the down event time to reach the driver
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		session.pool.mu.RUnlock()
		t.Fatal("node not removed after remove event")
	}
	session.pool.mu.RUnlock()

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
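
// TestEventNodeDown takes down a non-control node (node3) and verifies that
// the down event removes the node's connection pool and marks the host down
// in the metadata ring.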
func TestEventNodeDown(t *testing.T) {
	t.Skip("FLAKE skipping")
	const targetNode = "node3"

	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the down event time to reach the driver
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	defer session.pool.mu.RUnlock()

	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		t.Fatal("node not removed after remove event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
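
// TestEventNodeUp bounces node2 via CCM and verifies that the driver removes
// the node's pool while it is down and re-adds it, marked UP in the metadata
// ring, once the node comes back.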
func TestEventNodeUp(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	session := createSession(t)
	defer session.Close()

	const targetNode = "node2"
	node := status[targetNode]

	_, ok := session.pool.getPool(node.Addr)
	if !ok {
		session.pool.mu.RLock()
		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
		session.pool.mu.RUnlock()
		t.FailNow()
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	// give the down event time to reach the driver
	time.Sleep(5 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if ok {
		t.Fatal("node not removed after remove event")
	}

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// cassandra < 2.2 needs 10 seconds to start up the binary service
	time.Sleep(15 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if !ok {
		t.Fatal("node not added after node added event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if !host.IsUp() {
		t.Fatalf("node not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host)
	}
}
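
// TestEventFilter whitelists node1 and verifies that node2 and node3 never
// appear in the connection pool, even after they generate down and up events.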
func TestEventFilter(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	cluster := createCluster()
	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
		t.Errorf("should have %v in pool but don't", "node1")
	}

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}

	if err := ccm.NodeDown("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}
}
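
// TestEventDownQueryable bounces the only whitelisted host (node1) and
// verifies that once it is back up its pool is restored with an UP host and
// the session can still serve a simple query.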
func TestEventDownQueryable(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	const targetNode = "node1"
	addr := status[targetNode].Addr

	cluster := createCluster()
	cluster.Hosts = []string{addr}
	cluster.HostFilter = WhiteListHostFilter(addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	var rows int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil {
		t.Fatal(err)
	} else if rows != 1 {
		t.Fatalf("expected to get 1 row got %d", rows)
	}
}