unbounded_executor.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package concurrent
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "sync"
  8. "time"
  9. )
// LogInfo is the package's hook for informational messages. Within this
// file it is called by checkGoroutines to report a call site whose
// goroutines are still alive and therefore delaying executor shutdown.
//
// event is the message text; properties carries extra values (the call in
// this file passes them as alternating key, value pairs).
//
// The default implementation discards everything. Assign a replacement
// function to forward the messages to a real logger.
var LogInfo = func(event string, properties ...interface{}) {
}
  14. // HandlePanic logs goroutine panic by default
  15. var HandlePanic = func(recovered interface{}, file string, line int) {
  16. fmt.Println(fmt.Sprintf("paniced: %v", recovered))
  17. debug.PrintStack()
  18. }
// StopSignal is a panic value that goroutines started by
// UnboundedExecutor.Go can use to terminate quietly: Go still recovers the
// panic, but it does not report it through HandlePanic.
const StopSignal = "STOP!"
// UnboundedExecutor is an executor that places no limit on the number of
// live goroutines. It tracks the goroutines it starts and can cancel them
// on shutdown. Create instances with NewUnboundedExecutor.
type UnboundedExecutor struct {
	// ctx is handed to every handler started through Go.
	ctx context.Context
	// cancel cancels ctx; it is called by Stop and StopAndWait.
	cancel context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines maps the "file:line" call site of Go to the number
	// of goroutines started from that site that have not yet returned.
	activeGoroutines map[string]int
}
// GlobalUnboundedExecutor is a package-level executor intended to live as
// long as the program itself. Any goroutine that should be shut down before
// main exits can be started from it.
//
// The executor has no way of knowing that main is about to return, so the
// main function is expected to call one of its stop methods explicitly.
var GlobalUnboundedExecutor = NewUnboundedExecutor()
  34. // NewUnboundedExecutor creates a new UnboundedExecutor,
  35. // UnboundedExecutor can not be created by &UnboundedExecutor{}
  36. func NewUnboundedExecutor() *UnboundedExecutor {
  37. ctx, cancel := context.WithCancel(context.TODO())
  38. return &UnboundedExecutor{
  39. ctx: ctx,
  40. cancel: cancel,
  41. activeGoroutinesMutex: &sync.Mutex{},
  42. activeGoroutines: map[string]int{},
  43. }
  44. }
  45. // Go starts a new goroutine and tracks its lifecycle.
  46. // Panic will be recovered and logged automatically, except for StopSignal
  47. func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  48. _, file, line, _ := runtime.Caller(1)
  49. executor.activeGoroutinesMutex.Lock()
  50. defer executor.activeGoroutinesMutex.Unlock()
  51. startFrom := fmt.Sprintf("%s:%d", file, line)
  52. executor.activeGoroutines[startFrom] += 1
  53. go func() {
  54. defer func() {
  55. recovered := recover()
  56. if recovered != nil && recovered != StopSignal {
  57. HandlePanic(recovered, file, line)
  58. }
  59. executor.activeGoroutinesMutex.Lock()
  60. defer executor.activeGoroutinesMutex.Unlock()
  61. executor.activeGoroutines[startFrom] -= 1
  62. }()
  63. handler(executor.ctx)
  64. }()
  65. }
// Stop cancels the context shared by all goroutines started through this
// executor and returns immediately, without waiting for them to exit.
func (executor *UnboundedExecutor) Stop() {
	executor.cancel()
}
// StopAndWaitForever cancels all goroutines started by this executor and
// blocks until every one of them has exited. It delegates to StopAndWait
// with context.Background(), so the wait itself is never cancelled.
func (executor *UnboundedExecutor) StopAndWaitForever() {
	executor.StopAndWait(context.Background())
}
  75. // StopAndWait cancel all goroutines started by this executor and wait.
  76. // Wait can be cancelled by the context passed in.
  77. func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  78. executor.cancel()
  79. for {
  80. fiveSeconds := time.NewTimer(time.Millisecond * 100)
  81. select {
  82. case <-fiveSeconds.C:
  83. case <-ctx.Done():
  84. return
  85. }
  86. if executor.checkGoroutines() {
  87. return
  88. }
  89. }
  90. }
  91. func (executor *UnboundedExecutor) checkGoroutines() bool {
  92. executor.activeGoroutinesMutex.Lock()
  93. defer executor.activeGoroutinesMutex.Unlock()
  94. for startFrom, count := range executor.activeGoroutines {
  95. if count > 0 {
  96. LogInfo("event!unbounded_executor.still waiting goroutines to quit",
  97. "startFrom", startFrom,
  98. "count", count)
  99. return false
  100. }
  101. }
  102. return true
  103. }