fn.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  9. const (
  10. defaultWorkers = 16
  11. minWorkers = 1
  12. )
  13. type (
  14. rxOptions struct {
  15. unlimitedWorkers bool
  16. workers int
  17. }
  18. FilterFunc func(item interface{}) bool
  19. ForAllFunc func(pipe <-chan interface{})
  20. ForEachFunc func(item interface{})
  21. GenerateFunc func(source chan<- interface{})
  22. KeyFunc func(item interface{}) interface{}
  23. LessFunc func(a, b interface{}) bool
  24. MapFunc func(item interface{}) interface{}
  25. Option func(opts *rxOptions)
  26. ParallelFunc func(item interface{})
  27. ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  28. WalkFunc func(item interface{}, pipe chan<- interface{})
  29. Stream struct {
  30. source <-chan interface{}
  31. }
  32. )
  33. // From constructs a Stream from the given GenerateFunc.
  34. func From(generate GenerateFunc) Stream {
  35. source := make(chan interface{})
  36. threading.GoSafe(func() {
  37. defer close(source)
  38. generate(source)
  39. })
  40. return Range(source)
  41. }
  42. // Just converts the given arbitrary items to a Stream.
  43. func Just(items ...interface{}) Stream {
  44. source := make(chan interface{}, len(items))
  45. for _, item := range items {
  46. source <- item
  47. }
  48. close(source)
  49. return Range(source)
  50. }
  51. // Range converts the given channel to a Stream.
  52. func Range(source <-chan interface{}) Stream {
  53. return Stream{
  54. source: source,
  55. }
  56. }
  57. // Buffer buffers the items into a queue with size n.
  58. // It can balance the producer and the consumer if their processing throughput don't match.
  59. func (p Stream) Buffer(n int) Stream {
  60. if n < 0 {
  61. n = 0
  62. }
  63. source := make(chan interface{}, n)
  64. go func() {
  65. for item := range p.source {
  66. source <- item
  67. }
  68. close(source)
  69. }()
  70. return Range(source)
  71. }
  72. // Count counts the number of elements in the result.
  73. func (p Stream) Count() (count int) {
  74. for range p.source {
  75. count++
  76. }
  77. return
  78. }
  79. // Distinct removes the duplicated items base on the given KeyFunc.
  80. func (p Stream) Distinct(fn KeyFunc) Stream {
  81. source := make(chan interface{})
  82. threading.GoSafe(func() {
  83. defer close(source)
  84. keys := make(map[interface{}]lang.PlaceholderType)
  85. for item := range p.source {
  86. key := fn(item)
  87. if _, ok := keys[key]; !ok {
  88. source <- item
  89. keys[key] = lang.Placeholder
  90. }
  91. }
  92. })
  93. return Range(source)
  94. }
  95. // Done waits all upstreaming operations to be done.
  96. func (p Stream) Done() {
  97. for range p.source {
  98. }
  99. }
  100. // Filter filters the items by the given FilterFunc.
  101. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  102. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  103. if fn(item) {
  104. pipe <- item
  105. }
  106. }, opts...)
  107. }
  108. // ForAll handles the streaming elements from the source and no later streams.
  109. func (p Stream) ForAll(fn ForAllFunc) {
  110. fn(p.source)
  111. }
  112. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  113. func (p Stream) ForEach(fn ForEachFunc) {
  114. for item := range p.source {
  115. fn(item)
  116. }
  117. }
  118. // Group groups the elements into different groups based on their keys.
  119. func (p Stream) Group(fn KeyFunc) Stream {
  120. groups := make(map[interface{}][]interface{})
  121. for item := range p.source {
  122. key := fn(item)
  123. groups[key] = append(groups[key], item)
  124. }
  125. source := make(chan interface{})
  126. go func() {
  127. for _, group := range groups {
  128. source <- group
  129. }
  130. close(source)
  131. }()
  132. return Range(source)
  133. }
  134. func (p Stream) Head(n int64) Stream {
  135. if n < 1 {
  136. panic("n must be greater than 0")
  137. }
  138. source := make(chan interface{})
  139. go func() {
  140. for item := range p.source {
  141. n--
  142. if n >= 0 {
  143. source <- item
  144. }
  145. if n == 0 {
  146. // let successive method go ASAP even we have more items to skip
  147. // why we don't just break the loop, because if break,
  148. // this former goroutine will block forever, which will cause goroutine leak.
  149. close(source)
  150. }
  151. }
  152. if n > 0 {
  153. close(source)
  154. }
  155. }()
  156. return Range(source)
  157. }
  158. // Maps converts each item to another corresponding item, which means it's a 1:1 model.
  159. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  160. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  161. pipe <- fn(item)
  162. }, opts...)
  163. }
  164. // Merge merges all the items into a slice and generates a new stream.
  165. func (p Stream) Merge() Stream {
  166. var items []interface{}
  167. for item := range p.source {
  168. items = append(items, item)
  169. }
  170. source := make(chan interface{}, 1)
  171. source <- items
  172. close(source)
  173. return Range(source)
  174. }
  175. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  176. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  177. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  178. fn(item)
  179. }, opts...).Done()
  180. }
  181. // Reduce is a utility method to let the caller deal with the underlying channel.
  182. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  183. return fn(p.source)
  184. }
  185. // Reverse reverses the elements in the stream.
  186. func (p Stream) Reverse() Stream {
  187. var items []interface{}
  188. for item := range p.source {
  189. items = append(items, item)
  190. }
  191. // reverse, official method
  192. for i := len(items)/2 - 1; i >= 0; i-- {
  193. opp := len(items) - 1 - i
  194. items[i], items[opp] = items[opp], items[i]
  195. }
  196. return Just(items...)
  197. }
  198. // Sort sorts the items from the underlying source.
  199. func (p Stream) Sort(less LessFunc) Stream {
  200. var items []interface{}
  201. for item := range p.source {
  202. items = append(items, item)
  203. }
  204. sort.Slice(items, func(i, j int) bool {
  205. return less(items[i], items[j])
  206. })
  207. return Just(items...)
  208. }
  209. // Split splits the elements into chunk with size up to n,
  210. // might be less than n on tailing elements.
  211. func (p Stream) Split(n int) Stream {
  212. if n < 1 {
  213. panic("n should be greater than 0")
  214. }
  215. source := make(chan interface{})
  216. go func() {
  217. var chunk []interface{}
  218. for item := range p.source {
  219. chunk = append(chunk, item)
  220. if len(chunk) == n {
  221. source <- chunk
  222. chunk = nil
  223. }
  224. }
  225. if chunk != nil {
  226. source <- chunk
  227. }
  228. close(source)
  229. }()
  230. return Range(source)
  231. }
  232. func (p Stream) Tail(n int64) Stream {
  233. if n < 1 {
  234. panic("n should be greater than 0")
  235. }
  236. source := make(chan interface{})
  237. go func() {
  238. ring := collection.NewRing(int(n))
  239. for item := range p.source {
  240. ring.Add(item)
  241. }
  242. for _, item := range ring.Take() {
  243. source <- item
  244. }
  245. close(source)
  246. }()
  247. return Range(source)
  248. }
  249. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  250. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  251. option := buildOptions(opts...)
  252. if option.unlimitedWorkers {
  253. return p.walkUnlimited(fn, option)
  254. }
  255. return p.walkLimited(fn, option)
  256. }
  257. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  258. pipe := make(chan interface{}, option.workers)
  259. go func() {
  260. var wg sync.WaitGroup
  261. pool := make(chan lang.PlaceholderType, option.workers)
  262. for {
  263. pool <- lang.Placeholder
  264. item, ok := <-p.source
  265. if !ok {
  266. <-pool
  267. break
  268. }
  269. wg.Add(1)
  270. // better to safely run caller defined method
  271. threading.GoSafe(func() {
  272. defer func() {
  273. wg.Done()
  274. <-pool
  275. }()
  276. fn(item, pipe)
  277. })
  278. }
  279. wg.Wait()
  280. close(pipe)
  281. }()
  282. return Range(pipe)
  283. }
  284. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  285. pipe := make(chan interface{}, defaultWorkers)
  286. go func() {
  287. var wg sync.WaitGroup
  288. for {
  289. item, ok := <-p.source
  290. if !ok {
  291. break
  292. }
  293. wg.Add(1)
  294. // better to safely run caller defined method
  295. threading.GoSafe(func() {
  296. defer wg.Done()
  297. fn(item, pipe)
  298. })
  299. }
  300. wg.Wait()
  301. close(pipe)
  302. }()
  303. return Range(pipe)
  304. }
  305. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  306. func UnlimitedWorkers() Option {
  307. return func(opts *rxOptions) {
  308. opts.unlimitedWorkers = true
  309. }
  310. }
  311. // WithWorkers lets the caller to customize the concurrent workers.
  312. func WithWorkers(workers int) Option {
  313. return func(opts *rxOptions) {
  314. if workers < minWorkers {
  315. opts.workers = minWorkers
  316. } else {
  317. opts.workers = workers
  318. }
  319. }
  320. }
  321. func buildOptions(opts ...Option) *rxOptions {
  322. options := newOptions()
  323. for _, opt := range opts {
  324. opt(options)
  325. }
  326. return options
  327. }
  328. func newOptions() *rxOptions {
  329. return &rxOptions{
  330. workers: defaultWorkers,
  331. }
  332. }