sharedcalls.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package syncx
  import (
  	"errors"
  	"sync"
  )
  // SharedCalls lets concurrent calls that use the same key share one result.
  //
  // For example, if A calls F and B calls F with the same key before A's call
  // has finished, B does not execute F; it waits and receives the values that
  // A's execution of F returned.
  //
  //	A ------->calls F with key<------------------->returns val
  //	B --------------------->calls F with key------>returns val
  type SharedCalls interface {
  	// Do runs fn for key, or waits for an in-flight call with the same key
  	// and returns that call's result.
  	Do(key string, fn func() (interface{}, error)) (interface{}, error)
  	// DoEx behaves like Do and additionally reports whether this caller was
  	// the one that executed fn (true) or shared another caller's result (false).
  	DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  }

  // call holds the state of one in-flight invocation.
  type call struct {
  	wg  sync.WaitGroup // released once val and err have been published
  	val interface{}    // value returned by fn
  	err error          // error returned by fn
  }

  // sharedGroup is the SharedCalls implementation; it tracks in-flight calls by key.
  type sharedGroup struct {
  	calls map[string]*call // in-flight calls, accessed while holding lock
  	lock  sync.Mutex
  }
  24. // NewSharedCalls returns a SharedCalls.
  25. func NewSharedCalls() SharedCalls {
  26. return &sharedGroup{
  27. calls: make(map[string]*call),
  28. }
  29. }
  30. func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  31. c, done := g.createCall(key)
  32. if done {
  33. return c.val, c.err
  34. }
  35. g.makeCall(c, key, fn)
  36. return c.val, c.err
  37. }
  38. func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
  39. c, done := g.createCall(key)
  40. if done {
  41. return c.val, false, c.err
  42. }
  43. g.makeCall(c, key, fn)
  44. return c.val, true, c.err
  45. }
  46. func (g *sharedGroup) createCall(key string) (c *call, done bool) {
  47. g.lock.Lock()
  48. if c, ok := g.calls[key]; ok {
  49. g.lock.Unlock()
  50. c.wg.Wait()
  51. return c, true
  52. }
  53. c = new(call)
  54. c.wg.Add(1)
  55. g.calls[key] = c
  56. g.lock.Unlock()
  57. return c, false
  58. }
  59. func (g *sharedGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
  60. defer func() {
  61. g.lock.Lock()
  62. delete(g.calls, key)
  63. g.lock.Unlock()
  64. c.wg.Done()
  65. }()
  66. c.val, c.err = fn()
  67. }