streams.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package streams
  2. import (
  3. "math"
  4. "strconv"
  5. "sync/atomic"
  6. )
  // bucketBits is the number of stream IDs tracked by a single uint64 word
  // ("bucket") of the IDGenerator bitset, one bit per stream.
  const bucketBits = 64
  // IDGenerator tracks and allocates streams which are in use.
  //
  // Stream state is kept in a bitset of uint64 buckets that is read and
  // modified with sync/atomic operations by GetStream and Clear.
  type IDGenerator struct {
  	// NumStreams is the total number of stream IDs the generator manages;
  	// New sets it to 128 or 32768 depending on the protocol version.
  	NumStreams int
  	// inuseStreams counts streams handed out by GetStream and not yet
  	// released by Clear. It is accessed atomically.
  	inuseStreams int32
  	// numBuckets is the number of uint64 words in streams.
  	numBuckets uint32
  	// streams is a bitset where each bit represents a stream, a 1 implies in use
  	streams []uint64
  	// offset is the bucket index GetStream advances (modulo numBuckets) on
  	// every call to pick the bucket where its search starts.
  	offset uint32
  }
  17. func New(protocol int) *IDGenerator {
  18. maxStreams := 128
  19. if protocol > 2 {
  20. maxStreams = 32768
  21. }
  22. buckets := maxStreams / 64
  23. // reserve stream 0
  24. streams := make([]uint64, buckets)
  25. streams[0] = 1 << 63
  26. return &IDGenerator{
  27. NumStreams: maxStreams,
  28. streams: streams,
  29. numBuckets: uint32(buckets),
  30. offset: uint32(buckets) - 1,
  31. }
  32. }
  33. func streamFromBucket(bucket, streamInBucket int) int {
  34. return (bucket * bucketBits) + streamInBucket
  35. }
  36. func (s *IDGenerator) GetStream() (int, bool) {
  37. // based closely on the java-driver stream ID generator
  38. // avoid false sharing subsequent requests.
  39. offset := atomic.LoadUint32(&s.offset)
  40. for !atomic.CompareAndSwapUint32(&s.offset, offset, (offset+1)%s.numBuckets) {
  41. offset = atomic.LoadUint32(&s.offset)
  42. }
  43. offset = (offset + 1) % s.numBuckets
  44. for i := uint32(0); i < s.numBuckets; i++ {
  45. pos := int((i + offset) % s.numBuckets)
  46. bucket := atomic.LoadUint64(&s.streams[pos])
  47. if bucket == math.MaxUint64 {
  48. // all streams in use
  49. continue
  50. }
  51. for j := 0; j < bucketBits; j++ {
  52. mask := uint64(1 << streamOffset(j))
  53. for bucket&mask == 0 {
  54. if atomic.CompareAndSwapUint64(&s.streams[pos], bucket, bucket|mask) {
  55. atomic.AddInt32(&s.inuseStreams, 1)
  56. return streamFromBucket(int(pos), j), true
  57. }
  58. bucket = atomic.LoadUint64(&s.streams[pos])
  59. }
  60. }
  61. }
  62. return 0, false
  63. }
  // bitfmt renders b in base 16 using strconv.FormatUint, i.e. lowercase
  // hexadecimal digits with no prefix and no leading zeros.
  func bitfmt(b uint64) string {
  	const hexBase = 16
  	return strconv.FormatUint(b, hexBase)
  }
  67. // returns the bucket offset of a given stream
  68. func bucketOffset(i int) int {
  69. return i / bucketBits
  70. }
  71. func streamOffset(stream int) uint64 {
  72. return bucketBits - uint64(stream%bucketBits) - 1
  73. }
  74. func isSet(bits uint64, stream int) bool {
  75. return bits>>streamOffset(stream)&1 == 1
  76. }
  77. func (s *IDGenerator) isSet(stream int) bool {
  78. bits := atomic.LoadUint64(&s.streams[bucketOffset(stream)])
  79. return isSet(bits, stream)
  80. }
  81. func (s *IDGenerator) String() string {
  82. size := s.numBuckets * (bucketBits + 1)
  83. buf := make([]byte, 0, size)
  84. for i := 0; i < int(s.numBuckets); i++ {
  85. bits := atomic.LoadUint64(&s.streams[i])
  86. buf = append(buf, bitfmt(bits)...)
  87. buf = append(buf, ' ')
  88. }
  89. return string(buf[: size-1 : size-1])
  90. }
  91. func (s *IDGenerator) Clear(stream int) (inuse bool) {
  92. offset := bucketOffset(stream)
  93. bucket := atomic.LoadUint64(&s.streams[offset])
  94. mask := uint64(1) << streamOffset(stream)
  95. if bucket&mask != mask {
  96. // already cleared
  97. return false
  98. }
  99. for !atomic.CompareAndSwapUint64(&s.streams[offset], bucket, bucket & ^mask) {
  100. bucket = atomic.LoadUint64(&s.streams[offset])
  101. if bucket&mask != mask {
  102. // already cleared
  103. return false
  104. }
  105. }
  106. // TODO: make this account for 0 stream being reserved
  107. if atomic.AddInt32(&s.inuseStreams, -1) < 0 {
  108. // TODO(zariel): remove this
  109. panic("negative streams inuse")
  110. }
  111. return true
  112. }
  113. func (s *IDGenerator) Available() int {
  114. return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
  115. }