timingwheel_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. package collection
  2. import (
  3. "sort"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/lang"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "github.com/tal-tech/go-zero/core/syncx"
  12. "github.com/tal-tech/go-zero/core/timex"
  13. )
  14. const (
  15. testStep = time.Minute
  16. waitTime = time.Second
  17. )
  18. func TestNewTimingWheel(t *testing.T) {
  19. _, err := NewTimingWheel(0, 10, func(key, value interface{}) {})
  20. assert.NotNil(t, err)
  21. }
  22. func TestTimingWheel_Drain(t *testing.T) {
  23. ticker := timex.NewFakeTicker()
  24. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  25. }, ticker)
  26. defer tw.Stop()
  27. tw.SetTimer("first", 3, testStep*4)
  28. tw.SetTimer("second", 5, testStep*7)
  29. tw.SetTimer("third", 7, testStep*7)
  30. var keys []string
  31. var vals []int
  32. var lock sync.Mutex
  33. var wg sync.WaitGroup
  34. wg.Add(3)
  35. tw.Drain(func(key, value interface{}) {
  36. lock.Lock()
  37. defer lock.Unlock()
  38. keys = append(keys, key.(string))
  39. vals = append(vals, value.(int))
  40. wg.Done()
  41. })
  42. wg.Wait()
  43. sort.Strings(keys)
  44. sort.Ints(vals)
  45. assert.Equal(t, 3, len(keys))
  46. assert.EqualValues(t, []string{"first", "second", "third"}, keys)
  47. assert.EqualValues(t, []int{3, 5, 7}, vals)
  48. var count int
  49. tw.Drain(func(key, value interface{}) {
  50. count++
  51. })
  52. time.Sleep(time.Millisecond * 100)
  53. assert.Equal(t, 0, count)
  54. }
  55. func TestTimingWheel_SetTimerSoon(t *testing.T) {
  56. run := syncx.NewAtomicBool()
  57. ticker := timex.NewFakeTicker()
  58. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  59. assert.True(t, run.CompareAndSwap(false, true))
  60. assert.Equal(t, "any", k)
  61. assert.Equal(t, 3, v.(int))
  62. ticker.Done()
  63. }, ticker)
  64. defer tw.Stop()
  65. tw.SetTimer("any", 3, testStep>>1)
  66. ticker.Tick()
  67. assert.Nil(t, ticker.Wait(waitTime))
  68. assert.True(t, run.True())
  69. }
  70. func TestTimingWheel_SetTimerTwice(t *testing.T) {
  71. run := syncx.NewAtomicBool()
  72. ticker := timex.NewFakeTicker()
  73. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  74. assert.True(t, run.CompareAndSwap(false, true))
  75. assert.Equal(t, "any", k)
  76. assert.Equal(t, 5, v.(int))
  77. ticker.Done()
  78. }, ticker)
  79. defer tw.Stop()
  80. tw.SetTimer("any", 3, testStep*4)
  81. tw.SetTimer("any", 5, testStep*7)
  82. for i := 0; i < 8; i++ {
  83. ticker.Tick()
  84. }
  85. assert.Nil(t, ticker.Wait(waitTime))
  86. assert.True(t, run.True())
  87. }
  88. func TestTimingWheel_SetTimerWrongDelay(t *testing.T) {
  89. ticker := timex.NewFakeTicker()
  90. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker)
  91. defer tw.Stop()
  92. assert.NotPanics(t, func() {
  93. tw.SetTimer("any", 3, -testStep)
  94. })
  95. }
  96. func TestTimingWheel_MoveTimer(t *testing.T) {
  97. run := syncx.NewAtomicBool()
  98. ticker := timex.NewFakeTicker()
  99. tw, _ := newTimingWheelWithClock(testStep, 3, func(k, v interface{}) {
  100. assert.True(t, run.CompareAndSwap(false, true))
  101. assert.Equal(t, "any", k)
  102. assert.Equal(t, 3, v.(int))
  103. ticker.Done()
  104. }, ticker)
  105. defer tw.Stop()
  106. tw.SetTimer("any", 3, testStep*4)
  107. tw.MoveTimer("any", testStep*7)
  108. tw.MoveTimer("any", -testStep)
  109. tw.MoveTimer("none", testStep)
  110. for i := 0; i < 5; i++ {
  111. ticker.Tick()
  112. }
  113. assert.False(t, run.True())
  114. for i := 0; i < 3; i++ {
  115. ticker.Tick()
  116. }
  117. assert.Nil(t, ticker.Wait(waitTime))
  118. assert.True(t, run.True())
  119. }
  120. func TestTimingWheel_MoveTimerSoon(t *testing.T) {
  121. run := syncx.NewAtomicBool()
  122. ticker := timex.NewFakeTicker()
  123. tw, _ := newTimingWheelWithClock(testStep, 3, func(k, v interface{}) {
  124. assert.True(t, run.CompareAndSwap(false, true))
  125. assert.Equal(t, "any", k)
  126. assert.Equal(t, 3, v.(int))
  127. ticker.Done()
  128. }, ticker)
  129. defer tw.Stop()
  130. tw.SetTimer("any", 3, testStep*4)
  131. tw.MoveTimer("any", testStep>>1)
  132. assert.Nil(t, ticker.Wait(waitTime))
  133. assert.True(t, run.True())
  134. }
  135. func TestTimingWheel_MoveTimerEarlier(t *testing.T) {
  136. run := syncx.NewAtomicBool()
  137. ticker := timex.NewFakeTicker()
  138. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  139. assert.True(t, run.CompareAndSwap(false, true))
  140. assert.Equal(t, "any", k)
  141. assert.Equal(t, 3, v.(int))
  142. ticker.Done()
  143. }, ticker)
  144. defer tw.Stop()
  145. tw.SetTimer("any", 3, testStep*4)
  146. tw.MoveTimer("any", testStep*2)
  147. for i := 0; i < 3; i++ {
  148. ticker.Tick()
  149. }
  150. assert.Nil(t, ticker.Wait(waitTime))
  151. assert.True(t, run.True())
  152. }
  153. func TestTimingWheel_RemoveTimer(t *testing.T) {
  154. ticker := timex.NewFakeTicker()
  155. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker)
  156. tw.SetTimer("any", 3, testStep)
  157. assert.NotPanics(t, func() {
  158. tw.RemoveTimer("any")
  159. tw.RemoveTimer("none")
  160. tw.RemoveTimer(nil)
  161. })
  162. for i := 0; i < 5; i++ {
  163. ticker.Tick()
  164. }
  165. tw.Stop()
  166. }
  167. func TestTimingWheel_SetTimer(t *testing.T) {
  168. tests := []struct {
  169. slots int
  170. setAt time.Duration
  171. }{
  172. {
  173. slots: 5,
  174. setAt: 5,
  175. },
  176. {
  177. slots: 5,
  178. setAt: 7,
  179. },
  180. {
  181. slots: 5,
  182. setAt: 10,
  183. },
  184. {
  185. slots: 5,
  186. setAt: 12,
  187. },
  188. {
  189. slots: 5,
  190. setAt: 7,
  191. },
  192. {
  193. slots: 5,
  194. setAt: 10,
  195. },
  196. {
  197. slots: 5,
  198. setAt: 12,
  199. },
  200. }
  201. for _, test := range tests {
  202. test := test
  203. t.Run(stringx.RandId(), func(t *testing.T) {
  204. t.Parallel()
  205. var count int32
  206. ticker := timex.NewFakeTicker()
  207. tick := func() {
  208. atomic.AddInt32(&count, 1)
  209. ticker.Tick()
  210. time.Sleep(time.Millisecond)
  211. }
  212. var actual int32
  213. done := make(chan lang.PlaceholderType)
  214. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  215. assert.Equal(t, 1, key.(int))
  216. assert.Equal(t, 2, value.(int))
  217. actual = atomic.LoadInt32(&count)
  218. close(done)
  219. }, ticker)
  220. assert.Nil(t, err)
  221. defer tw.Stop()
  222. tw.SetTimer(1, 2, testStep*test.setAt)
  223. for {
  224. select {
  225. case <-done:
  226. assert.Equal(t, int32(test.setAt), actual)
  227. return
  228. default:
  229. tick()
  230. }
  231. }
  232. })
  233. }
  234. }
  235. func TestTimingWheel_SetAndMoveThenStart(t *testing.T) {
  236. tests := []struct {
  237. slots int
  238. setAt time.Duration
  239. moveAt time.Duration
  240. }{
  241. {
  242. slots: 5,
  243. setAt: 3,
  244. moveAt: 5,
  245. },
  246. {
  247. slots: 5,
  248. setAt: 3,
  249. moveAt: 7,
  250. },
  251. {
  252. slots: 5,
  253. setAt: 3,
  254. moveAt: 10,
  255. },
  256. {
  257. slots: 5,
  258. setAt: 3,
  259. moveAt: 12,
  260. },
  261. {
  262. slots: 5,
  263. setAt: 5,
  264. moveAt: 7,
  265. },
  266. {
  267. slots: 5,
  268. setAt: 5,
  269. moveAt: 10,
  270. },
  271. {
  272. slots: 5,
  273. setAt: 5,
  274. moveAt: 12,
  275. },
  276. }
  277. for _, test := range tests {
  278. test := test
  279. t.Run(stringx.RandId(), func(t *testing.T) {
  280. t.Parallel()
  281. var count int32
  282. ticker := timex.NewFakeTicker()
  283. tick := func() {
  284. atomic.AddInt32(&count, 1)
  285. ticker.Tick()
  286. time.Sleep(time.Millisecond * 10)
  287. }
  288. var actual int32
  289. done := make(chan lang.PlaceholderType)
  290. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  291. actual = atomic.LoadInt32(&count)
  292. close(done)
  293. }, ticker)
  294. assert.Nil(t, err)
  295. defer tw.Stop()
  296. tw.SetTimer(1, 2, testStep*test.setAt)
  297. tw.MoveTimer(1, testStep*test.moveAt)
  298. for {
  299. select {
  300. case <-done:
  301. assert.Equal(t, int32(test.moveAt), actual)
  302. return
  303. default:
  304. tick()
  305. }
  306. }
  307. })
  308. }
  309. }
  310. func TestTimingWheel_SetAndMoveTwice(t *testing.T) {
  311. tests := []struct {
  312. slots int
  313. setAt time.Duration
  314. moveAt time.Duration
  315. moveAgainAt time.Duration
  316. }{
  317. {
  318. slots: 5,
  319. setAt: 3,
  320. moveAt: 5,
  321. moveAgainAt: 10,
  322. },
  323. {
  324. slots: 5,
  325. setAt: 3,
  326. moveAt: 7,
  327. moveAgainAt: 12,
  328. },
  329. {
  330. slots: 5,
  331. setAt: 3,
  332. moveAt: 10,
  333. moveAgainAt: 15,
  334. },
  335. {
  336. slots: 5,
  337. setAt: 3,
  338. moveAt: 12,
  339. moveAgainAt: 17,
  340. },
  341. {
  342. slots: 5,
  343. setAt: 5,
  344. moveAt: 7,
  345. moveAgainAt: 12,
  346. },
  347. {
  348. slots: 5,
  349. setAt: 5,
  350. moveAt: 10,
  351. moveAgainAt: 17,
  352. },
  353. {
  354. slots: 5,
  355. setAt: 5,
  356. moveAt: 12,
  357. moveAgainAt: 17,
  358. },
  359. }
  360. for _, test := range tests {
  361. test := test
  362. t.Run(stringx.RandId(), func(t *testing.T) {
  363. t.Parallel()
  364. var count int32
  365. ticker := timex.NewFakeTicker()
  366. tick := func() {
  367. atomic.AddInt32(&count, 1)
  368. ticker.Tick()
  369. time.Sleep(time.Millisecond * 10)
  370. }
  371. var actual int32
  372. done := make(chan lang.PlaceholderType)
  373. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  374. actual = atomic.LoadInt32(&count)
  375. close(done)
  376. }, ticker)
  377. assert.Nil(t, err)
  378. defer tw.Stop()
  379. tw.SetTimer(1, 2, testStep*test.setAt)
  380. tw.MoveTimer(1, testStep*test.moveAt)
  381. tw.MoveTimer(1, testStep*test.moveAgainAt)
  382. for {
  383. select {
  384. case <-done:
  385. assert.Equal(t, int32(test.moveAgainAt), actual)
  386. return
  387. default:
  388. tick()
  389. }
  390. }
  391. })
  392. }
  393. }
  394. func TestTimingWheel_ElapsedAndSet(t *testing.T) {
  395. tests := []struct {
  396. slots int
  397. elapsed time.Duration
  398. setAt time.Duration
  399. }{
  400. {
  401. slots: 5,
  402. elapsed: 3,
  403. setAt: 5,
  404. },
  405. {
  406. slots: 5,
  407. elapsed: 3,
  408. setAt: 7,
  409. },
  410. {
  411. slots: 5,
  412. elapsed: 3,
  413. setAt: 10,
  414. },
  415. {
  416. slots: 5,
  417. elapsed: 3,
  418. setAt: 12,
  419. },
  420. {
  421. slots: 5,
  422. elapsed: 5,
  423. setAt: 7,
  424. },
  425. {
  426. slots: 5,
  427. elapsed: 5,
  428. setAt: 10,
  429. },
  430. {
  431. slots: 5,
  432. elapsed: 5,
  433. setAt: 12,
  434. },
  435. }
  436. for _, test := range tests {
  437. test := test
  438. t.Run(stringx.RandId(), func(t *testing.T) {
  439. t.Parallel()
  440. var count int32
  441. ticker := timex.NewFakeTicker()
  442. tick := func() {
  443. atomic.AddInt32(&count, 1)
  444. ticker.Tick()
  445. time.Sleep(time.Millisecond * 10)
  446. }
  447. var actual int32
  448. done := make(chan lang.PlaceholderType)
  449. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  450. actual = atomic.LoadInt32(&count)
  451. close(done)
  452. }, ticker)
  453. assert.Nil(t, err)
  454. defer tw.Stop()
  455. for i := 0; i < int(test.elapsed); i++ {
  456. tick()
  457. }
  458. tw.SetTimer(1, 2, testStep*test.setAt)
  459. for {
  460. select {
  461. case <-done:
  462. assert.Equal(t, int32(test.elapsed+test.setAt), actual)
  463. return
  464. default:
  465. tick()
  466. }
  467. }
  468. })
  469. }
  470. }
  471. func TestTimingWheel_ElapsedAndSetThenMove(t *testing.T) {
  472. tests := []struct {
  473. slots int
  474. elapsed time.Duration
  475. setAt time.Duration
  476. moveAt time.Duration
  477. }{
  478. {
  479. slots: 5,
  480. elapsed: 3,
  481. setAt: 5,
  482. moveAt: 10,
  483. },
  484. {
  485. slots: 5,
  486. elapsed: 3,
  487. setAt: 7,
  488. moveAt: 12,
  489. },
  490. {
  491. slots: 5,
  492. elapsed: 3,
  493. setAt: 10,
  494. moveAt: 15,
  495. },
  496. {
  497. slots: 5,
  498. elapsed: 3,
  499. setAt: 12,
  500. moveAt: 16,
  501. },
  502. {
  503. slots: 5,
  504. elapsed: 5,
  505. setAt: 7,
  506. moveAt: 12,
  507. },
  508. {
  509. slots: 5,
  510. elapsed: 5,
  511. setAt: 10,
  512. moveAt: 15,
  513. },
  514. {
  515. slots: 5,
  516. elapsed: 5,
  517. setAt: 12,
  518. moveAt: 17,
  519. },
  520. }
  521. for _, test := range tests {
  522. test := test
  523. t.Run(stringx.RandId(), func(t *testing.T) {
  524. t.Parallel()
  525. var count int32
  526. ticker := timex.NewFakeTicker()
  527. tick := func() {
  528. atomic.AddInt32(&count, 1)
  529. ticker.Tick()
  530. time.Sleep(time.Millisecond * 10)
  531. }
  532. var actual int32
  533. done := make(chan lang.PlaceholderType)
  534. tw, err := newTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) {
  535. actual = atomic.LoadInt32(&count)
  536. close(done)
  537. }, ticker)
  538. assert.Nil(t, err)
  539. defer tw.Stop()
  540. for i := 0; i < int(test.elapsed); i++ {
  541. tick()
  542. }
  543. tw.SetTimer(1, 2, testStep*test.setAt)
  544. tw.MoveTimer(1, testStep*test.moveAt)
  545. for {
  546. select {
  547. case <-done:
  548. assert.Equal(t, int32(test.elapsed+test.moveAt), actual)
  549. return
  550. default:
  551. tick()
  552. }
  553. }
  554. })
  555. }
  556. }
  557. func TestMoveAndRemoveTask(t *testing.T) {
  558. ticker := timex.NewFakeTicker()
  559. tick := func(v int) {
  560. for i := 0; i < v; i++ {
  561. ticker.Tick()
  562. }
  563. }
  564. var keys []int
  565. tw, _ := newTimingWheelWithClock(testStep, 10, func(k, v interface{}) {
  566. assert.Equal(t, "any", k)
  567. assert.Equal(t, 3, v.(int))
  568. keys = append(keys, v.(int))
  569. ticker.Done()
  570. }, ticker)
  571. defer tw.Stop()
  572. tw.SetTimer("any", 3, testStep*8)
  573. tick(6)
  574. tw.MoveTimer("any", testStep*7)
  575. tick(3)
  576. tw.RemoveTimer("any")
  577. tick(30)
  578. time.Sleep(time.Millisecond)
  579. assert.Equal(t, 0, len(keys))
  580. }
  581. func BenchmarkTimingWheel(b *testing.B) {
  582. b.ReportAllocs()
  583. tw, _ := NewTimingWheel(time.Second, 100, func(k, v interface{}) {})
  584. for i := 0; i < b.N; i++ {
  585. tw.SetTimer(i, i, time.Second)
  586. tw.SetTimer(b.N+i, b.N+i, time.Second)
  587. tw.MoveTimer(i, time.Second*time.Duration(i))
  588. tw.RemoveTimer(i)
  589. }
  590. }