events_ccm_test.go

// +build ccm

package gocql

import (
	"log"
	"testing"
	"time"

	"github.com/gocql/gocql/internal/ccm"
)
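// TestEventDiscovery brings the whole CCM cluster up and checks that every
// node reported by ccm status ends up with an entry in the session's host
// connection pools.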
func TestEventDiscovery(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
	t.Logf("poolhosts=%+v\n", poolHosts)

	// check we discovered all the nodes in the ring
	for _, host := range status {
		if _, ok := poolHosts[host.Addr]; !ok {
			t.Errorf("did not discover %q", host.Addr)
		}
	}
	session.pool.mu.RUnlock()

	if t.Failed() {
		t.FailNow()
	}
}
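// TestEventNodeDownControl stops the only host the session was given
// (node1, with reconnection disabled) and verifies that the driver removes
// its connection pool and marks the host down in the ring metadata.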
func TestEventNodeDownControl(t *testing.T) {
	const targetNode = "node1"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}

	cluster := createCluster()
	cluster.ReconnectInterval = 0
	cluster.Hosts = []string{status[targetNode].Addr}
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	t.Log("marking " + targetNode + " as down")
	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		session.pool.mu.RUnlock()
		t.Fatal("node not removed after remove event")
	}
	session.pool.mu.RUnlock()

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
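// TestEventNodeDown stops a single node and verifies that the down event
// removes its connection pool and marks the host down in the ring metadata.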
func TestEventNodeDown(t *testing.T) {
	const targetNode = "node3"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	defer session.pool.mu.RUnlock()
	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)
	if _, ok := poolHosts[node.Addr]; ok {
		t.Fatal("node not removed after remove event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}
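// TestEventNodeUp bounces a node: after it goes down its connection pool
// should be removed, and after it comes back up the pool should be re-created
// and the host marked UP in the ring metadata.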
func TestEventNodeUp(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	session := createSession(t)
	defer session.Close()

	const targetNode = "node2"
	node := status[targetNode]

	_, ok := session.pool.getPool(node.Addr)
	if !ok {
		session.pool.mu.RLock()
		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
		session.pool.mu.RUnlock()
		t.FailNow()
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if ok {
		t.Fatal("node not removed after remove event")
	}

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// cassandra < 2.2 needs 10 seconds to start up the binary service
	time.Sleep(15 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if !ok {
		t.Fatal("node not added after node added event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if !host.IsUp() {
		t.Fatalf("node not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host)
	}
}
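// TestEventFilter whitelists node1 and verifies that node2 and node3 never
// get connection pools, even after node2 is bounced and generates up/down
// events.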
func TestEventFilter(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	cluster := createCluster()
	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
		t.Errorf("should have %v in pool but don't", "node1")
	}

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}

	if err := ccm.NodeDown("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	// filtered nodes must still be excluded after the up/down events
	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}
}
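// TestEventDownQueryable whitelists a single node, bounces it, and verifies
// that its pool recovers and the session can still run queries afterwards.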
func TestEventDownQueryable(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	const targetNode = "node1"
	addr := status[targetNode].Addr

	cluster := createCluster()
	cluster.Hosts = []string{addr}
	cluster.HostFilter = WhiteListHostFilter(addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(15 * time.Second)

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	var rows int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil {
		t.Fatal(err)
	} else if rows != 1 {
		t.Fatalf("expected to get 1 row got %d", rows)
	}
}