sharedcalls.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package syncx
  2. import "sync"
  3. type (
  4. // SharedCalls lets the concurrent calls with the same key to share the call result.
  5. // For example, A called F, before it's done, B called F. Then B would not execute F,
  6. // and shared the result returned by F which called by A.
  7. // The calls with the same key are dependent, concurrent calls share the returned values.
  8. // A ------->calls F with key<------------------->returns val
  9. // B --------------------->calls F with key------>returns val
  10. SharedCalls interface {
  11. Do(key string, fn func() (interface{}, error)) (interface{}, error)
  12. DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  13. }
  14. call struct {
  15. wg sync.WaitGroup
  16. val interface{}
  17. err error
  18. }
  19. sharedGroup struct {
  20. calls map[string]*call
  21. lock sync.Mutex
  22. }
  23. )
  24. func NewSharedCalls() SharedCalls {
  25. return &sharedGroup{
  26. calls: make(map[string]*call),
  27. }
  28. }
  29. func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  30. c, done := g.createCall(key)
  31. if done {
  32. return c.val, c.err
  33. }
  34. g.makeCall(c, key, fn)
  35. return c.val, c.err
  36. }
  37. func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
  38. c, done := g.createCall(key)
  39. if done {
  40. return c.val, false, c.err
  41. }
  42. g.makeCall(c, key, fn)
  43. return c.val, true, c.err
  44. }
  45. func (g *sharedGroup) createCall(key string) (c *call, done bool) {
  46. g.lock.Lock()
  47. if c, ok := g.calls[key]; ok {
  48. g.lock.Unlock()
  49. c.wg.Wait()
  50. return c, true
  51. }
  52. c = new(call)
  53. c.wg.Add(1)
  54. g.calls[key] = c
  55. g.lock.Unlock()
  56. return c, false
  57. }
  58. func (g *sharedGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
  59. defer func() {
  60. g.lock.Lock()
  61. delete(g.calls, key)
  62. g.lock.Unlock()
  63. c.wg.Done()
  64. }()
  65. c.val, c.err = fn()
  66. }