sharedcalls.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package syncx
  2. import "sync"
  3. type (
  4. // SharedCalls lets the concurrent calls with the same key to share the call result.
  5. // For example, A called F, before it's done, B called F. Then B would not execute F,
  6. // and shared the result returned by F which called by A.
  7. // The calls with the same key are dependent, concurrent calls share the returned values.
  8. // A ------->calls F with key<------------------->returns val
  9. // B --------------------->calls F with key------>returns val
  10. SharedCalls interface {
  11. Do(key string, fn func() (interface{}, error)) (interface{}, error)
  12. DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  13. }
  14. call struct {
  15. wg sync.WaitGroup
  16. val interface{}
  17. err error
  18. }
  19. sharedGroup struct {
  20. calls map[string]*call
  21. lock sync.Mutex
  22. }
  23. )
  24. func NewSharedCalls() SharedCalls {
  25. return &sharedGroup{
  26. calls: make(map[string]*call),
  27. }
  28. }
  29. func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  30. g.lock.Lock()
  31. if c, ok := g.calls[key]; ok {
  32. g.lock.Unlock()
  33. c.wg.Wait()
  34. return c.val, c.err
  35. }
  36. c := g.makeCall(key, fn)
  37. return c.val, c.err
  38. }
  39. func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
  40. g.lock.Lock()
  41. if c, ok := g.calls[key]; ok {
  42. g.lock.Unlock()
  43. c.wg.Wait()
  44. return c.val, false, c.err
  45. }
  46. c := g.makeCall(key, fn)
  47. return c.val, true, c.err
  48. }
  49. func (g *sharedGroup) makeCall(key string, fn func() (interface{}, error)) *call {
  50. c := new(call)
  51. c.wg.Add(1)
  52. g.calls[key] = c
  53. g.lock.Unlock()
  54. defer func() {
  55. // delete key first, done later. can't reverse the order, because if reverse,
  56. // another Do call might wg.Wait() without get notified with wg.Done()
  57. g.lock.Lock()
  58. delete(g.calls, key)
  59. g.lock.Unlock()
  60. c.wg.Done()
  61. }()
  62. c.val, c.err = fn()
  63. return c
  64. }