conn.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package pool
  2. import (
  3. "context"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. "github.com/go-redis/redis/internal/proto"
  8. )
  9. var noDeadline = time.Time{}
  10. type Conn struct {
  11. netConn net.Conn
  12. rd *proto.Reader
  13. wr *proto.Writer
  14. Inited bool
  15. pooled bool
  16. createdAt time.Time
  17. usedAt int64 // atomic
  18. }
  19. func NewConn(netConn net.Conn) *Conn {
  20. cn := &Conn{
  21. netConn: netConn,
  22. createdAt: time.Now(),
  23. }
  24. cn.rd = proto.NewReader(netConn)
  25. cn.wr = proto.NewWriter(netConn)
  26. cn.SetUsedAt(time.Now())
  27. return cn
  28. }
  29. func (cn *Conn) UsedAt() time.Time {
  30. unix := atomic.LoadInt64(&cn.usedAt)
  31. return time.Unix(unix, 0)
  32. }
  33. func (cn *Conn) SetUsedAt(tm time.Time) {
  34. atomic.StoreInt64(&cn.usedAt, tm.Unix())
  35. }
  36. func (cn *Conn) SetNetConn(netConn net.Conn) {
  37. cn.netConn = netConn
  38. cn.rd.Reset(netConn)
  39. cn.wr.Reset(netConn)
  40. }
  41. func (cn *Conn) Write(b []byte) (int, error) {
  42. return cn.netConn.Write(b)
  43. }
  44. func (cn *Conn) RemoteAddr() net.Addr {
  45. return cn.netConn.RemoteAddr()
  46. }
  47. func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
  48. err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout))
  49. if err != nil {
  50. return err
  51. }
  52. return fn(cn.rd)
  53. }
  54. func (cn *Conn) WithWriter(
  55. ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
  56. ) error {
  57. err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout))
  58. if err != nil {
  59. return err
  60. }
  61. if cn.wr.Buffered() > 0 {
  62. cn.wr.Reset(cn.netConn)
  63. }
  64. err = fn(cn.wr)
  65. if err != nil {
  66. return err
  67. }
  68. return cn.wr.Flush()
  69. }
  70. func (cn *Conn) Close() error {
  71. return cn.netConn.Close()
  72. }
  73. func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
  74. tm := time.Now()
  75. cn.SetUsedAt(tm)
  76. if timeout > 0 {
  77. tm = tm.Add(timeout)
  78. }
  79. if ctx != nil {
  80. deadline, ok := ctx.Deadline()
  81. if ok {
  82. if timeout == 0 {
  83. return deadline
  84. }
  85. if deadline.Before(tm) {
  86. return deadline
  87. }
  88. return tm
  89. }
  90. }
  91. if timeout > 0 {
  92. return tm
  93. }
  94. return noDeadline
  95. }