cluster.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. // Cluster sets up and maintains the node configuration of a Cassandra
  12. // cluster.
  13. //
  14. // It has a varity of attributes that can be used to modify the behavior
  15. // to fit the most common use cases. Applications that requre a different
  16. // a setup should compose the nodes on their own.
  17. type ClusterConfig struct {
  18. Hosts []string
  19. CQLVersion string
  20. ProtoVersion int
  21. Timeout time.Duration
  22. DefaultPort int
  23. Keyspace string
  24. NumConn int
  25. NumStreams int
  26. DelayMin time.Duration
  27. DelayMax time.Duration
  28. StartupMin int
  29. }
  30. func NewCluster(hosts ...string) *ClusterConfig {
  31. cfg := &ClusterConfig{
  32. Hosts: hosts,
  33. CQLVersion: "3.0.0",
  34. ProtoVersion: 2,
  35. Timeout: 200 * time.Millisecond,
  36. DefaultPort: 9042,
  37. NumConn: 2,
  38. DelayMin: 1 * time.Second,
  39. DelayMax: 10 * time.Minute,
  40. StartupMin: len(hosts)/2 + 1,
  41. }
  42. return cfg
  43. }
  44. func (cfg *ClusterConfig) CreateSession() *Session {
  45. impl := &clusterImpl{
  46. cfg: *cfg,
  47. hostPool: NewRoundRobin(),
  48. connPool: make(map[string]*RoundRobin),
  49. }
  50. impl.wgStart.Add(1)
  51. impl.startup()
  52. impl.wgStart.Wait()
  53. return NewSession(impl)
  54. }
  55. type clusterImpl struct {
  56. cfg ClusterConfig
  57. hostPool *RoundRobin
  58. connPool map[string]*RoundRobin
  59. mu sync.RWMutex
  60. conns []*Conn
  61. started bool
  62. wgStart sync.WaitGroup
  63. quit bool
  64. quitWait chan bool
  65. quitOnce sync.Once
  66. keyspace string
  67. }
  68. func (c *clusterImpl) startup() {
  69. for i := 0; i < len(c.cfg.Hosts); i++ {
  70. addr := strings.TrimSpace(c.cfg.Hosts[i])
  71. if strings.IndexByte(addr, ':') < 0 {
  72. addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
  73. }
  74. for j := 0; j < c.cfg.NumConn; j++ {
  75. go c.connect(addr)
  76. }
  77. }
  78. }
  79. func (c *clusterImpl) connect(addr string) {
  80. cfg := ConnConfig{
  81. ProtoVersion: 2,
  82. CQLVersion: c.cfg.CQLVersion,
  83. Timeout: c.cfg.Timeout,
  84. NumStreams: c.cfg.NumStreams,
  85. }
  86. delay := c.cfg.DelayMin
  87. for {
  88. conn, err := Connect(addr, cfg, c)
  89. if err != nil {
  90. select {
  91. case <-time.After(delay):
  92. if delay *= 2; delay > c.cfg.DelayMax {
  93. delay = c.cfg.DelayMax
  94. }
  95. continue
  96. case <-c.quitWait:
  97. return
  98. }
  99. }
  100. c.addConn(conn, "")
  101. return
  102. }
  103. }
  104. func (c *clusterImpl) changeKeyspace(conn *Conn, keyspace string, connected bool) {
  105. if err := conn.UseKeyspace(keyspace); err != nil {
  106. conn.Close()
  107. if connected {
  108. c.removeConn(conn)
  109. }
  110. go c.connect(conn.Address())
  111. }
  112. if !connected {
  113. c.addConn(conn, keyspace)
  114. }
  115. }
  116. func (c *clusterImpl) addConn(conn *Conn, keyspace string) {
  117. c.mu.Lock()
  118. defer c.mu.Unlock()
  119. if c.quit {
  120. conn.Close()
  121. return
  122. }
  123. if keyspace != c.keyspace && c.keyspace != "" {
  124. go c.changeKeyspace(conn, c.keyspace, false)
  125. return
  126. }
  127. connPool := c.connPool[conn.Address()]
  128. if connPool == nil {
  129. connPool = NewRoundRobin()
  130. c.connPool[conn.Address()] = connPool
  131. c.hostPool.AddNode(connPool)
  132. if !c.started && c.hostPool.Size() >= c.cfg.StartupMin {
  133. c.started = true
  134. c.wgStart.Done()
  135. }
  136. }
  137. connPool.AddNode(conn)
  138. c.conns = append(c.conns, conn)
  139. }
  140. func (c *clusterImpl) removeConn(conn *Conn) {
  141. c.mu.Lock()
  142. defer c.mu.Unlock()
  143. conn.Close()
  144. connPool := c.connPool[conn.addr]
  145. if connPool == nil {
  146. return
  147. }
  148. connPool.RemoveNode(conn)
  149. if connPool.Size() == 0 {
  150. c.hostPool.RemoveNode(connPool)
  151. }
  152. for i := 0; i < len(c.conns); i++ {
  153. if c.conns[i] == conn {
  154. last := len(c.conns) - 1
  155. c.conns[i], c.conns[last] = c.conns[last], c.conns[i]
  156. c.conns = c.conns[:last]
  157. }
  158. }
  159. }
  160. func (c *clusterImpl) HandleError(conn *Conn, err error, closed bool) {
  161. if !closed {
  162. return
  163. }
  164. c.removeConn(conn)
  165. go c.connect(conn.Address())
  166. }
  167. func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
  168. c.mu.Lock()
  169. if c.keyspace == keyspace {
  170. c.mu.Unlock()
  171. return
  172. }
  173. c.keyspace = keyspace
  174. conns := make([]*Conn, len(c.conns))
  175. copy(conns, c.conns)
  176. c.mu.Unlock()
  177. for i := 0; i < len(conns); i++ {
  178. if conns[i] == conn {
  179. continue
  180. }
  181. c.changeKeyspace(conns[i], keyspace, true)
  182. }
  183. }
  184. func (c *clusterImpl) ExecuteQuery(qry *Query) (*Iter, error) {
  185. return c.hostPool.ExecuteQuery(qry)
  186. }
  187. func (c *clusterImpl) ExecuteBatch(batch *Batch) error {
  188. return c.hostPool.ExecuteBatch(batch)
  189. }
  190. func (c *clusterImpl) Close() {
  191. c.quitOnce.Do(func() {
  192. c.mu.Lock()
  193. defer c.mu.Unlock()
  194. c.quit = true
  195. close(c.quitWait)
  196. for i := 0; i < len(c.conns); i++ {
  197. c.conns[i].Close()
  198. }
  199. })
  200. }