fn_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package fx
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/tal-tech/go-zero/core/stringx"
  12. )
  13. func TestBuffer(t *testing.T) {
  14. const N = 5
  15. var count int32
  16. var wait sync.WaitGroup
  17. wait.Add(1)
  18. From(func(source chan<- interface{}) {
  19. ticker := time.NewTicker(10 * time.Millisecond)
  20. defer ticker.Stop()
  21. for i := 0; i < 2*N; i++ {
  22. select {
  23. case source <- i:
  24. atomic.AddInt32(&count, 1)
  25. case <-ticker.C:
  26. wait.Done()
  27. return
  28. }
  29. }
  30. }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
  31. wait.Wait()
  32. // why N+1, because take one more to wait for sending into the channel
  33. assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
  34. })
  35. }
  36. func TestBufferNegative(t *testing.T) {
  37. var result int
  38. Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  39. for item := range pipe {
  40. result += item.(int)
  41. }
  42. return result, nil
  43. })
  44. assert.Equal(t, 10, result)
  45. }
  46. func TestDone(t *testing.T) {
  47. var count int32
  48. Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
  49. time.Sleep(time.Millisecond * 100)
  50. atomic.AddInt32(&count, int32(item.(int)))
  51. }).Done()
  52. assert.Equal(t, int32(6), count)
  53. }
  54. func TestJust(t *testing.T) {
  55. var result int
  56. Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  57. for item := range pipe {
  58. result += item.(int)
  59. }
  60. return result, nil
  61. })
  62. assert.Equal(t, 10, result)
  63. }
  64. func TestDistinct(t *testing.T) {
  65. var result int
  66. Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
  67. return item
  68. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  69. for item := range pipe {
  70. result += item.(int)
  71. }
  72. return result, nil
  73. })
  74. assert.Equal(t, 10, result)
  75. }
  76. func TestFilter(t *testing.T) {
  77. var result int
  78. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  79. return item.(int)%2 == 0
  80. }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  81. for item := range pipe {
  82. result += item.(int)
  83. }
  84. return result, nil
  85. })
  86. assert.Equal(t, 6, result)
  87. }
  88. func TestForAll(t *testing.T) {
  89. var result int
  90. Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
  91. return item.(int)%2 == 0
  92. }).ForAll(func(pipe <-chan interface{}) {
  93. for item := range pipe {
  94. result += item.(int)
  95. }
  96. })
  97. assert.Equal(t, 6, result)
  98. }
  99. func TestGroup(t *testing.T) {
  100. var groups [][]int
  101. Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
  102. v := item.(int)
  103. return v / 10
  104. }).ForEach(func(item interface{}) {
  105. v := item.([]interface{})
  106. var group []int
  107. for _, each := range v {
  108. group = append(group, each.(int))
  109. }
  110. groups = append(groups, group)
  111. })
  112. assert.Equal(t, 2, len(groups))
  113. for _, group := range groups {
  114. assert.Equal(t, 2, len(group))
  115. assert.True(t, group[0]/10 == group[1]/10)
  116. }
  117. }
  118. func TestHead(t *testing.T) {
  119. var result int
  120. Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  121. for item := range pipe {
  122. result += item.(int)
  123. }
  124. return result, nil
  125. })
  126. assert.Equal(t, 3, result)
  127. }
  128. func TestHeadMore(t *testing.T) {
  129. var result int
  130. Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  131. for item := range pipe {
  132. result += item.(int)
  133. }
  134. return result, nil
  135. })
  136. assert.Equal(t, 10, result)
  137. }
  138. func TestMap(t *testing.T) {
  139. log.SetOutput(ioutil.Discard)
  140. tests := []struct {
  141. mapper MapFunc
  142. expect int
  143. }{
  144. {
  145. mapper: func(item interface{}) interface{} {
  146. v := item.(int)
  147. return v * v
  148. },
  149. expect: 30,
  150. },
  151. {
  152. mapper: func(item interface{}) interface{} {
  153. v := item.(int)
  154. if v%2 == 0 {
  155. return 0
  156. }
  157. return v * v
  158. },
  159. expect: 10,
  160. },
  161. {
  162. mapper: func(item interface{}) interface{} {
  163. v := item.(int)
  164. if v%2 == 0 {
  165. panic(v)
  166. }
  167. return v * v
  168. },
  169. expect: 10,
  170. },
  171. }
  172. // Map(...) works even WithWorkers(0)
  173. for i, test := range tests {
  174. t.Run(stringx.Rand(), func(t *testing.T) {
  175. var result int
  176. var workers int
  177. if i%2 == 0 {
  178. workers = 0
  179. } else {
  180. workers = runtime.NumCPU()
  181. }
  182. From(func(source chan<- interface{}) {
  183. for i := 1; i < 5; i++ {
  184. source <- i
  185. }
  186. }).Map(test.mapper, WithWorkers(workers)).Reduce(
  187. func(pipe <-chan interface{}) (interface{}, error) {
  188. for item := range pipe {
  189. result += item.(int)
  190. }
  191. return result, nil
  192. })
  193. assert.Equal(t, test.expect, result)
  194. })
  195. }
  196. }
  197. func TestMerge(t *testing.T) {
  198. Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
  199. assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
  200. })
  201. }
  202. func TestParallelJust(t *testing.T) {
  203. var count int32
  204. Just(1, 2, 3).Parallel(func(item interface{}) {
  205. time.Sleep(time.Millisecond * 100)
  206. atomic.AddInt32(&count, int32(item.(int)))
  207. }, UnlimitedWorkers())
  208. assert.Equal(t, int32(6), count)
  209. }
  210. func TestReverse(t *testing.T) {
  211. Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
  212. assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
  213. })
  214. }
  215. func TestSort(t *testing.T) {
  216. var prev int
  217. Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
  218. return a.(int) < b.(int)
  219. }).ForEach(func(item interface{}) {
  220. next := item.(int)
  221. assert.True(t, prev < next)
  222. prev = next
  223. })
  224. }
  225. func TestTail(t *testing.T) {
  226. var result int
  227. Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
  228. for item := range pipe {
  229. result += item.(int)
  230. }
  231. return result, nil
  232. })
  233. assert.Equal(t, 7, result)
  234. }
  235. func TestWalk(t *testing.T) {
  236. var result int
  237. Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
  238. if item.(int)%2 != 0 {
  239. pipe <- item
  240. }
  241. }, UnlimitedWorkers()).ForEach(func(item interface{}) {
  242. result += item.(int)
  243. })
  244. assert.Equal(t, 9, result)
  245. }
  246. func BenchmarkMapReduce(b *testing.B) {
  247. b.ReportAllocs()
  248. mapper := func(v interface{}) interface{} {
  249. return v.(int64) * v.(int64)
  250. }
  251. reducer := func(input <-chan interface{}) (interface{}, error) {
  252. var result int64
  253. for v := range input {
  254. result += v.(int64)
  255. }
  256. return result, nil
  257. }
  258. for i := 0; i < b.N; i++ {
  259. From(func(input chan<- interface{}) {
  260. for j := 0; j < 2; j++ {
  261. input <- int64(j)
  262. }
  263. }).Map(mapper).Reduce(reducer)
  264. }
  265. }