lockedcalls.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package syncx
  2. import "sync"
  3. type (
  4. // LockedCalls makes sure the calls with the same key to be called sequentially.
  5. // For example, A called F, before it's done, B called F, then B's call would not blocked,
  6. // after A's call finished, B's call got executed.
  7. // The calls with the same key are independent, not sharing the returned values.
  8. // A ------->calls F with key and executes<------->returns
  9. // B ------------------>calls F with key<--------->executes<---->returns
  10. LockedCalls interface {
  11. Do(key string, fn func() (interface{}, error)) (interface{}, error)
  12. }
  13. lockedGroup struct {
  14. mu sync.Mutex
  15. m map[string]*sync.WaitGroup
  16. }
  17. )
  18. // NewLockedCalls returns a LockedCalls.
  19. func NewLockedCalls() LockedCalls {
  20. return &lockedGroup{
  21. m: make(map[string]*sync.WaitGroup),
  22. }
  23. }
  24. func (lg *lockedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  25. begin:
  26. lg.mu.Lock()
  27. if wg, ok := lg.m[key]; ok {
  28. lg.mu.Unlock()
  29. wg.Wait()
  30. goto begin
  31. }
  32. return lg.makeCall(key, fn)
  33. }
  34. func (lg *lockedGroup) makeCall(key string, fn func() (interface{}, error)) (interface{}, error) {
  35. var wg sync.WaitGroup
  36. wg.Add(1)
  37. lg.m[key] = &wg
  38. lg.mu.Unlock()
  39. defer func() {
  40. // delete key first, done later. can't reverse the order, because if reverse,
  41. // another Do call might wg.Wait() without get notified with wg.Done()
  42. lg.mu.Lock()
  43. delete(lg.m, key)
  44. lg.mu.Unlock()
  45. wg.Done()
  46. }()
  47. return fn()
  48. }