sessionmanager.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package mongo
  2. import (
  3. "io"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. "github.com/tal-tech/go-zero/core/syncx"
  8. )
  // defaultConcurrency is the capacity handed to syncx.NewTimeoutLimit when a
  // new concurrentSession is created.
  const defaultConcurrency = 50

  // defaultTimeout is the borrow timeout takeSession starts from before any
  // Option is applied.
  const defaultTimeout = time.Second
  13. var sessionManager = syncx.NewResourceManager()
  14. type concurrentSession struct {
  15. *mgo.Session
  16. limit syncx.TimeoutLimit
  17. }
  18. func (cs *concurrentSession) Close() error {
  19. cs.Session.Close()
  20. return nil
  21. }
  22. func getConcurrentSession(url string) (*concurrentSession, error) {
  23. val, err := sessionManager.GetResource(url, func() (io.Closer, error) {
  24. mgoSession, err := mgo.Dial(url)
  25. if err != nil {
  26. return nil, err
  27. }
  28. concurrentSess := &concurrentSession{
  29. Session: mgoSession,
  30. limit: syncx.NewTimeoutLimit(defaultConcurrency),
  31. }
  32. return concurrentSess, nil
  33. })
  34. if err != nil {
  35. return nil, err
  36. }
  37. return val.(*concurrentSession), nil
  38. }
  39. func (cs *concurrentSession) putSession(session *mgo.Session) {
  40. if err := cs.limit.Return(); err != nil {
  41. logx.Error(err)
  42. }
  43. // anyway, we need to close the session
  44. session.Close()
  45. }
  46. func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  47. o := &options{
  48. timeout: defaultTimeout,
  49. }
  50. for _, opt := range opts {
  51. opt(o)
  52. }
  53. if err := cs.limit.Borrow(o.timeout); err != nil {
  54. return nil, err
  55. }
  56. return cs.Copy(), nil
  57. }