unbounded_executor.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package concurrent
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "sync"
  8. "time"
  9. )
  // LogInfo is the hook this package calls to report informational events,
  // for example which goroutines are still alive and holding up an executor
  // shutdown. The default implementation discards the event; assign another
  // function to forward events to a real logger.
  var LogInfo = func(event string, properties ...interface{}) {}
  // LogPanic is the hook used to report a panic recovered from a goroutine.
  // The default implementation prints the recovered value to standard output,
  // dumps the current stack via debug.PrintStack, and hands the recovered
  // value back to the caller. The extra properties are ignored by the default.
  var LogPanic = func(recovered interface{}, properties ...interface{}) interface{} {
  	fmt.Printf("paniced: %v\n", recovered)
  	debug.PrintStack()
  	return recovered
  }
  // StopSignal is a panic value that is treated specially by UnboundedExecutor.Go:
  // a goroutine that panics with StopSignal is still recovered, but the panic is
  // not passed to LogPanic, so the goroutine ends without a panic report.
  const StopSignal = "STOP!"
  // UnboundedExecutor is an executor that places no limit on the number of
  // live goroutines. It tracks the goroutines started through it and can
  // cancel them on shutdown.
  type UnboundedExecutor struct {
  	// ctx is passed to every handler started through Go.
  	ctx context.Context
  	// cancel cancels ctx; it is called by Stop and StopAndWait.
  	cancel context.CancelFunc
  	// activeGoroutinesMutex guards activeGoroutines.
  	activeGoroutinesMutex *sync.Mutex
  	// activeGoroutines counts running goroutines per "file:line" of the
  	// code that called Go.
  	activeGoroutines map[string]int
  }
  // GlobalUnboundedExecutor has the life cycle of the program itself.
  // Any goroutine that should be shut down before main exits can be started
  // from this executor.
  // GlobalUnboundedExecutor expects the main function to call one of its stop
  // methods; it has no way of knowing on its own that the main function exited.
  var GlobalUnboundedExecutor = NewUnboundedExecutor()
  35. // NewUnboundedExecutor creates a new UnboundedExecutor,
  36. // UnboundedExecutor can not be created by &UnboundedExecutor{}
  37. func NewUnboundedExecutor() *UnboundedExecutor {
  38. ctx, cancel := context.WithCancel(context.TODO())
  39. return &UnboundedExecutor{
  40. ctx: ctx,
  41. cancel: cancel,
  42. activeGoroutinesMutex: &sync.Mutex{},
  43. activeGoroutines: map[string]int{},
  44. }
  45. }
  46. // Go starts a new goroutine and tracks its lifecycle.
  47. // Panic will be recovered and logged automatically, except for StopSignal
  48. func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  49. _, file, line, _ := runtime.Caller(1)
  50. executor.activeGoroutinesMutex.Lock()
  51. defer executor.activeGoroutinesMutex.Unlock()
  52. startFrom := fmt.Sprintf("%s:%d", file, line)
  53. executor.activeGoroutines[startFrom] += 1
  54. go func() {
  55. defer func() {
  56. recovered := recover()
  57. if recovered != nil && recovered != StopSignal {
  58. LogPanic(recovered)
  59. }
  60. executor.activeGoroutinesMutex.Lock()
  61. defer executor.activeGoroutinesMutex.Unlock()
  62. executor.activeGoroutines[startFrom] -= 1
  63. }()
  64. handler(executor.ctx)
  65. }()
  66. }
  // Stop cancels the context shared by all goroutines started by this
  // executor and returns immediately, without waiting for them to exit.
  func (executor *UnboundedExecutor) Stop() {
  	executor.cancel()
  }
  71. // StopAndWaitForever cancel all goroutines started by this executor and
  72. // wait until all goroutines exited
  73. func (executor *UnboundedExecutor) StopAndWaitForever() {
  74. executor.StopAndWait(context.Background())
  75. }
  76. // StopAndWait cancel all goroutines started by this executor and wait.
  77. // Wait can be cancelled by the context passed in.
  78. func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  79. executor.cancel()
  80. for {
  81. fiveSeconds := time.NewTimer(time.Millisecond * 100)
  82. select {
  83. case <-fiveSeconds.C:
  84. case <-ctx.Done():
  85. return
  86. }
  87. if executor.checkGoroutines() {
  88. return
  89. }
  90. }
  91. }
  92. func (executor *UnboundedExecutor) checkGoroutines() bool {
  93. executor.activeGoroutinesMutex.Lock()
  94. defer executor.activeGoroutinesMutex.Unlock()
  95. for startFrom, count := range executor.activeGoroutines {
  96. if count > 0 {
  97. LogInfo("event!unbounded_executor.still waiting goroutines to quit",
  98. "startFrom", startFrom,
  99. "count", count)
  100. return false
  101. }
  102. }
  103. return true
  104. }