stream.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  // Worker-count settings shared by the stream options.
  const (
  	// minWorkers is the lowest worker count WithWorkers will store.
  	minWorkers = 1
  	// defaultWorkers is the worker count newOptions starts with; it is
  	// also the output buffer size used by walkUnlimited.
  	defaultWorkers = 16
  )
  // A Stream wraps a receive-only channel of items and offers chained
  // processing methods on top of it.
  type Stream struct {
  	source <-chan interface{}
  }

  // rxOptions holds the settings that Option functions may adjust.
  type rxOptions struct {
  	unlimitedWorkers bool
  	workers          int
  }

  // Option customizes the rxOptions used by a Stream operation.
  type Option func(opts *rxOptions)

  // FilterFunc decides whether an item is kept in a Stream.
  type FilterFunc func(item interface{}) bool

  // ForAllFunc receives the whole underlying channel of a Stream.
  type ForAllFunc func(pipe <-chan interface{})

  // ForEachFunc handles a single element of a Stream.
  type ForEachFunc func(item interface{})

  // GenerateFunc sends elements into the channel that feeds a Stream.
  type GenerateFunc func(source chan<- interface{})

  // KeyFunc produces the key for an element of a Stream.
  type KeyFunc func(item interface{}) interface{}

  // LessFunc compares two elements of a Stream.
  type LessFunc func(a, b interface{}) bool

  // MapFunc maps an element of a Stream to another value.
  type MapFunc func(item interface{}) interface{}

  // ParallelFunc handles an element when elements are processed in parallel.
  type ParallelFunc func(item interface{})

  // ReduceFunc reduces all the elements read from the channel into one
  // result or an error.
  type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)

  // WalkFunc handles one element and may write results to pipe.
  type WalkFunc func(item interface{}, pipe chan<- interface{})
  45. // empty a empty Stream.
  46. var empty Stream
  47. func init() {
  48. // initial empty Stream.
  49. source := make(chan interface{})
  50. close(source)
  51. empty.source = source
  52. }
  53. // Empty Returns a empty stream.
  54. func Empty() Stream {
  55. return empty
  56. }
  57. // From constructs a Stream from the given GenerateFunc.
  58. func From(generate GenerateFunc) Stream {
  59. source := make(chan interface{})
  60. threading.GoSafe(func() {
  61. defer close(source)
  62. generate(source)
  63. })
  64. return Range(source)
  65. }
  66. // Just converts the given arbitrary items to a Stream.
  67. func Just(items ...interface{}) Stream {
  68. source := make(chan interface{}, len(items))
  69. for _, item := range items {
  70. source <- item
  71. }
  72. close(source)
  73. return Range(source)
  74. }
  75. // Range converts the given channel to a Stream.
  76. func Range(source <-chan interface{}) Stream {
  77. return Stream{
  78. source: source,
  79. }
  80. }
  81. // Concat Returns a concat Stream.
  82. func Concat(a Stream, others ...Stream) Stream {
  83. return a.Concat(others...)
  84. }
  85. // Buffer buffers the items into a queue with size n.
  86. // It can balance the producer and the consumer if their processing throughput don't match.
  87. func (p Stream) Buffer(n int) Stream {
  88. if n < 0 {
  89. n = 0
  90. }
  91. source := make(chan interface{}, n)
  92. go func() {
  93. for item := range p.source {
  94. source <- item
  95. }
  96. close(source)
  97. }()
  98. return Range(source)
  99. }
  100. // Count counts the number of elements in the result.
  101. func (p Stream) Count() (count int) {
  102. for range p.source {
  103. count++
  104. }
  105. return
  106. }
  107. // Distinct removes the duplicated items base on the given KeyFunc.
  108. func (p Stream) Distinct(fn KeyFunc) Stream {
  109. source := make(chan interface{})
  110. threading.GoSafe(func() {
  111. defer close(source)
  112. keys := make(map[interface{}]lang.PlaceholderType)
  113. for item := range p.source {
  114. key := fn(item)
  115. if _, ok := keys[key]; !ok {
  116. source <- item
  117. keys[key] = lang.Placeholder
  118. }
  119. }
  120. })
  121. return Range(source)
  122. }
  123. // Done waits all upstreaming operations to be done.
  124. func (p Stream) Done() {
  125. for range p.source {
  126. }
  127. }
  128. // Filter filters the items by the given FilterFunc.
  129. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  130. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  131. if fn(item) {
  132. pipe <- item
  133. }
  134. }, opts...)
  135. }
  136. // ForAll handles the streaming elements from the source and no later streams.
  137. func (p Stream) ForAll(fn ForAllFunc) {
  138. fn(p.source)
  139. }
  140. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  141. func (p Stream) ForEach(fn ForEachFunc) {
  142. for item := range p.source {
  143. fn(item)
  144. }
  145. }
  146. // Group groups the elements into different groups based on their keys.
  147. func (p Stream) Group(fn KeyFunc) Stream {
  148. groups := make(map[interface{}][]interface{})
  149. for item := range p.source {
  150. key := fn(item)
  151. groups[key] = append(groups[key], item)
  152. }
  153. source := make(chan interface{})
  154. go func() {
  155. for _, group := range groups {
  156. source <- group
  157. }
  158. close(source)
  159. }()
  160. return Range(source)
  161. }
  162. // Head returns the first n elements in p.
  163. func (p Stream) Head(n int64) Stream {
  164. if n < 1 {
  165. panic("n must be greater than 0")
  166. }
  167. source := make(chan interface{})
  168. go func() {
  169. for item := range p.source {
  170. n--
  171. if n >= 0 {
  172. source <- item
  173. }
  174. if n == 0 {
  175. // let successive method go ASAP even we have more items to skip
  176. // why we don't just break the loop, because if break,
  177. // this former goroutine will block forever, which will cause goroutine leak.
  178. close(source)
  179. }
  180. }
  181. if n > 0 {
  182. close(source)
  183. }
  184. }()
  185. return Range(source)
  186. }
  187. // Map converts each item to another corresponding item, which means it's a 1:1 model.
  188. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  189. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  190. pipe <- fn(item)
  191. }, opts...)
  192. }
  193. // Merge merges all the items into a slice and generates a new stream.
  194. func (p Stream) Merge() Stream {
  195. var items []interface{}
  196. for item := range p.source {
  197. items = append(items, item)
  198. }
  199. source := make(chan interface{}, 1)
  200. source <- items
  201. close(source)
  202. return Range(source)
  203. }
  204. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  205. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  206. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  207. fn(item)
  208. }, opts...).Done()
  209. }
  210. // Reduce is a utility method to let the caller deal with the underlying channel.
  211. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  212. return fn(p.source)
  213. }
  214. // Reverse reverses the elements in the stream.
  215. func (p Stream) Reverse() Stream {
  216. var items []interface{}
  217. for item := range p.source {
  218. items = append(items, item)
  219. }
  220. // reverse, official method
  221. for i := len(items)/2 - 1; i >= 0; i-- {
  222. opp := len(items) - 1 - i
  223. items[i], items[opp] = items[opp], items[i]
  224. }
  225. return Just(items...)
  226. }
  227. // Sort sorts the items from the underlying source.
  228. func (p Stream) Sort(less LessFunc) Stream {
  229. var items []interface{}
  230. for item := range p.source {
  231. items = append(items, item)
  232. }
  233. sort.Slice(items, func(i, j int) bool {
  234. return less(items[i], items[j])
  235. })
  236. return Just(items...)
  237. }
  238. // Split splits the elements into chunk with size up to n,
  239. // might be less than n on tailing elements.
  240. func (p Stream) Split(n int) Stream {
  241. if n < 1 {
  242. panic("n should be greater than 0")
  243. }
  244. source := make(chan interface{})
  245. go func() {
  246. var chunk []interface{}
  247. for item := range p.source {
  248. chunk = append(chunk, item)
  249. if len(chunk) == n {
  250. source <- chunk
  251. chunk = nil
  252. }
  253. }
  254. if chunk != nil {
  255. source <- chunk
  256. }
  257. close(source)
  258. }()
  259. return Range(source)
  260. }
  261. // Tail returns the last n elements in p.
  262. func (p Stream) Tail(n int64) Stream {
  263. if n < 1 {
  264. panic("n should be greater than 0")
  265. }
  266. source := make(chan interface{})
  267. go func() {
  268. ring := collection.NewRing(int(n))
  269. for item := range p.source {
  270. ring.Add(item)
  271. }
  272. for _, item := range ring.Take() {
  273. source <- item
  274. }
  275. close(source)
  276. }()
  277. return Range(source)
  278. }
  279. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  280. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  281. option := buildOptions(opts...)
  282. if option.unlimitedWorkers {
  283. return p.walkUnlimited(fn, option)
  284. }
  285. return p.walkLimited(fn, option)
  286. }
  287. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  288. pipe := make(chan interface{}, option.workers)
  289. go func() {
  290. var wg sync.WaitGroup
  291. pool := make(chan lang.PlaceholderType, option.workers)
  292. for {
  293. pool <- lang.Placeholder
  294. item, ok := <-p.source
  295. if !ok {
  296. <-pool
  297. break
  298. }
  299. wg.Add(1)
  300. // better to safely run caller defined method
  301. threading.GoSafe(func() {
  302. defer func() {
  303. wg.Done()
  304. <-pool
  305. }()
  306. fn(item, pipe)
  307. })
  308. }
  309. wg.Wait()
  310. close(pipe)
  311. }()
  312. return Range(pipe)
  313. }
  314. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  315. pipe := make(chan interface{}, defaultWorkers)
  316. go func() {
  317. var wg sync.WaitGroup
  318. for {
  319. item, ok := <-p.source
  320. if !ok {
  321. break
  322. }
  323. wg.Add(1)
  324. // better to safely run caller defined method
  325. threading.GoSafe(func() {
  326. defer wg.Done()
  327. fn(item, pipe)
  328. })
  329. }
  330. wg.Wait()
  331. close(pipe)
  332. }()
  333. return Range(pipe)
  334. }
  335. // AnyMach Returns whether any elements of this stream match the provided predicate.
  336. // May not evaluate the predicate on all elements if not necessary for determining the result.
  337. // If the stream is empty then false is returned and the predicate is not evaluated.
  338. func (p Stream) AnyMach(f func(item interface{}) bool) (isFind bool) {
  339. for item := range p.source {
  340. if f(item) {
  341. isFind = true
  342. return
  343. }
  344. }
  345. return
  346. }
  347. // AllMach Returns whether all elements of this stream match the provided predicate.
  348. // May not evaluate the predicate on all elements if not necessary for determining the result.
  349. // If the stream is empty then true is returned and the predicate is not evaluated.
  350. func (p Stream) AllMach(f func(item interface{}) bool) (isFind bool) {
  351. isFind = true
  352. for item := range p.source {
  353. if !f(item) {
  354. isFind = false
  355. return
  356. }
  357. }
  358. return
  359. }
  360. // Concat Returns a Stream that concat others streams
  361. func (p Stream) Concat(others ...Stream) Stream {
  362. source := make(chan interface{})
  363. wg := sync.WaitGroup{}
  364. for _, other := range others {
  365. if p == other {
  366. continue
  367. }
  368. wg.Add(1)
  369. go func(iother Stream) {
  370. for item := range iother.source {
  371. source <- item
  372. }
  373. wg.Done()
  374. }(other)
  375. }
  376. wg.Add(1)
  377. go func() {
  378. for item := range p.source {
  379. source <- item
  380. }
  381. wg.Done()
  382. }()
  383. go func() {
  384. wg.Wait()
  385. close(source)
  386. }()
  387. return Range(source)
  388. }
  389. // Skip Returns a Stream that skips size elements.
  390. func (p Stream) Skip(size int64) Stream {
  391. if size == 0 {
  392. return p
  393. }
  394. if size < 0 {
  395. panic("size must be greater than -1")
  396. }
  397. source := make(chan interface{})
  398. go func() {
  399. i := 0
  400. for item := range p.source {
  401. if i >= int(size) {
  402. source <- item
  403. }
  404. i++
  405. }
  406. close(source)
  407. }()
  408. return Range(source)
  409. }
  410. // Chan Returns a channel of Stream.
  411. func (p Stream) Chan() <-chan interface{} {
  412. return p.source
  413. }
  414. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  415. func UnlimitedWorkers() Option {
  416. return func(opts *rxOptions) {
  417. opts.unlimitedWorkers = true
  418. }
  419. }
  420. // WithWorkers lets the caller to customize the concurrent workers.
  421. func WithWorkers(workers int) Option {
  422. return func(opts *rxOptions) {
  423. if workers < minWorkers {
  424. opts.workers = minWorkers
  425. } else {
  426. opts.workers = workers
  427. }
  428. }
  429. }
  430. func buildOptions(opts ...Option) *rxOptions {
  431. options := newOptions()
  432. for _, opt := range opts {
  433. opt(options)
  434. }
  435. return options
  436. }
  437. func newOptions() *rxOptions {
  438. return &rxOptions{
  439. workers: defaultWorkers,
  440. }
  441. }