leak.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "runtime"
  7. "runtime/pprof"
  8. "time"
  9. "github.com/tal-tech/go-zero/core/lang"
  10. "github.com/tal-tech/go-zero/core/logx"
  11. "github.com/tal-tech/go-zero/core/mr"
  12. "github.com/tal-tech/go-zero/core/proc"
  13. )
  14. func dumpGoroutines() {
  15. dumpFile := "goroutines.dump"
  16. logx.Infof("Got dump goroutine signal, printing goroutine profile to %s", dumpFile)
  17. if f, err := os.Create(dumpFile); err != nil {
  18. logx.Errorf("Failed to dump goroutine profile, error: %v", err)
  19. } else {
  20. defer f.Close()
  21. pprof.Lookup("goroutine").WriteTo(f, 2)
  22. }
  23. }
  24. func main() {
  25. profiler := proc.StartProfile()
  26. defer profiler.Stop()
  27. done := make(chan lang.PlaceholderType)
  28. go func() {
  29. for {
  30. time.Sleep(time.Second)
  31. fmt.Println(runtime.NumGoroutine())
  32. }
  33. }()
  34. go func() {
  35. time.Sleep(time.Minute)
  36. dumpGoroutines()
  37. close(done)
  38. }()
  39. for {
  40. select {
  41. case <-done:
  42. return
  43. default:
  44. mr.MapReduce(func(source chan<- interface{}) {
  45. for i := 0; i < 100; i++ {
  46. source <- i
  47. }
  48. }, func(item interface{}, writer mr.Writer, cancel func(error)) {
  49. if item.(int) == 40 {
  50. cancel(errors.New("any"))
  51. return
  52. }
  53. writer.Write(item)
  54. }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
  55. list := make([]int, 0)
  56. for p := range pipe {
  57. list = append(list, p.(int))
  58. }
  59. writer.Write(list)
  60. })
  61. }
  62. }
  63. }