stream.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  const (
  	// minWorkers is the lower bound enforced by WithWorkers; smaller
  	// requests are raised to this value.
  	minWorkers = 1
  	// defaultWorkers is the worker count that newOptions starts with.
  	defaultWorkers = 16
  )
  type (
  	// A Stream wraps a receive-only channel of items and offers chainable
  	// operations over the items flowing through it.
  	Stream struct {
  		source <-chan interface{}
  	}

  	// Option customizes how a Stream operation is executed by mutating the
  	// options it is handed.
  	Option func(opts *rxOptions)

  	// rxOptions carries the execution settings assembled from Option values.
  	rxOptions struct {
  		unlimitedWorkers bool
  		workers          int
  	}

  	// FilterFunc reports whether the given item should be kept in a Stream.
  	FilterFunc func(item interface{}) bool
  	// ForAllFunc consumes the whole channel of elements of a Stream.
  	ForAllFunc func(pipe <-chan interface{})
  	// ForEachFunc handles a single element of a Stream.
  	ForEachFunc func(item interface{})
  	// GenerateFunc sends elements into the given channel to feed a Stream.
  	GenerateFunc func(source chan<- interface{})
  	// KeyFunc derives a key from an element of a Stream.
  	KeyFunc func(item interface{}) interface{}
  	// LessFunc reports the ordering of a relative to b when comparing
  	// elements of a Stream.
  	LessFunc func(a, b interface{}) bool
  	// MapFunc converts an element of a Stream into another object.
  	MapFunc func(item interface{}) interface{}
  	// ParallelFunc handles a single element; it is the callback type used
  	// for processing elements in parallel.
  	ParallelFunc func(item interface{})
  	// ReduceFunc consumes the whole channel of elements of a Stream and
  	// reduces it to a single result or an error.
  	ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  	// WalkFunc handles a single element and may write any number of results
  	// into pipe.
  	WalkFunc func(item interface{}, pipe chan<- interface{})
  )
  45. // From constructs a Stream from the given GenerateFunc.
  46. func From(generate GenerateFunc) Stream {
  47. source := make(chan interface{})
  48. threading.GoSafe(func() {
  49. defer close(source)
  50. generate(source)
  51. })
  52. return Range(source)
  53. }
  54. // Just converts the given arbitrary items to a Stream.
  55. func Just(items ...interface{}) Stream {
  56. source := make(chan interface{}, len(items))
  57. for _, item := range items {
  58. source <- item
  59. }
  60. close(source)
  61. return Range(source)
  62. }
  63. // Range converts the given channel to a Stream.
  64. func Range(source <-chan interface{}) Stream {
  65. return Stream{
  66. source: source,
  67. }
  68. }
  69. // Buffer buffers the items into a queue with size n.
  70. // It can balance the producer and the consumer if their processing throughput don't match.
  71. func (p Stream) Buffer(n int) Stream {
  72. if n < 0 {
  73. n = 0
  74. }
  75. source := make(chan interface{}, n)
  76. go func() {
  77. for item := range p.source {
  78. source <- item
  79. }
  80. close(source)
  81. }()
  82. return Range(source)
  83. }
  84. // Count counts the number of elements in the result.
  85. func (p Stream) Count() (count int) {
  86. for range p.source {
  87. count++
  88. }
  89. return
  90. }
  91. // Distinct removes the duplicated items base on the given KeyFunc.
  92. func (p Stream) Distinct(fn KeyFunc) Stream {
  93. source := make(chan interface{})
  94. threading.GoSafe(func() {
  95. defer close(source)
  96. keys := make(map[interface{}]lang.PlaceholderType)
  97. for item := range p.source {
  98. key := fn(item)
  99. if _, ok := keys[key]; !ok {
  100. source <- item
  101. keys[key] = lang.Placeholder
  102. }
  103. }
  104. })
  105. return Range(source)
  106. }
  107. // Done waits all upstreaming operations to be done.
  108. func (p Stream) Done() {
  109. for range p.source {
  110. }
  111. }
  112. // Filter filters the items by the given FilterFunc.
  113. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  114. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  115. if fn(item) {
  116. pipe <- item
  117. }
  118. }, opts...)
  119. }
  120. // ForAll handles the streaming elements from the source and no later streams.
  121. func (p Stream) ForAll(fn ForAllFunc) {
  122. fn(p.source)
  123. }
  124. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  125. func (p Stream) ForEach(fn ForEachFunc) {
  126. for item := range p.source {
  127. fn(item)
  128. }
  129. }
  130. // Group groups the elements into different groups based on their keys.
  131. func (p Stream) Group(fn KeyFunc) Stream {
  132. groups := make(map[interface{}][]interface{})
  133. for item := range p.source {
  134. key := fn(item)
  135. groups[key] = append(groups[key], item)
  136. }
  137. source := make(chan interface{})
  138. go func() {
  139. for _, group := range groups {
  140. source <- group
  141. }
  142. close(source)
  143. }()
  144. return Range(source)
  145. }
  146. // Head returns the first n elements in p.
  147. func (p Stream) Head(n int64) Stream {
  148. if n < 1 {
  149. panic("n must be greater than 0")
  150. }
  151. source := make(chan interface{})
  152. go func() {
  153. for item := range p.source {
  154. n--
  155. if n >= 0 {
  156. source <- item
  157. }
  158. if n == 0 {
  159. // let successive method go ASAP even we have more items to skip
  160. // why we don't just break the loop, because if break,
  161. // this former goroutine will block forever, which will cause goroutine leak.
  162. close(source)
  163. }
  164. }
  165. if n > 0 {
  166. close(source)
  167. }
  168. }()
  169. return Range(source)
  170. }
  171. // Map converts each item to another corresponding item, which means it's a 1:1 model.
  172. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  173. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  174. pipe <- fn(item)
  175. }, opts...)
  176. }
  177. // Merge merges all the items into a slice and generates a new stream.
  178. func (p Stream) Merge() Stream {
  179. var items []interface{}
  180. for item := range p.source {
  181. items = append(items, item)
  182. }
  183. source := make(chan interface{}, 1)
  184. source <- items
  185. close(source)
  186. return Range(source)
  187. }
  188. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  189. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  190. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  191. fn(item)
  192. }, opts...).Done()
  193. }
  194. // Reduce is a utility method to let the caller deal with the underlying channel.
  195. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  196. return fn(p.source)
  197. }
  198. // Reverse reverses the elements in the stream.
  199. func (p Stream) Reverse() Stream {
  200. var items []interface{}
  201. for item := range p.source {
  202. items = append(items, item)
  203. }
  204. // reverse, official method
  205. for i := len(items)/2 - 1; i >= 0; i-- {
  206. opp := len(items) - 1 - i
  207. items[i], items[opp] = items[opp], items[i]
  208. }
  209. return Just(items...)
  210. }
  211. // Sort sorts the items from the underlying source.
  212. func (p Stream) Sort(less LessFunc) Stream {
  213. var items []interface{}
  214. for item := range p.source {
  215. items = append(items, item)
  216. }
  217. sort.Slice(items, func(i, j int) bool {
  218. return less(items[i], items[j])
  219. })
  220. return Just(items...)
  221. }
  222. // Split splits the elements into chunk with size up to n,
  223. // might be less than n on tailing elements.
  224. func (p Stream) Split(n int) Stream {
  225. if n < 1 {
  226. panic("n should be greater than 0")
  227. }
  228. source := make(chan interface{})
  229. go func() {
  230. var chunk []interface{}
  231. for item := range p.source {
  232. chunk = append(chunk, item)
  233. if len(chunk) == n {
  234. source <- chunk
  235. chunk = nil
  236. }
  237. }
  238. if chunk != nil {
  239. source <- chunk
  240. }
  241. close(source)
  242. }()
  243. return Range(source)
  244. }
  245. // Tail returns the last n elements in p.
  246. func (p Stream) Tail(n int64) Stream {
  247. if n < 1 {
  248. panic("n should be greater than 0")
  249. }
  250. source := make(chan interface{})
  251. go func() {
  252. ring := collection.NewRing(int(n))
  253. for item := range p.source {
  254. ring.Add(item)
  255. }
  256. for _, item := range ring.Take() {
  257. source <- item
  258. }
  259. close(source)
  260. }()
  261. return Range(source)
  262. }
  263. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  264. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  265. option := buildOptions(opts...)
  266. if option.unlimitedWorkers {
  267. return p.walkUnlimited(fn, option)
  268. }
  269. return p.walkLimited(fn, option)
  270. }
  271. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  272. pipe := make(chan interface{}, option.workers)
  273. go func() {
  274. var wg sync.WaitGroup
  275. pool := make(chan lang.PlaceholderType, option.workers)
  276. for {
  277. pool <- lang.Placeholder
  278. item, ok := <-p.source
  279. if !ok {
  280. <-pool
  281. break
  282. }
  283. wg.Add(1)
  284. // better to safely run caller defined method
  285. threading.GoSafe(func() {
  286. defer func() {
  287. wg.Done()
  288. <-pool
  289. }()
  290. fn(item, pipe)
  291. })
  292. }
  293. wg.Wait()
  294. close(pipe)
  295. }()
  296. return Range(pipe)
  297. }
  298. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  299. pipe := make(chan interface{}, defaultWorkers)
  300. go func() {
  301. var wg sync.WaitGroup
  302. for {
  303. item, ok := <-p.source
  304. if !ok {
  305. break
  306. }
  307. wg.Add(1)
  308. // better to safely run caller defined method
  309. threading.GoSafe(func() {
  310. defer wg.Done()
  311. fn(item, pipe)
  312. })
  313. }
  314. wg.Wait()
  315. close(pipe)
  316. }()
  317. return Range(pipe)
  318. }
  319. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  320. func UnlimitedWorkers() Option {
  321. return func(opts *rxOptions) {
  322. opts.unlimitedWorkers = true
  323. }
  324. }
  325. // WithWorkers lets the caller to customize the concurrent workers.
  326. func WithWorkers(workers int) Option {
  327. return func(opts *rxOptions) {
  328. if workers < minWorkers {
  329. opts.workers = minWorkers
  330. } else {
  331. opts.workers = workers
  332. }
  333. }
  334. }
  335. func buildOptions(opts ...Option) *rxOptions {
  336. options := newOptions()
  337. for _, opt := range opts {
  338. opt(options)
  339. }
  340. return options
  341. }
  342. func newOptions() *rxOptions {
  343. return &rxOptions{
  344. workers: defaultWorkers,
  345. }
  346. }