cluster.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. // Cluster sets up and maintains the node configuration of a Cassandra
  12. // cluster.
  13. //
  14. // It has a varity of attributes that can be used to modify the behavior
  15. // to fit the most common use cases. Applications that requre a different
  16. // a setup should compose the nodes on their own.
  17. type Cluster struct {
  18. Hosts []string
  19. CQLVersion string
  20. Timeout time.Duration
  21. DefaultPort int
  22. Keyspace string
  23. ConnPerHost int
  24. DelayMin time.Duration
  25. DelayMax time.Duration
  26. pool *RoundRobin
  27. initOnce sync.Once
  28. boot sync.WaitGroup
  29. bootOnce sync.Once
  30. }
  31. func NewCluster(hosts ...string) *Cluster {
  32. c := &Cluster{
  33. Hosts: hosts,
  34. CQLVersion: "3.0.0",
  35. Timeout: 200 * time.Millisecond,
  36. DefaultPort: 9042,
  37. }
  38. return c
  39. }
  40. func (c *Cluster) init() {
  41. for i := 0; i < len(c.Hosts); i++ {
  42. addr := strings.TrimSpace(c.Hosts[i])
  43. if strings.IndexByte(addr, ':') < 0 {
  44. addr = fmt.Sprintf("%s:%d", addr, c.DefaultPort)
  45. }
  46. go c.connect(addr)
  47. }
  48. c.pool = NewRoundRobin()
  49. <-time.After(c.Timeout)
  50. }
  51. func (c *Cluster) connect(addr string) {
  52. delay := c.DelayMin
  53. for {
  54. conn, err := Connect(addr, c.CQLVersion, c.Timeout)
  55. if err != nil {
  56. <-time.After(delay)
  57. if delay *= 2; delay > c.DelayMax {
  58. delay = c.DelayMax
  59. }
  60. continue
  61. }
  62. c.pool.AddNode(conn)
  63. go func() {
  64. conn.Serve()
  65. c.pool.RemoveNode(conn)
  66. c.connect(addr)
  67. }()
  68. return
  69. }
  70. }
  71. func (c *Cluster) CreateSession() *Session {
  72. c.initOnce.Do(c.init)
  73. return NewSession(c.pool)
  74. }