|
@@ -11,26 +11,26 @@ import (
|
|
|
"time"
|
|
"time"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-// Cluster sets up and maintains the node configuration of a Cassandra
|
|
|
|
|
-// cluster.
|
|
|
|
|
-//
|
|
|
|
|
-// It has a varity of attributes that can be used to modify the behavior
|
|
|
|
|
-// to fit the most common use cases. Applications that requre a different
|
|
|
|
|
-// a setup should compose the nodes on their own.
|
|
|
|
|
|
|
+// ClusterConfig is a struct to configure the default cluster implementation
|
|
|
|
|
+// of gocoql. It has a varity of attributes that can be used to modify the
|
|
|
|
|
+// behavior to fit the most common use cases. Applications that requre a
|
|
|
|
|
+// different setup must implement their own cluster.
|
|
|
type ClusterConfig struct {
|
|
type ClusterConfig struct {
|
|
|
- Hosts []string
|
|
|
|
|
- CQLVersion string
|
|
|
|
|
- ProtoVersion int
|
|
|
|
|
- Timeout time.Duration
|
|
|
|
|
- DefaultPort int
|
|
|
|
|
- Keyspace string
|
|
|
|
|
- NumConns int
|
|
|
|
|
- NumStreams int
|
|
|
|
|
- DelayMin time.Duration
|
|
|
|
|
- DelayMax time.Duration
|
|
|
|
|
- StartupMin int
|
|
|
|
|
|
|
+ Hosts []string // addresses for the initial connections
|
|
|
|
|
+ CQLVersion string // CQL version (default: 3.0.0)
|
|
|
|
|
+ ProtoVersion int // version of the native protocol (default: 2)
|
|
|
|
|
+ Timeout time.Duration // connection timeout (default: 200ms)
|
|
|
|
|
+ DefaultPort int // default port (default: 9042)
|
|
|
|
|
+ Keyspace string // initial keyspace (optional)
|
|
|
|
|
+ NumConns int // number of connections per host (default: 2)
|
|
|
|
|
+ NumStreams int // number of streams per connection (default: 128)
|
|
|
|
|
+ DelayMin time.Duration // minimum reconnection delay (default: 1s)
|
|
|
|
|
+ DelayMax time.Duration // maximum reconnection delay (default: 10min)
|
|
|
|
|
+ StartupMin int // wait for StartupMin hosts (default: len(Hosts)/2+1)
|
|
|
|
|
+ Consistency Consistency // default consistency level (default: Quorum)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// NewCluster generates a new config for the default cluster implementation.
|
|
|
func NewCluster(hosts ...string) *ClusterConfig {
|
|
func NewCluster(hosts ...string) *ClusterConfig {
|
|
|
cfg := &ClusterConfig{
|
|
cfg := &ClusterConfig{
|
|
|
Hosts: hosts,
|
|
Hosts: hosts,
|
|
@@ -43,18 +43,30 @@ func NewCluster(hosts ...string) *ClusterConfig {
|
|
|
DelayMin: 1 * time.Second,
|
|
DelayMin: 1 * time.Second,
|
|
|
DelayMax: 10 * time.Minute,
|
|
DelayMax: 10 * time.Minute,
|
|
|
StartupMin: len(hosts)/2 + 1,
|
|
StartupMin: len(hosts)/2 + 1,
|
|
|
|
|
+ Consistency: Quorum,
|
|
|
}
|
|
}
|
|
|
return cfg
|
|
return cfg
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// CreateSession initializes the cluster based on this config and returns a
|
|
|
|
|
+// session object that can be used to interact with the database.
|
|
|
func (cfg *ClusterConfig) CreateSession() *Session {
|
|
func (cfg *ClusterConfig) CreateSession() *Session {
|
|
|
impl := &clusterImpl{
|
|
impl := &clusterImpl{
|
|
|
cfg: *cfg,
|
|
cfg: *cfg,
|
|
|
hostPool: NewRoundRobin(),
|
|
hostPool: NewRoundRobin(),
|
|
|
connPool: make(map[string]*RoundRobin),
|
|
connPool: make(map[string]*RoundRobin),
|
|
|
|
|
+ conns: make(map[*Conn]struct{}),
|
|
|
}
|
|
}
|
|
|
impl.wgStart.Add(1)
|
|
impl.wgStart.Add(1)
|
|
|
- impl.startup()
|
|
|
|
|
|
|
+ for i := 0; i < len(impl.cfg.Hosts); i++ {
|
|
|
|
|
+ addr := strings.TrimSpace(impl.cfg.Hosts[i])
|
|
|
|
|
+ if strings.IndexByte(addr, ':') < 0 {
|
|
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, impl.cfg.DefaultPort)
|
|
|
|
|
+ }
|
|
|
|
|
+ for j := 0; j < impl.cfg.NumConns; j++ {
|
|
|
|
|
+ go impl.connect(addr)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
impl.wgStart.Wait()
|
|
impl.wgStart.Wait()
|
|
|
return NewSession(impl)
|
|
return NewSession(impl)
|
|
|
}
|
|
}
|
|
@@ -63,9 +75,9 @@ type clusterImpl struct {
|
|
|
cfg ClusterConfig
|
|
cfg ClusterConfig
|
|
|
hostPool *RoundRobin
|
|
hostPool *RoundRobin
|
|
|
connPool map[string]*RoundRobin
|
|
connPool map[string]*RoundRobin
|
|
|
- mu sync.RWMutex
|
|
|
|
|
-
|
|
|
|
|
- conns []*Conn
|
|
|
|
|
|
|
+ conns map[*Conn]struct{}
|
|
|
|
|
+ keyspace string
|
|
|
|
|
+ mu sync.Mutex
|
|
|
|
|
|
|
|
started bool
|
|
started bool
|
|
|
wgStart sync.WaitGroup
|
|
wgStart sync.WaitGroup
|
|
@@ -73,20 +85,6 @@ type clusterImpl struct {
|
|
|
quit bool
|
|
quit bool
|
|
|
quitWait chan bool
|
|
quitWait chan bool
|
|
|
quitOnce sync.Once
|
|
quitOnce sync.Once
|
|
|
-
|
|
|
|
|
- keyspace string
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (c *clusterImpl) startup() {
|
|
|
|
|
- for i := 0; i < len(c.cfg.Hosts); i++ {
|
|
|
|
|
- addr := strings.TrimSpace(c.cfg.Hosts[i])
|
|
|
|
|
- if strings.IndexByte(addr, ':') < 0 {
|
|
|
|
|
- addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
|
|
|
|
|
- }
|
|
|
|
|
- for j := 0; j < c.cfg.NumConns; j++ {
|
|
|
|
|
- go c.connect(addr)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) connect(addr string) {
|
|
func (c *clusterImpl) connect(addr string) {
|
|
@@ -136,6 +134,7 @@ func (c *clusterImpl) addConn(conn *Conn, keyspace string) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
if keyspace != c.keyspace && c.keyspace != "" {
|
|
if keyspace != c.keyspace && c.keyspace != "" {
|
|
|
|
|
+ // change the keyspace before adding the node to the pool
|
|
|
go c.changeKeyspace(conn, c.keyspace, false)
|
|
go c.changeKeyspace(conn, c.keyspace, false)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -150,7 +149,7 @@ func (c *clusterImpl) addConn(conn *Conn, keyspace string) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
connPool.AddNode(conn)
|
|
connPool.AddNode(conn)
|
|
|
- c.conns = append(c.conns, conn)
|
|
|
|
|
|
|
+ c.conns[conn] = struct{}{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) removeConn(conn *Conn) {
|
|
func (c *clusterImpl) removeConn(conn *Conn) {
|
|
@@ -165,21 +164,16 @@ func (c *clusterImpl) removeConn(conn *Conn) {
|
|
|
if connPool.Size() == 0 {
|
|
if connPool.Size() == 0 {
|
|
|
c.hostPool.RemoveNode(connPool)
|
|
c.hostPool.RemoveNode(connPool)
|
|
|
}
|
|
}
|
|
|
- for i := 0; i < len(c.conns); i++ {
|
|
|
|
|
- if c.conns[i] == conn {
|
|
|
|
|
- last := len(c.conns) - 1
|
|
|
|
|
- c.conns[i], c.conns[last] = c.conns[last], c.conns[i]
|
|
|
|
|
- c.conns = c.conns[:last]
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ delete(c.conns, conn)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) HandleError(conn *Conn, err error, closed bool) {
|
|
func (c *clusterImpl) HandleError(conn *Conn, err error, closed bool) {
|
|
|
if !closed {
|
|
if !closed {
|
|
|
|
|
+ // ignore all non-fatal errors
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
c.removeConn(conn)
|
|
c.removeConn(conn)
|
|
|
- go c.connect(conn.Address())
|
|
|
|
|
|
|
+ go c.connect(conn.Address()) // reconnect
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
|
|
func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
|
|
@@ -189,10 +183,13 @@ func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
c.keyspace = keyspace
|
|
c.keyspace = keyspace
|
|
|
- conns := make([]*Conn, len(c.conns))
|
|
|
|
|
- copy(conns, c.conns)
|
|
|
|
|
|
|
+ conns := make([]*Conn, 0, len(c.conns))
|
|
|
|
|
+ for conn := range c.conns {
|
|
|
|
|
+ conns = append(conns, conn)
|
|
|
|
|
+ }
|
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
+ // change the keyspace of all other connections too
|
|
|
for i := 0; i < len(conns); i++ {
|
|
for i := 0; i < len(conns); i++ {
|
|
|
if conns[i] == conn {
|
|
if conns[i] == conn {
|
|
|
continue
|
|
continue
|
|
@@ -202,10 +199,16 @@ func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) ExecuteQuery(qry *Query) (*Iter, error) {
|
|
func (c *clusterImpl) ExecuteQuery(qry *Query) (*Iter, error) {
|
|
|
|
|
+ if qry.Cons == 0 {
|
|
|
|
|
+ qry.Cons = c.cfg.Consistency
|
|
|
|
|
+ }
|
|
|
return c.hostPool.ExecuteQuery(qry)
|
|
return c.hostPool.ExecuteQuery(qry)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *clusterImpl) ExecuteBatch(batch *Batch) error {
|
|
func (c *clusterImpl) ExecuteBatch(batch *Batch) error {
|
|
|
|
|
+ if batch.Cons == 0 {
|
|
|
|
|
+ batch.Cons = c.cfg.Consistency
|
|
|
|
|
+ }
|
|
|
return c.hostPool.ExecuteBatch(batch)
|
|
return c.hostPool.ExecuteBatch(batch)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -215,8 +218,8 @@ func (c *clusterImpl) Close() {
|
|
|
defer c.mu.Unlock()
|
|
defer c.mu.Unlock()
|
|
|
c.quit = true
|
|
c.quit = true
|
|
|
close(c.quitWait)
|
|
close(c.quitWait)
|
|
|
- for i := 0; i < len(c.conns); i++ {
|
|
|
|
|
- c.conns[i].Close()
|
|
|
|
|
|
|
+ for conn := range c.conns {
|
|
|
|
|
+ conn.Close()
|
|
|
}
|
|
}
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|