|
|
@@ -31,6 +31,9 @@ type Session struct {
|
|
|
trace Tracer
|
|
|
mu sync.RWMutex
|
|
|
cfg ClusterConfig
|
|
|
+
|
|
|
+ closeMu sync.RWMutex
|
|
|
+ isClosed bool
|
|
|
}
|
|
|
|
|
|
// NewSession wraps an existing Node.
|
|
|
@@ -88,54 +91,84 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
|
|
|
// Close closes all connections. The session is unusable after this
|
|
|
// operation.
|
|
|
func (s *Session) Close() {
|
|
|
+ s.closeMu.Lock()
|
|
|
+ if s.isClosed {
|
|
|
+ s.closeMu.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ s.isClosed = true
|
|
|
+ s.closeMu.Unlock()
|
|
|
+
|
|
|
s.Node.Close()
|
|
|
}
|
|
|
|
|
|
+func (s *Session) Closed() bool {
|
|
|
+ s.closeMu.RLock()
|
|
|
+ closed := s.isClosed
|
|
|
+ s.closeMu.RUnlock()
|
|
|
+ return closed
|
|
|
+}
|
|
|
+
|
|
|
func (s *Session) executeQuery(qry *Query) *Iter {
|
|
|
- var itr *Iter
|
|
|
- count := 0
|
|
|
- for count <= qry.rt.NumRetries {
|
|
|
- conn := s.Node.Pick(nil)
|
|
|
+ // fail fast
|
|
|
+ if s.Closed() {
|
|
|
+ return &Iter{err: ErrSessionClosed}
|
|
|
+ }
|
|
|
+
|
|
|
+ var iter *Iter
|
|
|
+ for count := 0; count <= qry.rt.NumRetries; count++ {
|
|
|
+ conn := s.Node.Pick(qry)
|
|
|
//Assign the error unavailable to the iterator
|
|
|
if conn == nil {
|
|
|
- itr = &Iter{err: ErrUnavailable}
|
|
|
+ iter = &Iter{err: ErrNoConnections}
|
|
|
break
|
|
|
}
|
|
|
- itr = conn.executeQuery(qry)
|
|
|
+
|
|
|
+ iter = conn.executeQuery(qry)
|
|
|
//Exit for loop if the query was successful
|
|
|
- if itr.err == nil {
|
|
|
+ if iter.err == nil {
|
|
|
break
|
|
|
}
|
|
|
- count++
|
|
|
}
|
|
|
- return itr
|
|
|
+
|
|
|
+ if iter == nil {
|
|
|
+ iter = &Iter{err: ErrNoConnections}
|
|
|
+ }
|
|
|
+
|
|
|
+ return iter
|
|
|
}
|
|
|
|
|
|
// ExecuteBatch executes a batch operation and returns nil if successful
|
|
|
// otherwise an error is returned describing the failure.
|
|
|
func (s *Session) ExecuteBatch(batch *Batch) error {
|
|
|
+ // fail fast
|
|
|
+ if s.Closed() {
|
|
|
+ return ErrSessionClosed
|
|
|
+ }
|
|
|
+
|
|
|
// Prevent the execution of the batch if greater than the limit
|
|
|
// Currently batches have a limit of 65536 queries.
|
|
|
// https://datastax-oss.atlassian.net/browse/JAVA-229
|
|
|
if batch.Size() > BatchSizeMaximum {
|
|
|
return ErrTooManyStmts
|
|
|
}
|
|
|
+
|
|
|
var err error
|
|
|
- count := 0
|
|
|
- for count <= batch.rt.NumRetries {
|
|
|
+ for count := 0; count <= batch.rt.NumRetries; count++ {
|
|
|
conn := s.Node.Pick(nil)
|
|
|
//Assign the error unavailable and break loop
|
|
|
if conn == nil {
|
|
|
- err = ErrUnavailable
|
|
|
+ err = ErrNoConnections
|
|
|
break
|
|
|
}
|
|
|
+
|
|
|
err = conn.executeBatch(batch)
|
|
|
//Exit loop if operation executed correctly
|
|
|
if err == nil {
|
|
|
- break
|
|
|
+ return nil
|
|
|
}
|
|
|
- count++
|
|
|
}
|
|
|
+
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -489,11 +522,13 @@ func (e Error) Error() string {
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
- ErrNotFound = errors.New("not found")
|
|
|
- ErrUnavailable = errors.New("unavailable")
|
|
|
- ErrUnsupported = errors.New("feature not supported")
|
|
|
- ErrTooManyStmts = errors.New("too many statements")
|
|
|
- ErrUseStmt = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explaination.")
|
|
|
+ ErrNotFound = errors.New("not found")
|
|
|
+ ErrUnavailable = errors.New("unavailable")
|
|
|
+ ErrUnsupported = errors.New("feature not supported")
|
|
|
+ ErrTooManyStmts = errors.New("too many statements")
|
|
|
+ ErrUseStmt = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explaination.")
|
|
|
+ ErrSessionClosed = errors.New("session has been closed")
|
|
|
+ ErrNoConnections = errors.New("no connections available")
|
|
|
)
|
|
|
|
|
|
type ErrProtocol struct{ error }
|