mr.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package main
  2. import (
  3. "log"
  4. "time"
  5. "github.com/tal-tech/go-zero/core/mr"
  6. "github.com/tal-tech/go-zero/core/timex"
  7. )
  // user is a stateless stand-in for a user service client in this demo.
  type user struct{}

  // User blocks for 30ms and then returns a nil payload and a nil error.
  // The uid argument is accepted but not inspected.
  func (u *user) User(uid int64) (interface{}, error) {
  	const latency = 30 * time.Millisecond

  	time.Sleep(latency)
  	return nil, nil
  }
  // store is a stateless stand-in for a store service client in this demo.
  type store struct{}

  // Store blocks for 50ms and then returns a nil payload and a nil error.
  // The pid argument is accepted but not inspected.
  func (s *store) Store(pid int64) (interface{}, error) {
  	const latency = 50 * time.Millisecond

  	time.Sleep(latency)
  	return nil, nil
  }
  // order is a stateless stand-in for an order service client in this demo.
  type order struct{}

  // Order blocks for 40ms and then returns a nil payload and a nil error.
  // The pid argument is accepted but not inspected.
  func (o *order) Order(pid int64) (interface{}, error) {
  	const latency = 40 * time.Millisecond

  	time.Sleep(latency)
  	return nil, nil
  }
  23. var (
  24. userRpc user
  25. storeRpc store
  26. orderRpc order
  27. )
  28. func main() {
  29. start := timex.Now()
  30. _, err := productDetail(123, 345)
  31. if err != nil {
  32. log.Printf("product detail error: %v", err)
  33. return
  34. }
  35. log.Printf("productDetail time: %v", timex.Since(start))
  36. // the data processing
  37. res, err := checkLegal([]int64{1, 2, 3})
  38. if err != nil {
  39. log.Printf("check error: %v", err)
  40. return
  41. }
  42. log.Printf("check res: %v", res)
  43. }
  // ProductDetail bundles the three lookup results that productDetail
  // collects. Each field holds whatever the corresponding service call
  // returned, untyped.
  type ProductDetail struct {
  	User, Store, Order interface{}
  }
  49. func productDetail(uid, pid int64) (*ProductDetail, error) {
  50. var pd ProductDetail
  51. err := mr.Finish(func() (err error) {
  52. pd.User, err = userRpc.User(uid)
  53. return
  54. }, func() (err error) {
  55. pd.Store, err = storeRpc.Store(pid)
  56. return
  57. }, func() (err error) {
  58. pd.Order, err = orderRpc.Order(pid)
  59. return
  60. })
  61. if err != nil {
  62. return nil, err
  63. }
  64. return &pd, nil
  65. }
  66. func checkLegal(uids []int64) ([]int64, error) {
  67. r, err := mr.MapReduce(func(source chan<- interface{}) {
  68. for _, uid := range uids {
  69. source <- uid
  70. }
  71. }, func(item interface{}, writer mr.Writer, cancel func(error)) {
  72. uid := item.(int64)
  73. ok, err := check(uid)
  74. if err != nil {
  75. cancel(err)
  76. }
  77. if ok {
  78. writer.Write(uid)
  79. }
  80. }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
  81. var uids []int64
  82. for p := range pipe {
  83. uids = append(uids, p.(int64))
  84. }
  85. writer.Write(uids)
  86. })
  87. if err != nil {
  88. return nil, err
  89. }
  90. return r.([]int64), nil
  91. }
  // check reports whether uid is legal. In this demo it sleeps for 20ms in
  // place of a real check and then always returns true with a nil error.
  func check(uid int64) (bool, error) {
  	// Placeholder for the real user-legality check.
  	const simulatedWork = 20 * time.Millisecond

  	time.Sleep(simulatedWork)
  	return true, nil
  }