cluster.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. // Cluster sets up and maintains the node configuration of a Cassandra
  12. // cluster.
  13. //
  14. // It has a varity of attributes that can be used to modify the behavior
  15. // to fit the most common use cases. Applications that requre a different
  16. // a setup should compose the nodes on their own.
  17. type Cluster struct {
  18. Hosts []string
  19. CQLVersion string
  20. Timeout time.Duration
  21. DefaultPort int
  22. Keyspace string
  23. ConnPerHost int
  24. DelayMin time.Duration
  25. DelayMax time.Duration
  26. }
  27. func NewCluster(hosts ...string) *Cluster {
  28. c := &Cluster{
  29. Hosts: hosts,
  30. CQLVersion: "3.0.0",
  31. Timeout: 200 * time.Millisecond,
  32. DefaultPort: 9042,
  33. ConnPerHost: 2,
  34. }
  35. return c
  36. }
  37. func (c *Cluster) CreateSession() *Session {
  38. return NewSession(newClusterNode(c))
  39. }
  40. type clusterNode struct {
  41. cfg Cluster
  42. hostPool *RoundRobin
  43. connPool map[string]*RoundRobin
  44. closed bool
  45. mu sync.Mutex
  46. }
  47. func newClusterNode(cfg *Cluster) *clusterNode {
  48. c := &clusterNode{
  49. cfg: *cfg,
  50. hostPool: NewRoundRobin(),
  51. connPool: make(map[string]*RoundRobin),
  52. }
  53. for i := 0; i < len(c.cfg.Hosts); i++ {
  54. addr := strings.TrimSpace(c.cfg.Hosts[i])
  55. if strings.IndexByte(addr, ':') < 0 {
  56. addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
  57. }
  58. for j := 0; j < c.cfg.ConnPerHost; j++ {
  59. go c.connect(addr)
  60. }
  61. }
  62. <-time.After(c.cfg.Timeout)
  63. return c
  64. }
  65. func (c *clusterNode) connect(addr string) {
  66. delay := c.cfg.DelayMin
  67. for {
  68. conn, err := Connect(addr, c.cfg.CQLVersion, c.cfg.Timeout)
  69. if err != nil {
  70. fmt.Println(err)
  71. <-time.After(delay)
  72. if delay *= 2; delay > c.cfg.DelayMax {
  73. delay = c.cfg.DelayMax
  74. }
  75. continue
  76. }
  77. c.addConn(addr, conn)
  78. return
  79. }
  80. }
  81. func (c *clusterNode) addConn(addr string, conn *Conn) {
  82. c.mu.Lock()
  83. defer c.mu.Unlock()
  84. connPool := c.connPool[addr]
  85. if connPool == nil {
  86. connPool = NewRoundRobin()
  87. c.connPool[addr] = connPool
  88. c.hostPool.AddNode(connPool)
  89. }
  90. connPool.AddNode(conn)
  91. go func() {
  92. conn.Serve()
  93. c.removeConn(addr, conn)
  94. }()
  95. }
  96. func (c *clusterNode) removeConn(addr string, conn *Conn) {
  97. c.mu.Lock()
  98. defer c.mu.Unlock()
  99. pool := c.connPool[addr]
  100. if pool == nil {
  101. return
  102. }
  103. pool.RemoveNode(conn)
  104. }
  105. func (c *clusterNode) ExecuteQuery(qry *Query) (*Iter, error) {
  106. return c.hostPool.ExecuteQuery(qry)
  107. }
  108. func (c *clusterNode) ExecuteBatch(batch *Batch) error {
  109. return c.hostPool.ExecuteBatch(batch)
  110. }
  111. func (c *clusterNode) Close() {
  112. c.hostPool.Close()
  113. }