fn.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  9. const (
  10. defaultWorkers = 16
  11. minWorkers = 1
  12. )
  13. type (
  14. rxOptions struct {
  15. unlimitedWorkers bool
  16. workers int
  17. }
  18. FilterFunc func(item interface{}) bool
  19. ForAllFunc func(pipe <-chan interface{})
  20. ForEachFunc func(item interface{})
  21. GenerateFunc func(source chan<- interface{})
  22. KeyFunc func(item interface{}) interface{}
  23. LessFunc func(a, b interface{}) bool
  24. MapFunc func(item interface{}) interface{}
  25. Option func(opts *rxOptions)
  26. ParallelFunc func(item interface{})
  27. ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  28. WalkFunc func(item interface{}, pipe chan<- interface{})
  29. Stream struct {
  30. source <-chan interface{}
  31. }
  32. )
  33. // From constructs a Stream from the given GenerateFunc.
  34. func From(generate GenerateFunc) Stream {
  35. source := make(chan interface{})
  36. threading.GoSafe(func() {
  37. defer close(source)
  38. generate(source)
  39. })
  40. return Range(source)
  41. }
  42. // Just converts the given arbitrary items to a Stream.
  43. func Just(items ...interface{}) Stream {
  44. source := make(chan interface{}, len(items))
  45. for _, item := range items {
  46. source <- item
  47. }
  48. close(source)
  49. return Range(source)
  50. }
  51. // Range converts the given channel to a Stream.
  52. func Range(source <-chan interface{}) Stream {
  53. return Stream{
  54. source: source,
  55. }
  56. }
  57. // Buffer buffers the items into a queue with size n.
  58. func (p Stream) Buffer(n int) Stream {
  59. if n < 0 {
  60. n = 0
  61. }
  62. source := make(chan interface{}, n)
  63. go func() {
  64. for item := range p.source {
  65. source <- item
  66. }
  67. close(source)
  68. }()
  69. return Range(source)
  70. }
  71. // Distinct removes the duplicated items base on the given KeyFunc.
  72. func (p Stream) Distinct(fn KeyFunc) Stream {
  73. source := make(chan interface{})
  74. threading.GoSafe(func() {
  75. defer close(source)
  76. keys := make(map[interface{}]lang.PlaceholderType)
  77. for item := range p.source {
  78. key := fn(item)
  79. if _, ok := keys[key]; !ok {
  80. source <- item
  81. keys[key] = lang.Placeholder
  82. }
  83. }
  84. })
  85. return Range(source)
  86. }
  87. // Done waits all upstreaming operations to be done.
  88. func (p Stream) Done() {
  89. for range p.source {
  90. }
  91. }
  92. // Filter filters the items by the given FilterFunc.
  93. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  94. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  95. if fn(item) {
  96. pipe <- item
  97. }
  98. }, opts...)
  99. }
  100. // ForAll handles the streaming elements from the source and no later streams.
  101. func (p Stream) ForAll(fn ForAllFunc) {
  102. fn(p.source)
  103. }
  104. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  105. func (p Stream) ForEach(fn ForEachFunc) {
  106. for item := range p.source {
  107. fn(item)
  108. }
  109. }
  110. // Group groups the elements into different groups based on their keys.
  111. func (p Stream) Group(fn KeyFunc) Stream {
  112. groups := make(map[interface{}][]interface{})
  113. for item := range p.source {
  114. key := fn(item)
  115. groups[key] = append(groups[key], item)
  116. }
  117. source := make(chan interface{})
  118. go func() {
  119. for _, group := range groups {
  120. source <- group
  121. }
  122. close(source)
  123. }()
  124. return Range(source)
  125. }
  126. func (p Stream) Head(n int64) Stream {
  127. source := make(chan interface{})
  128. go func() {
  129. for item := range p.source {
  130. n--
  131. if n >= 0 {
  132. source <- item
  133. }
  134. if n == 0 {
  135. // let successive method go ASAP even we have more items to skip
  136. // why we don't just break the loop, because if break,
  137. // this former goroutine will block forever, which will cause goroutine leak.
  138. close(source)
  139. }
  140. }
  141. if n > 0 {
  142. close(source)
  143. }
  144. }()
  145. return Range(source)
  146. }
  147. // Maps converts each item to another corresponding item, which means it's a 1:1 model.
  148. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  149. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  150. pipe <- fn(item)
  151. }, opts...)
  152. }
  153. // Merge merges all the items into a slice and generates a new stream.
  154. func (p Stream) Merge() Stream {
  155. var items []interface{}
  156. for item := range p.source {
  157. items = append(items, item)
  158. }
  159. source := make(chan interface{}, 1)
  160. source <- items
  161. close(source)
  162. return Range(source)
  163. }
  164. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  165. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  166. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  167. fn(item)
  168. }, opts...).Done()
  169. }
  170. // Reduce is a utility method to let the caller deal with the underlying channel.
  171. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  172. return fn(p.source)
  173. }
  174. // Reverse reverses the elements in the stream.
  175. func (p Stream) Reverse() Stream {
  176. var items []interface{}
  177. for item := range p.source {
  178. items = append(items, item)
  179. }
  180. // reverse, official method
  181. for i := len(items)/2 - 1; i >= 0; i-- {
  182. opp := len(items) - 1 - i
  183. items[i], items[opp] = items[opp], items[i]
  184. }
  185. return Just(items...)
  186. }
  187. // Sort sorts the items from the underlying source.
  188. func (p Stream) Sort(less LessFunc) Stream {
  189. var items []interface{}
  190. for item := range p.source {
  191. items = append(items, item)
  192. }
  193. sort.Slice(items, func(i, j int) bool {
  194. return less(items[i], items[j])
  195. })
  196. return Just(items...)
  197. }
  198. func (p Stream) Tail(n int64) Stream {
  199. source := make(chan interface{})
  200. go func() {
  201. ring := collection.NewRing(int(n))
  202. for item := range p.source {
  203. ring.Add(item)
  204. }
  205. for _, item := range ring.Take() {
  206. source <- item
  207. }
  208. close(source)
  209. }()
  210. return Range(source)
  211. }
  212. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  213. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  214. option := buildOptions(opts...)
  215. if option.unlimitedWorkers {
  216. return p.walkUnlimited(fn, option)
  217. } else {
  218. return p.walkLimited(fn, option)
  219. }
  220. }
  221. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  222. pipe := make(chan interface{}, option.workers)
  223. go func() {
  224. var wg sync.WaitGroup
  225. pool := make(chan lang.PlaceholderType, option.workers)
  226. for {
  227. pool <- lang.Placeholder
  228. item, ok := <-p.source
  229. if !ok {
  230. <-pool
  231. break
  232. }
  233. wg.Add(1)
  234. // better to safely run caller defined method
  235. threading.GoSafe(func() {
  236. defer func() {
  237. wg.Done()
  238. <-pool
  239. }()
  240. fn(item, pipe)
  241. })
  242. }
  243. wg.Wait()
  244. close(pipe)
  245. }()
  246. return Range(pipe)
  247. }
  248. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  249. pipe := make(chan interface{}, defaultWorkers)
  250. go func() {
  251. var wg sync.WaitGroup
  252. for {
  253. item, ok := <-p.source
  254. if !ok {
  255. break
  256. }
  257. wg.Add(1)
  258. // better to safely run caller defined method
  259. threading.GoSafe(func() {
  260. defer wg.Done()
  261. fn(item, pipe)
  262. })
  263. }
  264. wg.Wait()
  265. close(pipe)
  266. }()
  267. return Range(pipe)
  268. }
  269. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  270. func UnlimitedWorkers() Option {
  271. return func(opts *rxOptions) {
  272. opts.unlimitedWorkers = true
  273. }
  274. }
  275. // WithWorkers lets the caller to customize the concurrent workers.
  276. func WithWorkers(workers int) Option {
  277. return func(opts *rxOptions) {
  278. if workers < minWorkers {
  279. opts.workers = minWorkers
  280. } else {
  281. opts.workers = workers
  282. }
  283. }
  284. }
  285. func buildOptions(opts ...Option) *rxOptions {
  286. options := newOptions()
  287. for _, opt := range opts {
  288. opt(options)
  289. }
  290. return options
  291. }
  292. func newOptions() *rxOptions {
  293. return &rxOptions{
  294. workers: defaultWorkers,
  295. }
  296. }