|
|
@@ -7,17 +7,13 @@ import (
|
|
|
"runtime/debug"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+ "reflect"
|
|
|
)
|
|
|
|
|
|
-// LogInfo logs informational message, for example
|
|
|
-// which goroutine is still alive preventing the executor shutdown
|
|
|
-var LogInfo = func(event string, properties ...interface{}) {
|
|
|
-}
|
|
|
-
|
|
|
// HandlePanic logs goroutine panic by default
|
|
|
-var HandlePanic = func(recovered interface{}, file string, line int) {
|
|
|
- fmt.Println(fmt.Sprintf("paniced: %v", recovered))
|
|
|
- debug.PrintStack()
|
|
|
+var HandlePanic = func(recovered interface{}, file string, line int, funcName string) {
|
|
|
+ ErrorLogger.Println(fmt.Sprintf("%s defined at %s:%v panic: %v", funcName, file, line, recovered))
|
|
|
+ ErrorLogger.Println(string(debug.Stack()))
|
|
|
}
|
|
|
|
|
|
// StopSignal will not be recovered, will propagate to upper level goroutine
|
|
|
@@ -30,6 +26,7 @@ type UnboundedExecutor struct {
|
|
|
cancel context.CancelFunc
|
|
|
activeGoroutinesMutex *sync.Mutex
|
|
|
activeGoroutines map[string]int
|
|
|
+ HandlePanic func(recovered interface{}, file string, line int, funcName string)
|
|
|
}
|
|
|
|
|
|
// GlobalUnboundedExecutor has the life cycle of the program itself
|
|
|
@@ -40,6 +37,7 @@ var GlobalUnboundedExecutor = NewUnboundedExecutor()
|
|
|
|
|
|
// NewUnboundedExecutor creates a new UnboundedExecutor,
|
|
|
// UnboundedExecutor can not be created by &UnboundedExecutor{}
|
|
|
+// HandlePanic can be set with a callback to override global HandlePanic
|
|
|
func NewUnboundedExecutor() *UnboundedExecutor {
|
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
|
return &UnboundedExecutor{
|
|
|
@@ -53,7 +51,10 @@ func NewUnboundedExecutor() *UnboundedExecutor {
|
|
|
// Go starts a new goroutine and tracks its lifecycle.
|
|
|
// Panic will be recovered and logged automatically, except for StopSignal
|
|
|
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
|
- _, file, line, _ := runtime.Caller(1)
|
|
|
+ pc := reflect.ValueOf(handler).Pointer()
|
|
|
+ f := runtime.FuncForPC(pc)
|
|
|
+ funcName := f.Name()
|
|
|
+ file, line := f.FileLine(pc)
|
|
|
executor.activeGoroutinesMutex.Lock()
|
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
|
startFrom := fmt.Sprintf("%s:%d", file, line)
|
|
|
@@ -62,7 +63,11 @@ func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
|
defer func() {
|
|
|
recovered := recover()
|
|
|
if recovered != nil && recovered != StopSignal {
|
|
|
- HandlePanic(recovered, file, line)
|
|
|
+ if executor.HandlePanic == nil {
|
|
|
+ HandlePanic(recovered, file, line, funcName)
|
|
|
+ } else {
|
|
|
+ executor.HandlePanic(recovered, file, line, funcName)
|
|
|
+ }
|
|
|
}
|
|
|
executor.activeGoroutinesMutex.Lock()
|
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
|
@@ -105,7 +110,7 @@ func (executor *UnboundedExecutor) checkGoroutines() bool {
|
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
|
for startFrom, count := range executor.activeGoroutines {
|
|
|
if count > 0 {
|
|
|
- LogInfo("event!unbounded_executor.still waiting goroutines to quit",
|
|
|
+ InfoLogger.Println("event!unbounded_executor.still waiting goroutines to quit",
|
|
|
"startFrom", startFrom,
|
|
|
"count", count)
|
|
|
return false
|