stream_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package fx
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "math/rand"
  6. "reflect"
  7. "runtime"
  8. "sort"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "git.i2edu.net/i2/go-zero/core/stringx"
  14. "github.com/stretchr/testify/assert"
  15. )
  16. func TestBuffer(t *testing.T) {
  17. const N = 5
  18. var count int32
  19. var wait sync.WaitGroup
  20. wait.Add(1)
  21. From(func(source chan<- interface{}) {
  22. ticker := time.NewTicker(10 * time.Millisecond)
  23. defer ticker.Stop()
  24. for i := 0; i < 2*N; i++ {
  25. select {
  26. case source <- i:
  27. atomic.AddInt32(&count, 1)
  28. case <-ticker.C:
  29. wait.Done()
  30. return
  31. }
  32. }
  33. }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
  34. wait.Wait()
  35. // why N+1, because take one more to wait for sending into the channel
  36. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  37. })
  38. }
  39. func TestBufferNegative(t *testing.T) {
  40. var result int
  41. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  42. for item := range pipe {
  43. result += item.(int)
  44. }
  45. return result, nil
  46. })
  47. assert.Equal(t, 10, result)
  48. }
  49. func TestCount(t *testing.T) {
  50. tests := []struct {
  51. name string
  52. elements []interface{}
  53. }{
  54. {
  55. name: "no elements with nil",
  56. },
  57. {
  58. name: "no elements",
  59. elements: []interface{}{},
  60. },
  61. {
  62. name: "1 element",
  63. elements: []interface{}{1},
  64. },
  65. {
  66. name: "multiple elements",
  67. elements: []interface{}{1, 2, 3},
  68. },
  69. }
  70. for _, test := range tests {
  71. t.Run(test.name, func(t *testing.T) {
  72. val := Just(test.elements...).Count()
  73. assert.Equal(t, len(test.elements), val)
  74. })
  75. }
  76. }
  77. func TestDone(t *testing.T) {
  78. var count int32
  79. Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
  80. time.Sleep(time.Millisecond * 100)
  81. atomic.AddInt32(&count, int32(item.(int)))
  82. }).Done()
  83. assert.Equal(t, int32(6), count)
  84. }
  85. func TestJust(t *testing.T) {
  86. var result int
  87. Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  88. for item := range pipe {
  89. result += item.(int)
  90. }
  91. return result, nil
  92. })
  93. assert.Equal(t, 10, result)
  94. }
  95. func TestDistinct(t *testing.T) {
  96. var result int
  97. Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
  98. return item
  99. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  100. for item := range pipe {
  101. result += item.(int)
  102. }
  103. return result, nil
  104. })
  105. assert.Equal(t, 10, result)
  106. }
  107. func TestFilter(t *testing.T) {
  108. var result int
  109. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  110. return item.(int)%2 == 0
  111. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  112. for item := range pipe {
  113. result += item.(int)
  114. }
  115. return result, nil
  116. })
  117. assert.Equal(t, 6, result)
  118. }
  119. func TestForAll(t *testing.T) {
  120. var result int
  121. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  122. return item.(int)%2 == 0
  123. }).ForAll(func(pipe <-chan interface{}) {
  124. for item := range pipe {
  125. result += item.(int)
  126. }
  127. })
  128. assert.Equal(t, 6, result)
  129. }
  130. func TestGroup(t *testing.T) {
  131. var groups [][]int
  132. Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
  133. v := item.(int)
  134. return v / 10
  135. }).ForEach(func(item interface{}) {
  136. v := item.([]interface{})
  137. var group []int
  138. for _, each := range v {
  139. group = append(group, each.(int))
  140. }
  141. groups = append(groups, group)
  142. })
  143. assert.Equal(t, 2, len(groups))
  144. for _, group := range groups {
  145. assert.Equal(t, 2, len(group))
  146. assert.True(t, group[0]/10 == group[1]/10)
  147. }
  148. }
  149. func TestHead(t *testing.T) {
  150. var result int
  151. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  152. for item := range pipe {
  153. result += item.(int)
  154. }
  155. return result, nil
  156. })
  157. assert.Equal(t, 3, result)
  158. }
  159. func TestHeadZero(t *testing.T) {
  160. assert.Panics(t, func() {
  161. Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  162. return nil, nil
  163. })
  164. })
  165. }
  166. func TestHeadMore(t *testing.T) {
  167. var result int
  168. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  169. for item := range pipe {
  170. result += item.(int)
  171. }
  172. return result, nil
  173. })
  174. assert.Equal(t, 10, result)
  175. }
  176. func TestMap(t *testing.T) {
  177. log.SetOutput(ioutil.Discard)
  178. tests := []struct {
  179. mapper MapFunc
  180. expect int
  181. }{
  182. {
  183. mapper: func(item interface{}) interface{} {
  184. v := item.(int)
  185. return v * v
  186. },
  187. expect: 30,
  188. },
  189. {
  190. mapper: func(item interface{}) interface{} {
  191. v := item.(int)
  192. if v%2 == 0 {
  193. return 0
  194. }
  195. return v * v
  196. },
  197. expect: 10,
  198. },
  199. {
  200. mapper: func(item interface{}) interface{} {
  201. v := item.(int)
  202. if v%2 == 0 {
  203. panic(v)
  204. }
  205. return v * v
  206. },
  207. expect: 10,
  208. },
  209. }
  210. // Map(...) works even WithWorkers(0)
  211. for i, test := range tests {
  212. t.Run(stringx.Rand(), func(t *testing.T) {
  213. var result int
  214. var workers int
  215. if i%2 == 0 {
  216. workers = 0
  217. } else {
  218. workers = runtime.NumCPU()
  219. }
  220. From(func(source chan<- interface{}) {
  221. for i := 1; i < 5; i++ {
  222. source <- i
  223. }
  224. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  225. func(pipe <-chan interface{}) (interface{}, error) {
  226. for item := range pipe {
  227. result += item.(int)
  228. }
  229. return result, nil
  230. })
  231. assert.Equal(t, test.expect, result)
  232. })
  233. }
  234. }
  235. func TestMerge(t *testing.T) {
  236. Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
  237. assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
  238. })
  239. }
  240. func TestParallelJust(t *testing.T) {
  241. var count int32
  242. Just(1, 2, 3).Parallel(func(item interface{}) {
  243. time.Sleep(time.Millisecond * 100)
  244. atomic.AddInt32(&count, int32(item.(int)))
  245. }, UnlimitedWorkers())
  246. assert.Equal(t, int32(6), count)
  247. }
  248. func TestReverse(t *testing.T) {
  249. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
  250. assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
  251. })
  252. }
  253. func TestSort(t *testing.T) {
  254. var prev int
  255. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
  256. return a.(int) < b.(int)
  257. }).ForEach(func(item interface{}) {
  258. next := item.(int)
  259. assert.True(t, prev < next)
  260. prev = next
  261. })
  262. }
  263. func TestSplit(t *testing.T) {
  264. assert.Panics(t, func() {
  265. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
  266. })
  267. var chunks [][]interface{}
  268. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
  269. chunk := item.([]interface{})
  270. chunks = append(chunks, chunk)
  271. })
  272. assert.EqualValues(t, [][]interface{}{
  273. {1, 2, 3, 4},
  274. {5, 6, 7, 8},
  275. {9, 10},
  276. }, chunks)
  277. }
  278. func TestTail(t *testing.T) {
  279. var result int
  280. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  281. for item := range pipe {
  282. result += item.(int)
  283. }
  284. return result, nil
  285. })
  286. assert.Equal(t, 7, result)
  287. }
  288. func TestTailZero(t *testing.T) {
  289. assert.Panics(t, func() {
  290. Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  291. return nil, nil
  292. })
  293. })
  294. }
  295. func TestWalk(t *testing.T) {
  296. var result int
  297. Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
  298. if item.(int)%2 != 0 {
  299. pipe <- item
  300. }
  301. }, UnlimitedWorkers()).ForEach(func(item interface{}) {
  302. result += item.(int)
  303. })
  304. assert.Equal(t, 9, result)
  305. }
  306. func BenchmarkParallelMapReduce(b *testing.B) {
  307. b.ReportAllocs()
  308. mapper := func(v interface{}) interface{} {
  309. return v.(int64) * v.(int64)
  310. }
  311. reducer := func(input <-chan interface{}) (interface{}, error) {
  312. var result int64
  313. for v := range input {
  314. result += v.(int64)
  315. }
  316. return result, nil
  317. }
  318. b.ResetTimer()
  319. From(func(input chan<- interface{}) {
  320. b.RunParallel(func(pb *testing.PB) {
  321. for pb.Next() {
  322. input <- int64(rand.Int())
  323. }
  324. })
  325. }).Map(mapper).Reduce(reducer)
  326. }
  327. func BenchmarkMapReduce(b *testing.B) {
  328. b.ReportAllocs()
  329. mapper := func(v interface{}) interface{} {
  330. return v.(int64) * v.(int64)
  331. }
  332. reducer := func(input <-chan interface{}) (interface{}, error) {
  333. var result int64
  334. for v := range input {
  335. result += v.(int64)
  336. }
  337. return result, nil
  338. }
  339. b.ResetTimer()
  340. From(func(input chan<- interface{}) {
  341. for i := 0; i < b.N; i++ {
  342. input <- int64(rand.Int())
  343. }
  344. }).Map(mapper).Reduce(reducer)
  345. }
  346. func equal(t *testing.T, stream Stream, data []interface{}) {
  347. items := make([]interface{}, 0)
  348. for item := range stream.source {
  349. items = append(items, item)
  350. }
  351. if !reflect.DeepEqual(items, data) {
  352. t.Errorf(" %v, want %v", items, data)
  353. }
  354. }
  355. func assetEqual(t *testing.T, except, data interface{}) {
  356. if !reflect.DeepEqual(except, data) {
  357. t.Errorf(" %v, want %v", data, except)
  358. }
  359. }
  360. func TestStream_AnyMach(t *testing.T) {
  361. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  362. return 4 == item.(int)
  363. }))
  364. assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  365. return 0 == item.(int)
  366. }))
  367. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  368. return 2 == item.(int)
  369. }))
  370. assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
  371. return 2 == item.(int)
  372. }))
  373. }
  374. func TestStream_AllMach(t *testing.T) {
  375. assetEqual(
  376. t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  377. return true
  378. }),
  379. )
  380. assetEqual(
  381. t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  382. return false
  383. }),
  384. )
  385. assetEqual(
  386. t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
  387. return item.(int) == 1
  388. }),
  389. )
  390. }
  391. func TestConcat(t *testing.T) {
  392. a1 := []interface{}{1, 2, 3}
  393. a2 := []interface{}{4, 5, 6}
  394. s1 := Just(a1...)
  395. s2 := Just(a2...)
  396. stream := Concat(s1, s2)
  397. var items []interface{}
  398. for item := range stream.source {
  399. items = append(items, item)
  400. }
  401. sort.Slice(items, func(i, j int) bool {
  402. return items[i].(int) < items[j].(int)
  403. })
  404. ints := make([]interface{}, 0)
  405. ints = append(ints, a1...)
  406. ints = append(ints, a2...)
  407. assetEqual(t, ints, items)
  408. }
  409. func TestStream_Skip(t *testing.T) {
  410. assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
  411. assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
  412. assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
  413. equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
  414. assert.Panics(t, func() {
  415. Just(1, 2, 3, 4).Skip(-1)
  416. })
  417. }
  418. func TestStream_Concat(t *testing.T) {
  419. stream := Just(1).Concat(Just(2), Just(3))
  420. var items []interface{}
  421. for item := range stream.source {
  422. items = append(items, item)
  423. }
  424. sort.Slice(items, func(i, j int) bool {
  425. return items[i].(int) < items[j].(int)
  426. })
  427. assetEqual(t, []interface{}{1, 2, 3}, items)
  428. just := Just(1)
  429. equal(t, just.Concat(just), []interface{}{1})
  430. }