|
|
@@ -9,17 +9,23 @@ import (
|
|
|
"runtime/debug"
|
|
|
)
|
|
|
|
|
|
+// LogInfo logs informational message, for example
|
|
|
+// which goroutine is still alive preventing the executor shutdown
|
|
|
var LogInfo = func(event string, properties ...interface{}) {
|
|
|
}
|
|
|
|
|
|
+// LogPanic logs goroutine panic
|
|
|
var LogPanic = func(recovered interface{}, properties ...interface{}) interface{} {
|
|
|
fmt.Println(fmt.Sprintf("paniced: %v", recovered))
|
|
|
debug.PrintStack()
|
|
|
return recovered
|
|
|
}
|
|
|
|
|
|
+// StopSignal will not be recovered, will propagate to upper level goroutine
|
|
|
const StopSignal = "STOP!"
|
|
|
|
|
|
+// UnboundedExecutor is a executor without limits on counts of alive goroutines
|
|
|
+// it tracks the goroutine started by it, and can cancel them when shutdown
|
|
|
type UnboundedExecutor struct {
|
|
|
ctx context.Context
|
|
|
cancel context.CancelFunc
|
|
|
@@ -29,8 +35,12 @@ type UnboundedExecutor struct {
|
|
|
|
|
|
// GlobalUnboundedExecutor has the life cycle of the program itself
|
|
|
// any goroutine want to be shutdown before main exit can be started from this executor
|
|
|
+// GlobalUnboundedExecutor expects the main function to call stop
|
|
|
+// it does not magically knows the main function exits
|
|
|
var GlobalUnboundedExecutor = NewUnboundedExecutor()
|
|
|
|
|
|
+// NewUnboundedExecutor creates a new UnboundedExecutor,
|
|
|
+// UnboundedExecutor can not be created by &UnboundedExecutor{}
|
|
|
func NewUnboundedExecutor() *UnboundedExecutor {
|
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
|
return &UnboundedExecutor{
|
|
|
@@ -41,6 +51,8 @@ func NewUnboundedExecutor() *UnboundedExecutor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Go starts a new goroutine and tracks its lifecycle.
|
|
|
+// Panic will be recovered and logged automatically, except for StopSignal
|
|
|
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
|
_, file, line, _ := runtime.Caller(1)
|
|
|
executor.activeGoroutinesMutex.Lock()
|
|
|
@@ -61,14 +73,19 @@ func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
+// Stop cancel all goroutines started by this executor without wait
|
|
|
func (executor *UnboundedExecutor) Stop() {
|
|
|
executor.cancel()
|
|
|
}
|
|
|
|
|
|
+// Stop cancel all goroutines started by this executor and
|
|
|
+// wait until all goroutines exited
|
|
|
func (executor *UnboundedExecutor) StopAndWaitForever() {
|
|
|
executor.StopAndWait(context.Background())
|
|
|
}
|
|
|
|
|
|
+// Stop cancel all goroutines started by this executor and wait.
|
|
|
+// Wait can be cancelled by the context passed in.
|
|
|
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
|
|
|
executor.cancel()
|
|
|
for {
|