stream.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "git.i2edu.net/i2/go-zero/core/collection"
  6. "git.i2edu.net/i2/go-zero/core/lang"
  7. "git.i2edu.net/i2/go-zero/core/threading"
  8. )
  // Worker-count settings used by the Walk family of operations.
  const (
  	// defaultWorkers is the worker count applied when no Option overrides it.
  	defaultWorkers = 16
  	// minWorkers is the smallest worker count WithWorkers will accept;
  	// smaller requests are raised to this value.
  	minWorkers = 1
  )
  type (
  	// rxOptions carries the concurrency settings applied by Walk.
  	rxOptions struct {
  		// unlimitedWorkers selects the unbounded walk implementation.
  		unlimitedWorkers bool
  		// workers is the concurrency bound used by the limited walk.
  		workers int
  	}

  	// FilterFunc reports whether an item should be kept in a Stream.
  	FilterFunc func(item interface{}) bool

  	// ForAllFunc consumes the whole underlying channel of a Stream.
  	ForAllFunc func(pipe <-chan interface{})

  	// ForEachFunc is invoked once for every element of a Stream.
  	ForEachFunc func(item interface{})

  	// GenerateFunc pushes elements into the channel that feeds a Stream.
  	GenerateFunc func(source chan<- interface{})

  	// KeyFunc derives a key from an element of a Stream.
  	KeyFunc func(item interface{}) interface{}

  	// LessFunc reports the ordering of two elements of a Stream.
  	LessFunc func(a, b interface{}) bool

  	// MapFunc converts an element of a Stream into another value.
  	MapFunc func(item interface{}) interface{}

  	// Option customizes the settings of a Stream operation.
  	Option func(opts *rxOptions)

  	// ParallelFunc handles one element; used by Stream.Parallel.
  	ParallelFunc func(item interface{})

  	// ReduceFunc folds the whole underlying channel into a single result.
  	ReduceFunc func(pipe <-chan interface{}) (interface{}, error)

  	// WalkFunc handles one element and may write any number of results to pipe.
  	WalkFunc func(item interface{}, pipe chan<- interface{})

  	// Stream wraps a receive-only channel and offers stream-processing
  	// operations on top of it.
  	Stream struct {
  		source <-chan interface{}
  	}
  )
  45. // Concat returns a concatenated Stream.
  46. func Concat(s Stream, others ...Stream) Stream {
  47. return s.Concat(others...)
  48. }
  49. // From constructs a Stream from the given GenerateFunc.
  50. func From(generate GenerateFunc) Stream {
  51. source := make(chan interface{})
  52. threading.GoSafe(func() {
  53. defer close(source)
  54. generate(source)
  55. })
  56. return Range(source)
  57. }
  58. // Just converts the given arbitrary items to a Stream.
  59. func Just(items ...interface{}) Stream {
  60. source := make(chan interface{}, len(items))
  61. for _, item := range items {
  62. source <- item
  63. }
  64. close(source)
  65. return Range(source)
  66. }
  67. // Range converts the given channel to a Stream.
  68. func Range(source <-chan interface{}) Stream {
  69. return Stream{
  70. source: source,
  71. }
  72. }
  73. // AllMach returns whether all elements of this stream match the provided predicate.
  74. // May not evaluate the predicate on all elements if not necessary for determining the result.
  75. // If the stream is empty then true is returned and the predicate is not evaluated.
  76. func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
  77. for item := range s.source {
  78. if !predicate(item) {
  79. return false
  80. }
  81. }
  82. return true
  83. }
  84. // AnyMach returns whether any elements of this stream match the provided predicate.
  85. // May not evaluate the predicate on all elements if not necessary for determining the result.
  86. // If the stream is empty then false is returned and the predicate is not evaluated.
  87. func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
  88. for item := range s.source {
  89. if predicate(item) {
  90. return true
  91. }
  92. }
  93. return false
  94. }
  95. // Buffer buffers the items into a queue with size n.
  96. // It can balance the producer and the consumer if their processing throughput don't match.
  97. func (s Stream) Buffer(n int) Stream {
  98. if n < 0 {
  99. n = 0
  100. }
  101. source := make(chan interface{}, n)
  102. go func() {
  103. for item := range s.source {
  104. source <- item
  105. }
  106. close(source)
  107. }()
  108. return Range(source)
  109. }
  110. // Concat returns a Stream that concatenated other streams
  111. func (s Stream) Concat(others ...Stream) Stream {
  112. source := make(chan interface{})
  113. go func() {
  114. group := threading.NewRoutineGroup()
  115. group.Run(func() {
  116. for item := range s.source {
  117. source <- item
  118. }
  119. })
  120. for _, each := range others {
  121. each := each
  122. group.Run(func() {
  123. for item := range each.source {
  124. source <- item
  125. }
  126. })
  127. }
  128. group.Wait()
  129. close(source)
  130. }()
  131. return Range(source)
  132. }
  133. // Count counts the number of elements in the result.
  134. func (s Stream) Count() (count int) {
  135. for range s.source {
  136. count++
  137. }
  138. return
  139. }
  140. // Distinct removes the duplicated items base on the given KeyFunc.
  141. func (s Stream) Distinct(fn KeyFunc) Stream {
  142. source := make(chan interface{})
  143. threading.GoSafe(func() {
  144. defer close(source)
  145. keys := make(map[interface{}]lang.PlaceholderType)
  146. for item := range s.source {
  147. key := fn(item)
  148. if _, ok := keys[key]; !ok {
  149. source <- item
  150. keys[key] = lang.Placeholder
  151. }
  152. }
  153. })
  154. return Range(source)
  155. }
  156. // Done waits all upstreaming operations to be done.
  157. func (s Stream) Done() {
  158. for range s.source {
  159. }
  160. }
  161. // Filter filters the items by the given FilterFunc.
  162. func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  163. return s.Walk(func(item interface{}, pipe chan<- interface{}) {
  164. if fn(item) {
  165. pipe <- item
  166. }
  167. }, opts...)
  168. }
  169. // ForAll handles the streaming elements from the source and no later streams.
  170. func (s Stream) ForAll(fn ForAllFunc) {
  171. fn(s.source)
  172. }
  173. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  174. func (s Stream) ForEach(fn ForEachFunc) {
  175. for item := range s.source {
  176. fn(item)
  177. }
  178. }
  179. // Group groups the elements into different groups based on their keys.
  180. func (s Stream) Group(fn KeyFunc) Stream {
  181. groups := make(map[interface{}][]interface{})
  182. for item := range s.source {
  183. key := fn(item)
  184. groups[key] = append(groups[key], item)
  185. }
  186. source := make(chan interface{})
  187. go func() {
  188. for _, group := range groups {
  189. source <- group
  190. }
  191. close(source)
  192. }()
  193. return Range(source)
  194. }
  195. // Head returns the first n elements in p.
  196. func (s Stream) Head(n int64) Stream {
  197. if n < 1 {
  198. panic("n must be greater than 0")
  199. }
  200. source := make(chan interface{})
  201. go func() {
  202. for item := range s.source {
  203. n--
  204. if n >= 0 {
  205. source <- item
  206. }
  207. if n == 0 {
  208. // let successive method go ASAP even we have more items to skip
  209. // why we don't just break the loop, because if break,
  210. // this former goroutine will block forever, which will cause goroutine leak.
  211. close(source)
  212. }
  213. }
  214. if n > 0 {
  215. close(source)
  216. }
  217. }()
  218. return Range(source)
  219. }
  220. // Map converts each item to another corresponding item, which means it's a 1:1 model.
  221. func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
  222. return s.Walk(func(item interface{}, pipe chan<- interface{}) {
  223. pipe <- fn(item)
  224. }, opts...)
  225. }
  226. // Merge merges all the items into a slice and generates a new stream.
  227. func (s Stream) Merge() Stream {
  228. var items []interface{}
  229. for item := range s.source {
  230. items = append(items, item)
  231. }
  232. source := make(chan interface{}, 1)
  233. source <- items
  234. close(source)
  235. return Range(source)
  236. }
  237. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  238. func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
  239. s.Walk(func(item interface{}, pipe chan<- interface{}) {
  240. fn(item)
  241. }, opts...).Done()
  242. }
  243. // Reduce is a utility method to let the caller deal with the underlying channel.
  244. func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  245. return fn(s.source)
  246. }
  247. // Reverse reverses the elements in the stream.
  248. func (s Stream) Reverse() Stream {
  249. var items []interface{}
  250. for item := range s.source {
  251. items = append(items, item)
  252. }
  253. // reverse, official method
  254. for i := len(items)/2 - 1; i >= 0; i-- {
  255. opp := len(items) - 1 - i
  256. items[i], items[opp] = items[opp], items[i]
  257. }
  258. return Just(items...)
  259. }
  260. // Skip returns a Stream that skips size elements.
  261. func (s Stream) Skip(n int64) Stream {
  262. if n < 0 {
  263. panic("n must not be negative")
  264. }
  265. if n == 0 {
  266. return s
  267. }
  268. source := make(chan interface{})
  269. go func() {
  270. for item := range s.source {
  271. n--
  272. if n >= 0 {
  273. continue
  274. } else {
  275. source <- item
  276. }
  277. }
  278. close(source)
  279. }()
  280. return Range(source)
  281. }
  282. // Sort sorts the items from the underlying source.
  283. func (s Stream) Sort(less LessFunc) Stream {
  284. var items []interface{}
  285. for item := range s.source {
  286. items = append(items, item)
  287. }
  288. sort.Slice(items, func(i, j int) bool {
  289. return less(items[i], items[j])
  290. })
  291. return Just(items...)
  292. }
  293. // Split splits the elements into chunk with size up to n,
  294. // might be less than n on tailing elements.
  295. func (s Stream) Split(n int) Stream {
  296. if n < 1 {
  297. panic("n should be greater than 0")
  298. }
  299. source := make(chan interface{})
  300. go func() {
  301. var chunk []interface{}
  302. for item := range s.source {
  303. chunk = append(chunk, item)
  304. if len(chunk) == n {
  305. source <- chunk
  306. chunk = nil
  307. }
  308. }
  309. if chunk != nil {
  310. source <- chunk
  311. }
  312. close(source)
  313. }()
  314. return Range(source)
  315. }
  316. // Tail returns the last n elements in p.
  317. func (s Stream) Tail(n int64) Stream {
  318. if n < 1 {
  319. panic("n should be greater than 0")
  320. }
  321. source := make(chan interface{})
  322. go func() {
  323. ring := collection.NewRing(int(n))
  324. for item := range s.source {
  325. ring.Add(item)
  326. }
  327. for _, item := range ring.Take() {
  328. source <- item
  329. }
  330. close(source)
  331. }()
  332. return Range(source)
  333. }
  334. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  335. func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  336. option := buildOptions(opts...)
  337. if option.unlimitedWorkers {
  338. return s.walkUnlimited(fn, option)
  339. }
  340. return s.walkLimited(fn, option)
  341. }
  342. func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  343. pipe := make(chan interface{}, option.workers)
  344. go func() {
  345. var wg sync.WaitGroup
  346. pool := make(chan lang.PlaceholderType, option.workers)
  347. for {
  348. pool <- lang.Placeholder
  349. item, ok := <-s.source
  350. if !ok {
  351. <-pool
  352. break
  353. }
  354. wg.Add(1)
  355. // better to safely run caller defined method
  356. threading.GoSafe(func() {
  357. defer func() {
  358. wg.Done()
  359. <-pool
  360. }()
  361. fn(item, pipe)
  362. })
  363. }
  364. wg.Wait()
  365. close(pipe)
  366. }()
  367. return Range(pipe)
  368. }
  369. func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  370. pipe := make(chan interface{}, defaultWorkers)
  371. go func() {
  372. var wg sync.WaitGroup
  373. for {
  374. item, ok := <-s.source
  375. if !ok {
  376. break
  377. }
  378. wg.Add(1)
  379. // better to safely run caller defined method
  380. threading.GoSafe(func() {
  381. defer wg.Done()
  382. fn(item, pipe)
  383. })
  384. }
  385. wg.Wait()
  386. close(pipe)
  387. }()
  388. return Range(pipe)
  389. }
  390. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  391. func UnlimitedWorkers() Option {
  392. return func(opts *rxOptions) {
  393. opts.unlimitedWorkers = true
  394. }
  395. }
  396. // WithWorkers lets the caller to customize the concurrent workers.
  397. func WithWorkers(workers int) Option {
  398. return func(opts *rxOptions) {
  399. if workers < minWorkers {
  400. opts.workers = minWorkers
  401. } else {
  402. opts.workers = workers
  403. }
  404. }
  405. }
  406. func buildOptions(opts ...Option) *rxOptions {
  407. options := newOptions()
  408. for _, opt := range opts {
  409. opt(options)
  410. }
  411. return options
  412. }
  413. func newOptions() *rxOptions {
  414. return &rxOptions{
  415. workers: defaultWorkers,
  416. }
  417. }