stream_test.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package fx
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/tal-tech/go-zero/core/stringx"
  12. )
  13. func TestBuffer(t *testing.T) {
  14. const N = 5
  15. var count int32
  16. var wait sync.WaitGroup
  17. wait.Add(1)
  18. From(func(source chan<- interface{}) {
  19. ticker := time.NewTicker(10 * time.Millisecond)
  20. defer ticker.Stop()
  21. for i := 0; i < 2*N; i++ {
  22. select {
  23. case source <- i:
  24. atomic.AddInt32(&count, 1)
  25. case <-ticker.C:
  26. wait.Done()
  27. return
  28. }
  29. }
  30. }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
  31. wait.Wait()
  32. // why N+1, because take one more to wait for sending into the channel
  33. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  34. })
  35. }
  36. func TestBufferNegative(t *testing.T) {
  37. var result int
  38. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  39. for item := range pipe {
  40. result += item.(int)
  41. }
  42. return result, nil
  43. })
  44. assert.Equal(t, 10, result)
  45. }
  46. func TestCount(t *testing.T) {
  47. tests := []struct {
  48. name string
  49. elements []interface{}
  50. }{
  51. {
  52. name: "no elements with nil",
  53. },
  54. {
  55. name: "no elements",
  56. elements: []interface{}{},
  57. },
  58. {
  59. name: "1 element",
  60. elements: []interface{}{1},
  61. },
  62. {
  63. name: "multiple elements",
  64. elements: []interface{}{1, 2, 3},
  65. },
  66. }
  67. for _, test := range tests {
  68. t.Run(test.name, func(t *testing.T) {
  69. val := Just(test.elements...).Count()
  70. assert.Equal(t, len(test.elements), val)
  71. })
  72. }
  73. }
  74. func TestDone(t *testing.T) {
  75. var count int32
  76. Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
  77. time.Sleep(time.Millisecond * 100)
  78. atomic.AddInt32(&count, int32(item.(int)))
  79. }).Done()
  80. assert.Equal(t, int32(6), count)
  81. }
  82. func TestJust(t *testing.T) {
  83. var result int
  84. Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  85. for item := range pipe {
  86. result += item.(int)
  87. }
  88. return result, nil
  89. })
  90. assert.Equal(t, 10, result)
  91. }
  92. func TestDistinct(t *testing.T) {
  93. var result int
  94. Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
  95. return item
  96. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  97. for item := range pipe {
  98. result += item.(int)
  99. }
  100. return result, nil
  101. })
  102. assert.Equal(t, 10, result)
  103. }
  104. func TestFilter(t *testing.T) {
  105. var result int
  106. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  107. return item.(int)%2 == 0
  108. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  109. for item := range pipe {
  110. result += item.(int)
  111. }
  112. return result, nil
  113. })
  114. assert.Equal(t, 6, result)
  115. }
  116. func TestForAll(t *testing.T) {
  117. var result int
  118. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  119. return item.(int)%2 == 0
  120. }).ForAll(func(pipe <-chan interface{}) {
  121. for item := range pipe {
  122. result += item.(int)
  123. }
  124. })
  125. assert.Equal(t, 6, result)
  126. }
  127. func TestGroup(t *testing.T) {
  128. var groups [][]int
  129. Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
  130. v := item.(int)
  131. return v / 10
  132. }).ForEach(func(item interface{}) {
  133. v := item.([]interface{})
  134. var group []int
  135. for _, each := range v {
  136. group = append(group, each.(int))
  137. }
  138. groups = append(groups, group)
  139. })
  140. assert.Equal(t, 2, len(groups))
  141. for _, group := range groups {
  142. assert.Equal(t, 2, len(group))
  143. assert.True(t, group[0]/10 == group[1]/10)
  144. }
  145. }
  146. func TestHead(t *testing.T) {
  147. var result int
  148. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  149. for item := range pipe {
  150. result += item.(int)
  151. }
  152. return result, nil
  153. })
  154. assert.Equal(t, 3, result)
  155. }
  156. func TestHeadZero(t *testing.T) {
  157. assert.Panics(t, func() {
  158. Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  159. return nil, nil
  160. })
  161. })
  162. }
  163. func TestHeadMore(t *testing.T) {
  164. var result int
  165. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  166. for item := range pipe {
  167. result += item.(int)
  168. }
  169. return result, nil
  170. })
  171. assert.Equal(t, 10, result)
  172. }
  173. func TestMap(t *testing.T) {
  174. log.SetOutput(ioutil.Discard)
  175. tests := []struct {
  176. mapper MapFunc
  177. expect int
  178. }{
  179. {
  180. mapper: func(item interface{}) interface{} {
  181. v := item.(int)
  182. return v * v
  183. },
  184. expect: 30,
  185. },
  186. {
  187. mapper: func(item interface{}) interface{} {
  188. v := item.(int)
  189. if v%2 == 0 {
  190. return 0
  191. }
  192. return v * v
  193. },
  194. expect: 10,
  195. },
  196. {
  197. mapper: func(item interface{}) interface{} {
  198. v := item.(int)
  199. if v%2 == 0 {
  200. panic(v)
  201. }
  202. return v * v
  203. },
  204. expect: 10,
  205. },
  206. }
  207. // Map(...) works even WithWorkers(0)
  208. for i, test := range tests {
  209. t.Run(stringx.Rand(), func(t *testing.T) {
  210. var result int
  211. var workers int
  212. if i%2 == 0 {
  213. workers = 0
  214. } else {
  215. workers = runtime.NumCPU()
  216. }
  217. From(func(source chan<- interface{}) {
  218. for i := 1; i < 5; i++ {
  219. source <- i
  220. }
  221. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  222. func(pipe <-chan interface{}) (interface{}, error) {
  223. for item := range pipe {
  224. result += item.(int)
  225. }
  226. return result, nil
  227. })
  228. assert.Equal(t, test.expect, result)
  229. })
  230. }
  231. }
  232. func TestMerge(t *testing.T) {
  233. Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
  234. assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
  235. })
  236. }
  237. func TestParallelJust(t *testing.T) {
  238. var count int32
  239. Just(1, 2, 3).Parallel(func(item interface{}) {
  240. time.Sleep(time.Millisecond * 100)
  241. atomic.AddInt32(&count, int32(item.(int)))
  242. }, UnlimitedWorkers())
  243. assert.Equal(t, int32(6), count)
  244. }
  245. func TestReverse(t *testing.T) {
  246. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
  247. assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
  248. })
  249. }
  250. func TestSort(t *testing.T) {
  251. var prev int
  252. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
  253. return a.(int) < b.(int)
  254. }).ForEach(func(item interface{}) {
  255. next := item.(int)
  256. assert.True(t, prev < next)
  257. prev = next
  258. })
  259. }
  260. func TestSplit(t *testing.T) {
  261. assert.Panics(t, func() {
  262. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
  263. })
  264. var chunks [][]interface{}
  265. Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
  266. chunk := item.([]interface{})
  267. chunks = append(chunks, chunk)
  268. })
  269. assert.EqualValues(t, [][]interface{}{
  270. {1, 2, 3, 4},
  271. {5, 6, 7, 8},
  272. {9, 10},
  273. }, chunks)
  274. }
  275. func TestTail(t *testing.T) {
  276. var result int
  277. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  278. for item := range pipe {
  279. result += item.(int)
  280. }
  281. return result, nil
  282. })
  283. assert.Equal(t, 7, result)
  284. }
  285. func TestTailZero(t *testing.T) {
  286. assert.Panics(t, func() {
  287. Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  288. return nil, nil
  289. })
  290. })
  291. }
  292. func TestWalk(t *testing.T) {
  293. var result int
  294. Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
  295. if item.(int)%2 != 0 {
  296. pipe <- item
  297. }
  298. }, UnlimitedWorkers()).ForEach(func(item interface{}) {
  299. result += item.(int)
  300. })
  301. assert.Equal(t, 9, result)
  302. }
  303. func BenchmarkMapReduce(b *testing.B) {
  304. b.ReportAllocs()
  305. mapper := func(v interface{}) interface{} {
  306. return v.(int64) * v.(int64)
  307. }
  308. reducer := func(input <-chan interface{}) (interface{}, error) {
  309. var result int64
  310. for v := range input {
  311. result += v.(int64)
  312. }
  313. return result, nil
  314. }
  315. for i := 0; i < b.N; i++ {
  316. From(func(input chan<- interface{}) {
  317. for j := 0; j < 2; j++ {
  318. input <- int64(j)
  319. }
  320. }).Map(mapper).Reduce(reducer)
  321. }
  322. }