Просмотр исходного кода

Fix data race when accessing hostMetrics (#1413)

ObservedQuery and ObservedBatch expose a pointer to the hostMetrics struct
to the respective observers. However, the fields of the returned
*hostMetrics are not safe to read: the observer that may access them is
called without the lock held, while the fields can be updated concurrently.

In order to return a safe value to clients, we need to make a copy of
the hostMetrics struct while the lock is still held.

This commit also reduces the number of lock/unlock calls: previously we
did 3 pairs (for updating attempts, for updating latency and for getting
hostMetrics); now we do only one.
Martin Sucha 5 лет назад
Родитель
Коммит
7153289b5a
1 измененных файлов с 40 добавлено и 26 удалено
  1. 40 26
      session.go

+ 40 - 26
session.go

@@ -696,7 +696,11 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{})
 }
 
 type hostMetrics struct {
+	// Attempts is count of how many times this query has been attempted for this host.
+	// An attempt is either a retry or fetching next page of results.
 	Attempts     int
+
+	// TotalLatency is the sum of attempt latencies for this host in nanoseconds.
 	TotalLatency int64
 }
 
@@ -717,12 +721,15 @@ func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
 	return qm
 }
 
-// hostMetricsLocked gets or creates host metrics for given host.
+// hostMetrics returns a snapshot of metrics for given host.
+// If the metrics for host don't exist, they are created.
 func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
 	qm.l.Lock()
 	metrics := qm.hostMetricsLocked(host)
+	copied := new(hostMetrics)
+	*copied = *metrics
 	qm.l.Unlock()
-	return metrics
+	return copied
 }
 
 // hostMetricsLocked gets or creates host metrics for given host.
@@ -746,17 +753,6 @@ func (qm *queryMetrics) attempts() int {
 	return attempts
 }
 
-// addAttempts adds given number of attempts and returns previous total attempts.
-func (qm *queryMetrics) addAttempts(i int, host *HostInfo) int {
-	qm.l.Lock()
-	hostMetric := qm.hostMetricsLocked(host)
-	hostMetric.Attempts += i
-	attempts := qm.totalAttempts
-	qm.totalAttempts += i
-	qm.l.Unlock()
-	return attempts
-}
-
 func (qm *queryMetrics) latency() int64 {
 	qm.l.Lock()
 	var (
@@ -774,11 +770,28 @@ func (qm *queryMetrics) latency() int64 {
 	return 0
 }
 
-func (qm *queryMetrics) addLatency(l int64, host *HostInfo) {
+// attempt adds given number of attempts and latency for given host.
+// It returns previous total attempts.
+// If needsHostMetrics is true, a copy of updated hostMetrics is returned.
+func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration,
+	host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) {
 	qm.l.Lock()
-	hostMetric := qm.hostMetricsLocked(host)
-	hostMetric.TotalLatency += l
+
+	totalAttempts := qm.totalAttempts
+	qm.totalAttempts += addAttempts
+
+	updateHostMetrics := qm.hostMetricsLocked(host)
+	updateHostMetrics.Attempts += addAttempts
+	updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
+
+	var hostMetricsCopy *hostMetrics
+	if needsHostMetrics {
+		hostMetricsCopy = new(hostMetrics)
+		*hostMetricsCopy = *updateHostMetrics
+	}
+
 	qm.l.Unlock()
+	return totalAttempts, hostMetricsCopy
 }
 
 // Query represents a CQL statement that can be executed.
@@ -850,7 +863,7 @@ func (q *Query) Attempts() int {
 }
 
 func (q *Query) AddAttempts(i int, host *HostInfo) {
-	q.metrics.addAttempts(i, host)
+	q.metrics.attempt(i, 0, host, false)
 }
 
 //Latency returns the average amount of nanoseconds per attempt of the query.
@@ -859,7 +872,7 @@ func (q *Query) Latency() int64 {
 }
 
 func (q *Query) AddLatency(l int64, host *HostInfo) {
-	q.metrics.addLatency(l, host)
+	q.metrics.attempt(0, time.Duration(l) * time.Nanosecond, host, false)
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -974,8 +987,8 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
 }
 
 func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
-	attempt := q.metrics.addAttempts(1, host)
-	q.AddLatency(end.Sub(start).Nanoseconds(), host)
+	latency := end.Sub(start)
+	attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil)
 
 	if q.observer != nil {
 		q.observer.ObserveQuery(q.Context(), ObservedQuery{
@@ -985,7 +998,7 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
 			End:       end,
 			Rows:      iter.numRows,
 			Host:      host,
-			Metrics:   q.metrics.hostMetrics(host),
+			Metrics:   metricsForHost,
 			Err:       iter.err,
 			Attempt:   attempt,
 		})
@@ -1586,7 +1599,7 @@ func (b *Batch) Attempts() int {
 }
 
 func (b *Batch) AddAttempts(i int, host *HostInfo) {
-	b.metrics.addAttempts(i, host)
+	b.metrics.attempt(i, 0, host, false)
 }
 
 //Latency returns the average number of nanoseconds to execute a single attempt of the batch.
@@ -1595,7 +1608,7 @@ func (b *Batch) Latency() int64 {
 }
 
 func (b *Batch) AddLatency(l int64, host *HostInfo) {
-	b.metrics.addLatency(l, host)
+	b.metrics.attempt(0, time.Duration(l) * time.Nanosecond, host, false)
 }
 
 // GetConsistency returns the currently configured consistency level for the batch
@@ -1719,8 +1732,8 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch {
 }
 
 func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
-	b.AddAttempts(1, host)
-	b.AddLatency(end.Sub(start).Nanoseconds(), host)
+	latency := end.Sub(start)
+	_, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil)
 
 	if b.observer == nil {
 		return
@@ -1738,7 +1751,7 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
 		End:        end,
 		// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
 		Host:    host,
-		Metrics: b.metrics.hostMetrics(host),
+		Metrics: metricsForHost,
 		Err:     iter.err,
 	})
 }
@@ -1955,6 +1968,7 @@ type ObservedQuery struct {
 	Err error
 
 	// Attempt is the index of attempt at executing this query.
+	// An attempt might be either retry or fetching next page of a query.
 	// The first attempt is number zero and any retries have non-zero attempt number.
 	Attempt int
 }