unbounded_executor.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package concurrent
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "sync"
  8. "time"
  9. "reflect"
  10. )
  11. // HandlePanic logs goroutine panic by default
  12. var HandlePanic = func(recovered interface{}, funcName string) {
  13. ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
  14. ErrorLogger.Println(string(debug.Stack()))
  15. }
  16. // StopSignal will not be recovered, will propagate to upper level goroutine
  17. const StopSignal = "STOP!"
  18. // UnboundedExecutor is a executor without limits on counts of alive goroutines
  19. // it tracks the goroutine started by it, and can cancel them when shutdown
  20. type UnboundedExecutor struct {
  21. ctx context.Context
  22. cancel context.CancelFunc
  23. activeGoroutinesMutex *sync.Mutex
  24. activeGoroutines map[string]int
  25. HandlePanic func(recovered interface{}, funcName string)
  26. }
  27. // GlobalUnboundedExecutor has the life cycle of the program itself
  28. // any goroutine want to be shutdown before main exit can be started from this executor
  29. // GlobalUnboundedExecutor expects the main function to call stop
  30. // it does not magically knows the main function exits
  31. var GlobalUnboundedExecutor = NewUnboundedExecutor()
  32. // NewUnboundedExecutor creates a new UnboundedExecutor,
  33. // UnboundedExecutor can not be created by &UnboundedExecutor{}
  34. // HandlePanic can be set with a callback to override global HandlePanic
  35. func NewUnboundedExecutor() *UnboundedExecutor {
  36. ctx, cancel := context.WithCancel(context.TODO())
  37. return &UnboundedExecutor{
  38. ctx: ctx,
  39. cancel: cancel,
  40. activeGoroutinesMutex: &sync.Mutex{},
  41. activeGoroutines: map[string]int{},
  42. }
  43. }
  44. // Go starts a new goroutine and tracks its lifecycle.
  45. // Panic will be recovered and logged automatically, except for StopSignal
  46. func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  47. pc := reflect.ValueOf(handler).Pointer()
  48. f := runtime.FuncForPC(pc)
  49. funcName := f.Name()
  50. file, line := f.FileLine(pc)
  51. executor.activeGoroutinesMutex.Lock()
  52. defer executor.activeGoroutinesMutex.Unlock()
  53. startFrom := fmt.Sprintf("%s:%d", file, line)
  54. executor.activeGoroutines[startFrom] += 1
  55. go func() {
  56. defer func() {
  57. recovered := recover()
  58. if recovered != nil && recovered != StopSignal {
  59. if executor.HandlePanic == nil {
  60. HandlePanic(recovered, funcName)
  61. } else {
  62. executor.HandlePanic(recovered, funcName)
  63. }
  64. }
  65. executor.activeGoroutinesMutex.Lock()
  66. defer executor.activeGoroutinesMutex.Unlock()
  67. executor.activeGoroutines[startFrom] -= 1
  68. }()
  69. handler(executor.ctx)
  70. }()
  71. }
  72. // Stop cancel all goroutines started by this executor without wait
  73. func (executor *UnboundedExecutor) Stop() {
  74. executor.cancel()
  75. }
  76. // StopAndWaitForever cancel all goroutines started by this executor and
  77. // wait until all goroutines exited
  78. func (executor *UnboundedExecutor) StopAndWaitForever() {
  79. executor.StopAndWait(context.Background())
  80. }
  81. // StopAndWait cancel all goroutines started by this executor and wait.
  82. // Wait can be cancelled by the context passed in.
  83. func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  84. executor.cancel()
  85. for {
  86. fiveSeconds := time.NewTimer(time.Millisecond * 100)
  87. select {
  88. case <-fiveSeconds.C:
  89. case <-ctx.Done():
  90. return
  91. }
  92. if executor.checkGoroutines() {
  93. return
  94. }
  95. }
  96. }
  97. func (executor *UnboundedExecutor) checkGoroutines() bool {
  98. executor.activeGoroutinesMutex.Lock()
  99. defer executor.activeGoroutinesMutex.Unlock()
  100. for startFrom, count := range executor.activeGoroutines {
  101. if count > 0 {
  102. InfoLogger.Println("event!unbounded_executor.still waiting goroutines to quit",
  103. "startFrom", startFrom,
  104. "count", count)
  105. return false
  106. }
  107. }
  108. return true
  109. }