123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- package fx
- import (
- "io/ioutil"
- "log"
- "runtime"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/tal-tech/go-zero/core/stringx"
- )
- func TestBuffer(t *testing.T) {
- const N = 5
- var count int32
- var wait sync.WaitGroup
- wait.Add(1)
- From(func(source chan<- interface{}) {
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- for i := 0; i < 2*N; i++ {
- select {
- case source <- i:
- atomic.AddInt32(&count, 1)
- case <-ticker.C:
- wait.Done()
- return
- }
- }
- }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
- wait.Wait()
- // why N+1, because take one more to wait for sending into the channel
- assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
- })
- }
- func TestBufferNegative(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 10, result)
- }
- func TestCount(t *testing.T) {
- tests := []struct {
- name string
- elements []interface{}
- }{
- {
- name: "no elements with nil",
- },
- {
- name: "no elements",
- elements: []interface{}{},
- },
- {
- name: "1 element",
- elements: []interface{}{1},
- },
- {
- name: "multiple elements",
- elements: []interface{}{1, 2, 3},
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- val := Just(test.elements...).Count()
- assert.Equal(t, len(test.elements), val)
- })
- }
- }
- func TestDone(t *testing.T) {
- var count int32
- Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
- time.Sleep(time.Millisecond * 100)
- atomic.AddInt32(&count, int32(item.(int)))
- }).Done()
- assert.Equal(t, int32(6), count)
- }
- func TestJust(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 10, result)
- }
- func TestDistinct(t *testing.T) {
- var result int
- Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
- return item
- }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 10, result)
- }
- func TestFilter(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
- return item.(int)%2 == 0
- }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 6, result)
- }
- func TestForAll(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
- return item.(int)%2 == 0
- }).ForAll(func(pipe <-chan interface{}) {
- for item := range pipe {
- result += item.(int)
- }
- })
- assert.Equal(t, 6, result)
- }
- func TestGroup(t *testing.T) {
- var groups [][]int
- Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
- v := item.(int)
- return v / 10
- }).ForEach(func(item interface{}) {
- v := item.([]interface{})
- var group []int
- for _, each := range v {
- group = append(group, each.(int))
- }
- groups = append(groups, group)
- })
- assert.Equal(t, 2, len(groups))
- for _, group := range groups {
- assert.Equal(t, 2, len(group))
- assert.True(t, group[0]/10 == group[1]/10)
- }
- }
- func TestHead(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 3, result)
- }
- func TestHeadZero(t *testing.T) {
- assert.Panics(t, func() {
- Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- return nil, nil
- })
- })
- }
- func TestHeadMore(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 10, result)
- }
- func TestMap(t *testing.T) {
- log.SetOutput(ioutil.Discard)
- tests := []struct {
- mapper MapFunc
- expect int
- }{
- {
- mapper: func(item interface{}) interface{} {
- v := item.(int)
- return v * v
- },
- expect: 30,
- },
- {
- mapper: func(item interface{}) interface{} {
- v := item.(int)
- if v%2 == 0 {
- return 0
- }
- return v * v
- },
- expect: 10,
- },
- {
- mapper: func(item interface{}) interface{} {
- v := item.(int)
- if v%2 == 0 {
- panic(v)
- }
- return v * v
- },
- expect: 10,
- },
- }
- // Map(...) works even WithWorkers(0)
- for i, test := range tests {
- t.Run(stringx.Rand(), func(t *testing.T) {
- var result int
- var workers int
- if i%2 == 0 {
- workers = 0
- } else {
- workers = runtime.NumCPU()
- }
- From(func(source chan<- interface{}) {
- for i := 1; i < 5; i++ {
- source <- i
- }
- }).Map(test.mapper, WithWorkers(workers)).Reduce(
- func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, test.expect, result)
- })
- }
- }
- func TestMerge(t *testing.T) {
- Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
- assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
- })
- }
- func TestParallelJust(t *testing.T) {
- var count int32
- Just(1, 2, 3).Parallel(func(item interface{}) {
- time.Sleep(time.Millisecond * 100)
- atomic.AddInt32(&count, int32(item.(int)))
- }, UnlimitedWorkers())
- assert.Equal(t, int32(6), count)
- }
- func TestReverse(t *testing.T) {
- Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
- assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
- })
- }
- func TestSort(t *testing.T) {
- var prev int
- Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
- return a.(int) < b.(int)
- }).ForEach(func(item interface{}) {
- next := item.(int)
- assert.True(t, prev < next)
- prev = next
- })
- }
- func TestSplit(t *testing.T) {
- assert.Panics(t, func() {
- Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
- })
- var chunks [][]interface{}
- Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
- chunk := item.([]interface{})
- chunks = append(chunks, chunk)
- })
- assert.EqualValues(t, [][]interface{}{
- {1, 2, 3, 4},
- {5, 6, 7, 8},
- {9, 10},
- }, chunks)
- }
- func TestTail(t *testing.T) {
- var result int
- Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- for item := range pipe {
- result += item.(int)
- }
- return result, nil
- })
- assert.Equal(t, 7, result)
- }
- func TestTailZero(t *testing.T) {
- assert.Panics(t, func() {
- Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
- return nil, nil
- })
- })
- }
- func TestWalk(t *testing.T) {
- var result int
- Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
- if item.(int)%2 != 0 {
- pipe <- item
- }
- }, UnlimitedWorkers()).ForEach(func(item interface{}) {
- result += item.(int)
- })
- assert.Equal(t, 9, result)
- }
- func BenchmarkMapReduce(b *testing.B) {
- b.ReportAllocs()
- mapper := func(v interface{}) interface{} {
- return v.(int64) * v.(int64)
- }
- reducer := func(input <-chan interface{}) (interface{}, error) {
- var result int64
- for v := range input {
- result += v.(int64)
- }
- return result, nil
- }
- for i := 0; i < b.N; i++ {
- From(func(input chan<- interface{}) {
- for j := 0; j < 2; j++ {
- input <- int64(j)
- }
- }).Map(mapper).Reduce(reducer)
- }
- }
|