123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- package fx
- import (
- "sort"
- "sync"
- "github.com/tal-tech/go-zero/core/collection"
- "github.com/tal-tech/go-zero/core/lang"
- "github.com/tal-tech/go-zero/core/threading"
- )
// Worker-count settings shared by the stream operators.
const (
	// minWorkers is the lower bound that WithWorkers clamps its argument to.
	minWorkers = 1
	// defaultWorkers is the worker count set by newOptions and the buffer
	// size of the channel created in walkUnlimited.
	defaultWorkers = 16
)
// Callback signatures accepted by the Stream operators.
type (
	// FilterFunc reports whether item should be forwarded by Filter.
	FilterFunc func(item interface{}) bool
	// ForAllFunc receives the stream's underlying channel in ForAll.
	ForAllFunc func(pipe <-chan interface{})
	// ForEachFunc is invoked once per item by ForEach.
	ForEachFunc func(item interface{})
	// GenerateFunc produces items by writing them to source; used by From.
	GenerateFunc func(source chan<- interface{})
	// KeyFunc derives the key of an item; used by Distinct and Group.
	KeyFunc func(item interface{}) interface{}
	// LessFunc reports whether a sorts before b; used by Sort.
	LessFunc func(a, b interface{}) bool
	// MapFunc converts an item into its replacement; used by Map.
	MapFunc func(item interface{}) interface{}
	// ParallelFunc is invoked once per item by Parallel.
	ParallelFunc func(item interface{})
	// ReduceFunc consumes the stream's underlying channel and returns a result.
	ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
	// WalkFunc handles one item and may write any number of items to pipe.
	WalkFunc func(item interface{}, pipe chan<- interface{})
)

// Option customizes the worker settings stored in rxOptions.
type Option func(opts *rxOptions)

// rxOptions carries the worker configuration consumed by Walk.
type rxOptions struct {
	// unlimitedWorkers selects walkUnlimited instead of walkLimited.
	unlimitedWorkers bool
	// workers is the concurrency limit used by walkLimited.
	workers int
}

