Browse Source

reduce number of gourtines used to execute queries (#1241)

When a query is idempotent or speculation is disabled do not launch a
new goroutine.

When we are speculating don't launch a goroutine to do the speculation
instead do the check in line for results while waiting for a speculation
tick.

This also removes the need to spawn a ticker when speculation is
disabled or the query is non idempotent.
Chris Bannister 7 years ago
parent
commit
9de8c0414f
1 changed files with 24 additions and 19 deletions
  1. 24 19
      query_executor.go

+ 24 - 19
query_executor.go

@@ -34,12 +34,30 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c
 	return iter
 }
 
+func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
+	ticker := time.NewTicker(sp.Delay())
+	defer ticker.Stop()
+
+	for i := 0; i < sp.Attempts(); i++ {
+		select {
+		case <-ticker.C:
+			go q.run(ctx, qry, results)
+		case <-ctx.Done():
+			return &Iter{err: ctx.Err()}
+		case iter := <-results:
+			return iter
+		}
+	}
+
+	return nil
+}
+
 func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 	// check if the query is not marked as idempotent, if
 	// it is, we force the policy to NonSpeculative
 	sp := qry.speculativeExecutionPolicy()
-	if !qry.IsIdempotent() {
-		sp = NonSpeculativeExecution{}
+	if !qry.IsIdempotent() || sp.Attempts() == 0 {
+		return q.do(qry.Context(), qry), nil
 	}
 
 	ctx, cancel := context.WithCancel(qry.Context())
@@ -53,22 +71,9 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 	// The speculative executions are launched _in addition_ to the main
 	// execution, on a timer. So Speculation{2} would make 3 executions running
 	// in total.
-	go func() {
-		// setup a ticker
-		ticker := time.NewTicker(sp.Delay())
-		defer ticker.Stop()
-
-		for i := 0; i < sp.Attempts(); i++ {
-			select {
-			case <-ticker.C:
-				// Launch the additional execution
-				go q.run(ctx, qry, results)
-			case <-ctx.Done():
-				// not starting additional executions
-				return
-			}
-		}
-	}()
+	if iter := q.speculate(ctx, qry, sp, results); iter != nil {
+		return iter, nil
+	}
 
 	select {
 	case iter := <-results:
@@ -140,7 +145,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
 	return &Iter{err: ErrNoConnections}
 }
 
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
 	select {
 	case results <- q.do(ctx, qry):
 	case <-ctx.Done():