streams_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package streams
  2. import (
  3. "math"
  4. "strconv"
  5. "sync/atomic"
  6. "testing"
  7. )
  8. func TestUsesAllStreams(t *testing.T) {
  9. streams := New(1)
  10. got := make(map[int]struct{})
  11. for i := 1; i < streams.NumStreams; i++ {
  12. stream, ok := streams.GetStream()
  13. if !ok {
  14. t.Fatalf("unable to get stream %d", i)
  15. }
  16. if _, ok = got[stream]; ok {
  17. t.Fatalf("got an already allocated stream: %d", stream)
  18. }
  19. got[stream] = struct{}{}
  20. if !streams.isSet(stream) {
  21. bucket := atomic.LoadUint64(&streams.streams[bucketOffset(stream)])
  22. t.Logf("bucket=%d: %s\n", bucket, strconv.FormatUint(bucket, 2))
  23. t.Fatalf("stream not set: %d", stream)
  24. }
  25. }
  26. for i := 1; i < streams.NumStreams; i++ {
  27. if _, ok := got[i]; !ok {
  28. t.Errorf("did not use stream %d", i)
  29. }
  30. }
  31. if _, ok := got[0]; ok {
  32. t.Fatal("expected to not use stream 0")
  33. }
  34. for i, bucket := range streams.streams {
  35. if bucket != math.MaxUint64 {
  36. t.Errorf("did not use all streams in offset=%d bucket=%s", i, bitfmt(bucket))
  37. }
  38. }
  39. }
  40. func TestFullStreams(t *testing.T) {
  41. streams := New(1)
  42. for i := range streams.streams {
  43. streams.streams[i] = math.MaxUint64
  44. }
  45. stream, ok := streams.GetStream()
  46. if ok {
  47. t.Fatalf("should not get stream when all in use: stream=%d", stream)
  48. }
  49. }
  50. func TestClearStreams(t *testing.T) {
  51. streams := New(1)
  52. for i := range streams.streams {
  53. streams.streams[i] = math.MaxUint64
  54. }
  55. streams.inuseStreams = int32(streams.NumStreams)
  56. for i := 0; i < streams.NumStreams; i++ {
  57. streams.Clear(i)
  58. }
  59. for i, bucket := range streams.streams {
  60. if bucket != 0 {
  61. t.Errorf("did not clear streams in offset=%d bucket=%s", i, bitfmt(bucket))
  62. }
  63. }
  64. }
  65. func TestDoubleClear(t *testing.T) {
  66. streams := New(1)
  67. stream, ok := streams.GetStream()
  68. if !ok {
  69. t.Fatal("did not get stream")
  70. }
  71. if !streams.Clear(stream) {
  72. t.Fatalf("stream not indicated as in use: %d", stream)
  73. }
  74. if streams.Clear(stream) {
  75. t.Fatalf("stream not as in use after clear: %d", stream)
  76. }
  77. }
  78. func BenchmarkConcurrentUse(b *testing.B) {
  79. streams := New(2)
  80. b.RunParallel(func(pb *testing.PB) {
  81. for pb.Next() {
  82. stream, ok := streams.GetStream()
  83. if !ok {
  84. b.Error("unable to get stream")
  85. return
  86. }
  87. if !streams.Clear(stream) {
  88. b.Errorf("stream was already cleared: %d", stream)
  89. return
  90. }
  91. }
  92. })
  93. }
  94. func TestStreamOffset(t *testing.T) {
  95. tests := [...]struct {
  96. n int
  97. off uint64
  98. }{
  99. {0, 63},
  100. {1, 62},
  101. {2, 61},
  102. {3, 60},
  103. {63, 0},
  104. {64, 63},
  105. {128, 63},
  106. }
  107. for _, test := range tests {
  108. if off := streamOffset(test.n); off != test.off {
  109. t.Errorf("n=%d expected %d got %d", test.n, off, test.off)
  110. }
  111. }
  112. }
  113. func TestIsSet(t *testing.T) {
  114. tests := [...]struct {
  115. stream int
  116. bucket uint64
  117. set bool
  118. }{
  119. {0, 0, false},
  120. {0, 1 << 63, true},
  121. {1, 0, false},
  122. {1, 1 << 62, true},
  123. {63, 1, true},
  124. {64, 1 << 63, true},
  125. {0, 0x8000000000000000, true},
  126. }
  127. for i, test := range tests {
  128. if set := isSet(test.bucket, test.stream); set != test.set {
  129. t.Errorf("[%d] stream=%d expected %v got %v", i, test.stream, test.set, set)
  130. }
  131. }
  132. for i := 0; i < bucketBits; i++ {
  133. if !isSet(math.MaxUint64, i) {
  134. var shift uint64 = math.MaxUint64 >> streamOffset(i)
  135. t.Errorf("expected isSet for all i=%d got=%d", i, shift)
  136. }
  137. }
  138. }
  139. func TestBucketOfset(t *testing.T) {
  140. tests := [...]struct {
  141. n int
  142. bucket int
  143. }{
  144. {0, 0},
  145. {1, 0},
  146. {63, 0},
  147. {64, 1},
  148. }
  149. for _, test := range tests {
  150. if bucket := bucketOffset(test.n); bucket != test.bucket {
  151. t.Errorf("n=%d expected %v got %v", test.n, test.bucket, bucket)
  152. }
  153. }
  154. }
  155. func TestStreamFromBucket(t *testing.T) {
  156. tests := [...]struct {
  157. bucket int
  158. pos int
  159. stream int
  160. }{
  161. {0, 0, 0},
  162. {0, 1, 1},
  163. {0, 2, 2},
  164. {0, 63, 63},
  165. {1, 0, 64},
  166. {1, 1, 65},
  167. }
  168. for _, test := range tests {
  169. if stream := streamFromBucket(test.bucket, test.pos); stream != test.stream {
  170. t.Errorf("bucket=%d pos=%d expected %v got %v", test.bucket, test.pos, test.stream, stream)
  171. }
  172. }
  173. }