// Stream is a thin wrapper around a receive-only channel of items.
type Stream struct {
	source <-chan interface{}
}
- // From constructs a Stream from the given GenerateFunc.
- func From(generate GenerateFunc) Stream {
- source := make(chan interface{})
- threading.GoSafe(func() {
- defer close(source)
- generate(source)
- })
- return Range(source)
- }
- // Just converts the given arbitrary items to a Stream.
- func Just(items ...interface{}) Stream {
- source := make(chan interface{}, len(items))
- for _, item := range items {
- source <- item
- }
- close(source)
- return Range(source)
- }
- // Range converts the given channel to a Stream.
- func Range(source <-chan interface{}) Stream {
- return Stream{
- source: source,
- }
- }
- // Buffer buffers the items into a queue with size n.
- // It can balance the producer and the consumer if their processing throughput don't match.
- func (p Stream) Buffer(n int) Stream {
- if n < 0 {
- n = 0
- }
- source := make(chan interface{}, n)
- go func() {
- for item := range p.source {
- source <- item
- }
- close(source)
- }()
- return Range(source)
- }
- // Count counts the number of elements in the result.
- func (p Stream) Count() (count int) {
- for range p.source {
- count++
- }
- return
- }
- // Distinct removes the duplicated items base on the given KeyFunc.
- func (p Stream) Distinct(fn KeyFunc) Stream {
- source := make(chan interface{})
- threading.GoSafe(func() {
- defer close(source)
- keys := make(map[interface{}]lang.PlaceholderType)
- for item := range p.source {
- key := fn(item)
- if _, ok := keys[key]; !ok {
- source <- item
- keys[key] = lang.Placeholder
- }
- }
- })
- return Range(source)
- }
- // Done waits all upstreaming operations to be done.
- func (p Stream) Done() {
- for range p.source {
- }
- }
- // Filter filters the items by the given FilterFunc.
- func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
- return p.Walk(func(item interface{}, pipe chan<- interface{}) {
- if fn(item) {
- pipe <- item
- }
- }, opts...)
- }
- // ForAll handles the streaming elements from the source and no later streams.
- func (p Stream) ForAll(fn ForAllFunc) {
- fn(p.source)
- }
- // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
- func (p Stream) ForEach(fn ForEachFunc) {
- for item := range p.source {
- fn(item)
- }
- }
- // Group groups the elements into different groups based on their keys.
- func (p Stream) Group(fn KeyFunc) Stream {
- groups := make(map[interface{}][]interface{})
- for item := range p.source {
- key := fn(item)
- groups[key] = append(groups[key], item)
- }
- source := make(chan interface{})
- go func() {
- for _, group := range groups {
- source <- group
- }
- close(source)
- }()
- return Range(source)
- }
- func (p Stream) Head(n int64) Stream {
- if n < 1 {
- panic("n must be greater than 0")
- }
- source := make(chan interface{})
- go func() {
- for item := range p.source {
- n--
- if n >= 0 {
- source <- item
- }
- if n == 0 {
- // let successive method go ASAP even we have more items to skip
- // why we don't just break the loop, because if break,
- // this former goroutine will block forever, which will cause goroutine leak.
- close(source)
- }
- }
- if n > 0 {
- close(source)
- }
- }()
- return Range(source)
- }
- // Maps converts each item to another corresponding item, which means it's a 1:1 model.
- func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
- return p.Walk(func(item interface{}, pipe chan<- interface{}) {
- pipe <- fn(item)
- }, opts...)
- }
- // Merge merges all the items into a slice and generates a new stream.
- func (p Stream) Merge() Stream {
- var items []interface{}
- for item := range p.source {
- items = append(items, item)
- }
- source := make(chan interface{}, 1)
- source <- items
- close(source)
- return Range(source)
- }
- // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
- func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
- p.Walk(func(item interface{}, pipe chan<- interface{}) {
- fn(item)
- }, opts...).Done()
- }
- // Reduce is a utility method to let the caller deal with the underlying channel.
- func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
- return fn(p.source)
- }
- // Reverse reverses the elements in the stream.
- func (p Stream) Reverse() Stream {
- var items []interface{}
- for item := range p.source {
- items = append(items, item)
- }
- // reverse, official method
- for i := len(items)/2 - 1; i >= 0; i-- {
- opp := len(items) - 1 - i
- items[i], items[opp] = items[opp], items[i]
- }
- return Just(items...)
- }
- // Sort sorts the items from the underlying source.
- func (p Stream) Sort(less LessFunc) Stream {
- var items []interface{}
- for item := range p.source {
- items = append(items, item)
- }
- sort.Slice(items, func(i, j int) bool {
- return less(items[i], items[j])
- })
- return Just(items...)
- }
- // Split splits the elements into chunk with size up to n,
- // might be less than n on tailing elements.
- func (p Stream) Split(n int) Stream {
- if n < 1 {
- panic("n should be greater than 0")
- }
- source := make(chan interface{})
- go func() {
- var chunk []interface{}
- for item := range p.source {
- chunk = append(chunk, item)
- if len(chunk) == n {
- source <- chunk
- chunk = nil
- }
- }
- if chunk != nil {
- source <- chunk
- }
- close(source)
- }()
- return Range(source)
- }
- func (p Stream) Tail(n int64) Stream {
- if n < 1 {
- panic("n should be greater than 0")
- }
- source := make(chan interface{})
- go func() {
- ring := collection.NewRing(int(n))
- for item := range p.source {
- ring.Add(item)
- }
- for _, item := range ring.Take() {
- source <- item
- }
- close(source)
- }()
- return Range(source)
- }
- // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
- func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
- option := buildOptions(opts...)
- if option.unlimitedWorkers {
- return p.walkUnlimited(fn, option)
- }
- return p.walkLimited(fn, option)
- }
- func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
- pipe := make(chan interface{}, option.workers)
- go func() {
- var wg sync.WaitGroup
- pool := make(chan lang.PlaceholderType, option.workers)
- for {
- pool <- lang.Placeholder
- item, ok := <-p.source
- if !ok {
- <-pool
- break
- }
- wg.Add(1)
- // better to safely run caller defined method
- threading.GoSafe(func() {
- defer func() {
- wg.Done()
- <-pool
- }()
- fn(item, pipe)
- })
- }
- wg.Wait()
- close(pipe)
- }()
- return Range(pipe)
- }
- func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
- pipe := make(chan interface{}, defaultWorkers)
- go func() {
- var wg sync.WaitGroup
- for {
- item, ok := <-p.source
- if !ok {
- break
- }
- wg.Add(1)
- // better to safely run caller defined method
- threading.GoSafe(func() {
- defer wg.Done()
- fn(item, pipe)
- })
- }
- wg.Wait()
- close(pipe)
- }()
- return Range(pipe)
- }
- // UnlimitedWorkers lets the caller to use as many workers as the tasks.
- func UnlimitedWorkers() Option {
- return func(opts *rxOptions) {
- opts.unlimitedWorkers = true
- }
- }
- // WithWorkers lets the caller to customize the concurrent workers.
- func WithWorkers(workers int) Option {
- return func(opts *rxOptions) {
- if workers < minWorkers {
- opts.workers = minWorkers
- } else {
- opts.workers = workers
- }
- }
- }
- func buildOptions(opts ...Option) *rxOptions {
- options := newOptions()
- for _, opt := range opts {
- opt(options)
- }
- return options
- }
- func newOptions() *rxOptions {
- return &rxOptions{
- workers: defaultWorkers,
- }
- }
|