Procházet zdrojové kódy

Fix two bugs in stream id allocation. (#707)

- Previously if GetStream() found an available stream but the CAS
  failed, it was skipping that stream and trying the next. However, a
  CAS failure doesn't mean the current bit is taken, just that some bit
  in the bucket changed. Fix to keep retrying the CAS while the bit it's
  checking continues to be available in the bucket.
- When the CAS failed in GetStream(), it was re-loading the bucket using
  "offset" instead of "pos". This wouldn't end up causing any incorrect
  behavior, but it greatly reduces the chance of finding an available
  bit in the current bucket since the CAS will only succeed if
  c.streams[pos] happens to be the same as c.streams[offset] (assuming
  pos != offset).
Muir Manders před 9 roky
rodič
revize
91135b0fc2
1 změnil soubory, kde provedl 2 přidání a 2 odebrání
  1. 2 2
      internal/streams/streams.go

+ 2 - 2
internal/streams/streams.go

@@ -62,12 +62,12 @@ func (s *IDGenerator) GetStream() (int, bool) {
 
 		for j := 0; j < bucketBits; j++ {
 			mask := uint64(1 << streamOffset(j))
-			if bucket&mask == 0 {
+			for bucket&mask == 0 {
 				if atomic.CompareAndSwapUint64(&s.streams[pos], bucket, bucket|mask) {
 					atomic.AddInt32(&s.inuseStreams, 1)
 					return streamFromBucket(int(pos), j), true
 				}
-				bucket = atomic.LoadUint64(&s.streams[offset])
+				bucket = atomic.LoadUint64(&s.streams[pos])
 			}
 		}
 	}