redis.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. package redis
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/go-redis/redis/internal"
  8. "github.com/go-redis/redis/internal/pool"
  9. "github.com/go-redis/redis/internal/proto"
  10. )
  11. // Nil reply returned by Redis when key does not exist.
  12. const Nil = proto.Nil
  13. func SetLogger(logger *log.Logger) {
  14. internal.Logger = logger
  15. }
  16. //------------------------------------------------------------------------------
  17. type Hook interface {
  18. BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
  19. AfterProcess(ctx context.Context, cmd Cmder) error
  20. BeforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error)
  21. AfterProcessPipeline(ctx context.Context, cmds []Cmder) error
  22. }
  23. type hooks struct {
  24. hooks []Hook
  25. }
  26. func (hs *hooks) Lock() {
  27. hs.hooks = hs.hooks[:len(hs.hooks):len(hs.hooks)]
  28. }
  29. func (hs hooks) Clone() hooks {
  30. clone := hs
  31. clone.Lock()
  32. return clone
  33. }
  34. func (hs *hooks) AddHook(hook Hook) {
  35. hs.hooks = append(hs.hooks, hook)
  36. }
  37. func (hs hooks) process(
  38. ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
  39. ) error {
  40. ctx, err := hs.beforeProcess(ctx, cmd)
  41. if err != nil {
  42. cmd.SetErr(err)
  43. return err
  44. }
  45. cmdErr := fn(ctx, cmd)
  46. if err := hs.afterProcess(ctx, cmd); err != nil {
  47. cmd.SetErr(err)
  48. return err
  49. }
  50. return cmdErr
  51. }
  52. func (hs hooks) beforeProcess(ctx context.Context, cmd Cmder) (context.Context, error) {
  53. for _, h := range hs.hooks {
  54. var err error
  55. ctx, err = h.BeforeProcess(ctx, cmd)
  56. if err != nil {
  57. return nil, err
  58. }
  59. }
  60. return ctx, nil
  61. }
  62. func (hs hooks) afterProcess(ctx context.Context, cmd Cmder) error {
  63. var firstErr error
  64. for _, h := range hs.hooks {
  65. err := h.AfterProcess(ctx, cmd)
  66. if err != nil && firstErr == nil {
  67. firstErr = err
  68. }
  69. }
  70. return firstErr
  71. }
  72. func (hs hooks) processPipeline(
  73. ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
  74. ) error {
  75. ctx, err := hs.beforeProcessPipeline(ctx, cmds)
  76. if err != nil {
  77. setCmdsErr(cmds, err)
  78. return err
  79. }
  80. cmdsErr := fn(ctx, cmds)
  81. if err := hs.afterProcessPipeline(ctx, cmds); err != nil {
  82. setCmdsErr(cmds, err)
  83. return err
  84. }
  85. return cmdsErr
  86. }
  87. func (hs hooks) beforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error) {
  88. for _, h := range hs.hooks {
  89. var err error
  90. ctx, err = h.BeforeProcessPipeline(ctx, cmds)
  91. if err != nil {
  92. return nil, err
  93. }
  94. }
  95. return ctx, nil
  96. }
  97. func (hs hooks) afterProcessPipeline(ctx context.Context, cmds []Cmder) error {
  98. var firstErr error
  99. for _, h := range hs.hooks {
  100. err := h.AfterProcessPipeline(ctx, cmds)
  101. if err != nil && firstErr == nil {
  102. firstErr = err
  103. }
  104. }
  105. return firstErr
  106. }
  107. //------------------------------------------------------------------------------
  108. type baseClient struct {
  109. opt *Options
  110. connPool pool.Pooler
  111. onClose func() error // hook called when client is closed
  112. }
  113. func newBaseClient(opt *Options, connPool pool.Pooler) *baseClient {
  114. return &baseClient{
  115. opt: opt,
  116. connPool: connPool,
  117. }
  118. }
  119. func (c *baseClient) clone() *baseClient {
  120. clone := *c
  121. return &clone
  122. }
  123. func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
  124. opt := c.opt.clone()
  125. opt.ReadTimeout = timeout
  126. opt.WriteTimeout = timeout
  127. clone := c.clone()
  128. clone.opt = opt
  129. return clone
  130. }
  131. func (c *baseClient) String() string {
  132. return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
  133. }
  134. func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
  135. cn, err := c.connPool.NewConn(ctx)
  136. if err != nil {
  137. return nil, err
  138. }
  139. err = c.initConn(ctx, cn)
  140. if err != nil {
  141. _ = c.connPool.CloseConn(cn)
  142. return nil, err
  143. }
  144. return cn, nil
  145. }
  146. func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
  147. if c.opt.Limiter != nil {
  148. err := c.opt.Limiter.Allow()
  149. if err != nil {
  150. return nil, err
  151. }
  152. }
  153. cn, err := c._getConn(ctx)
  154. if err != nil {
  155. if c.opt.Limiter != nil {
  156. c.opt.Limiter.ReportResult(err)
  157. }
  158. return nil, err
  159. }
  160. return cn, nil
  161. }
  162. func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
  163. cn, err := c.connPool.Get(ctx)
  164. if err != nil {
  165. return nil, err
  166. }
  167. err = c.initConn(ctx, cn)
  168. if err != nil {
  169. c.connPool.Remove(cn, err)
  170. if err := internal.Unwrap(err); err != nil {
  171. return nil, err
  172. }
  173. return nil, err
  174. }
  175. return cn, nil
  176. }
  177. func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
  178. if cn.Inited {
  179. return nil
  180. }
  181. cn.Inited = true
  182. if c.opt.Password == "" &&
  183. c.opt.DB == 0 &&
  184. !c.opt.readOnly &&
  185. c.opt.OnConnect == nil {
  186. return nil
  187. }
  188. connPool := pool.NewSingleConnPool(nil)
  189. connPool.SetConn(cn)
  190. conn := newConn(ctx, c.opt, connPool)
  191. _, err := conn.Pipelined(func(pipe Pipeliner) error {
  192. if c.opt.Password != "" {
  193. pipe.Auth(c.opt.Password)
  194. }
  195. if c.opt.DB > 0 {
  196. pipe.Select(c.opt.DB)
  197. }
  198. if c.opt.readOnly {
  199. pipe.ReadOnly()
  200. }
  201. return nil
  202. })
  203. if err != nil {
  204. return err
  205. }
  206. if c.opt.OnConnect != nil {
  207. return c.opt.OnConnect(conn)
  208. }
  209. return nil
  210. }
  211. func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
  212. if c.opt.Limiter != nil {
  213. c.opt.Limiter.ReportResult(err)
  214. }
  215. if isBadConn(err, false) {
  216. c.connPool.Remove(cn, err)
  217. } else {
  218. c.connPool.Put(cn)
  219. }
  220. }
  221. func (c *baseClient) withConn(
  222. ctx context.Context, fn func(context.Context, *pool.Conn) error,
  223. ) error {
  224. cn, err := c.getConn(ctx)
  225. if err != nil {
  226. return err
  227. }
  228. defer func() {
  229. c.releaseConn(cn, err)
  230. }()
  231. err = fn(ctx, cn)
  232. return err
  233. }
  234. func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
  235. err := c._process(ctx, cmd)
  236. if err != nil {
  237. cmd.SetErr(err)
  238. return err
  239. }
  240. return nil
  241. }
  242. func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
  243. var lastErr error
  244. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  245. if attempt > 0 {
  246. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  247. return err
  248. }
  249. }
  250. retryTimeout := true
  251. lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
  252. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  253. return writeCmd(wr, cmd)
  254. })
  255. if err != nil {
  256. return err
  257. }
  258. err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
  259. if err != nil {
  260. retryTimeout = cmd.readTimeout() == nil
  261. return err
  262. }
  263. return nil
  264. })
  265. if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
  266. return lastErr
  267. }
  268. }
  269. return lastErr
  270. }
  271. func (c *baseClient) retryBackoff(attempt int) time.Duration {
  272. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  273. }
  274. func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
  275. if timeout := cmd.readTimeout(); timeout != nil {
  276. t := *timeout
  277. if t == 0 {
  278. return 0
  279. }
  280. return t + 10*time.Second
  281. }
  282. return c.opt.ReadTimeout
  283. }
  284. // Close closes the client, releasing any open resources.
  285. //
  286. // It is rare to Close a Client, as the Client is meant to be
  287. // long-lived and shared between many goroutines.
  288. func (c *baseClient) Close() error {
  289. var firstErr error
  290. if c.onClose != nil {
  291. if err := c.onClose(); err != nil {
  292. firstErr = err
  293. }
  294. }
  295. if err := c.connPool.Close(); err != nil && firstErr == nil {
  296. firstErr = err
  297. }
  298. return firstErr
  299. }
  300. func (c *baseClient) getAddr() string {
  301. return c.opt.Addr
  302. }
  303. func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
  304. return c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds)
  305. }
  306. func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  307. return c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds)
  308. }
  309. type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
  310. func (c *baseClient) generalProcessPipeline(
  311. ctx context.Context, cmds []Cmder, p pipelineProcessor,
  312. ) error {
  313. err := c._generalProcessPipeline(ctx, cmds, p)
  314. if err != nil {
  315. setCmdsErr(cmds, err)
  316. return err
  317. }
  318. return cmdsFirstErr(cmds)
  319. }
  320. func (c *baseClient) _generalProcessPipeline(
  321. ctx context.Context, cmds []Cmder, p pipelineProcessor,
  322. ) error {
  323. var lastErr error
  324. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  325. if attempt > 0 {
  326. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  327. return err
  328. }
  329. }
  330. var canRetry bool
  331. lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
  332. var err error
  333. canRetry, err = p(ctx, cn, cmds)
  334. return err
  335. })
  336. if lastErr == nil || !canRetry || !isRetryableError(lastErr, true) {
  337. return lastErr
  338. }
  339. }
  340. return lastErr
  341. }
  342. func (c *baseClient) pipelineProcessCmds(
  343. ctx context.Context, cn *pool.Conn, cmds []Cmder,
  344. ) (bool, error) {
  345. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  346. return writeCmd(wr, cmds...)
  347. })
  348. if err != nil {
  349. return true, err
  350. }
  351. err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
  352. return pipelineReadCmds(rd, cmds)
  353. })
  354. return true, err
  355. }
  356. func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
  357. for _, cmd := range cmds {
  358. err := cmd.readReply(rd)
  359. if err != nil && !isRedisError(err) {
  360. return err
  361. }
  362. }
  363. return nil
  364. }
  365. func (c *baseClient) txPipelineProcessCmds(
  366. ctx context.Context, cn *pool.Conn, cmds []Cmder,
  367. ) (bool, error) {
  368. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  369. return txPipelineWriteMulti(wr, cmds)
  370. })
  371. if err != nil {
  372. return true, err
  373. }
  374. err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
  375. err := txPipelineReadQueued(rd, cmds)
  376. if err != nil {
  377. return err
  378. }
  379. return pipelineReadCmds(rd, cmds)
  380. })
  381. return false, err
  382. }
  383. func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
  384. multiExec := make([]Cmder, 0, len(cmds)+2)
  385. multiExec = append(multiExec, NewStatusCmd("MULTI"))
  386. multiExec = append(multiExec, cmds...)
  387. multiExec = append(multiExec, NewSliceCmd("EXEC"))
  388. return writeCmd(wr, multiExec...)
  389. }
  390. func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
  391. // Parse queued replies.
  392. var statusCmd StatusCmd
  393. err := statusCmd.readReply(rd)
  394. if err != nil {
  395. return err
  396. }
  397. for range cmds {
  398. err = statusCmd.readReply(rd)
  399. if err != nil && !isRedisError(err) {
  400. return err
  401. }
  402. }
  403. // Parse number of replies.
  404. line, err := rd.ReadLine()
  405. if err != nil {
  406. if err == Nil {
  407. err = TxFailedErr
  408. }
  409. return err
  410. }
  411. switch line[0] {
  412. case proto.ErrorReply:
  413. return proto.ParseErrorReply(line)
  414. case proto.ArrayReply:
  415. // ok
  416. default:
  417. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  418. return err
  419. }
  420. return nil
  421. }
  422. //------------------------------------------------------------------------------
  423. // Client is a Redis client representing a pool of zero or more
  424. // underlying connections. It's safe for concurrent use by multiple
  425. // goroutines.
  426. type Client struct {
  427. *baseClient
  428. cmdable
  429. hooks
  430. ctx context.Context
  431. }
  432. // NewClient returns a client to the Redis Server specified by Options.
  433. func NewClient(opt *Options) *Client {
  434. opt.init()
  435. c := Client{
  436. baseClient: newBaseClient(opt, newConnPool(opt)),
  437. ctx: context.Background(),
  438. }
  439. c.cmdable = c.Process
  440. return &c
  441. }
  442. func (c *Client) clone() *Client {
  443. clone := *c
  444. clone.cmdable = clone.Process
  445. clone.hooks.Lock()
  446. return &clone
  447. }
  448. func (c *Client) WithTimeout(timeout time.Duration) *Client {
  449. clone := c.clone()
  450. clone.baseClient = c.baseClient.withTimeout(timeout)
  451. return clone
  452. }
  453. func (c *Client) Context() context.Context {
  454. return c.ctx
  455. }
  456. func (c *Client) WithContext(ctx context.Context) *Client {
  457. if ctx == nil {
  458. panic("nil context")
  459. }
  460. clone := c.clone()
  461. clone.ctx = ctx
  462. return clone
  463. }
  464. func (c *Client) Conn() *Conn {
  465. return newConn(c.ctx, c.opt, pool.NewSingleConnPool(c.connPool))
  466. }
  467. // Do creates a Cmd from the args and processes the cmd.
  468. func (c *Client) Do(args ...interface{}) *Cmd {
  469. return c.DoContext(c.ctx, args...)
  470. }
  471. func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd {
  472. cmd := NewCmd(args...)
  473. _ = c.ProcessContext(ctx, cmd)
  474. return cmd
  475. }
  476. func (c *Client) Process(cmd Cmder) error {
  477. return c.ProcessContext(c.ctx, cmd)
  478. }
  479. func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error {
  480. return c.hooks.process(ctx, cmd, c.baseClient.process)
  481. }
  482. func (c *Client) processPipeline(ctx context.Context, cmds []Cmder) error {
  483. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
  484. }
  485. func (c *Client) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  486. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
  487. }
  488. // Options returns read-only Options that were used to create the client.
  489. func (c *Client) Options() *Options {
  490. return c.opt
  491. }
  492. type PoolStats pool.Stats
  493. // PoolStats returns connection pool stats.
  494. func (c *Client) PoolStats() *PoolStats {
  495. stats := c.connPool.Stats()
  496. return (*PoolStats)(stats)
  497. }
  498. func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  499. return c.Pipeline().Pipelined(fn)
  500. }
  501. func (c *Client) Pipeline() Pipeliner {
  502. pipe := Pipeline{
  503. ctx: c.ctx,
  504. exec: c.processPipeline,
  505. }
  506. pipe.init()
  507. return &pipe
  508. }
  509. func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  510. return c.TxPipeline().Pipelined(fn)
  511. }
  512. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  513. func (c *Client) TxPipeline() Pipeliner {
  514. pipe := Pipeline{
  515. ctx: c.ctx,
  516. exec: c.processTxPipeline,
  517. }
  518. pipe.init()
  519. return &pipe
  520. }
  521. func (c *Client) pubSub() *PubSub {
  522. pubsub := &PubSub{
  523. opt: c.opt,
  524. newConn: func(channels []string) (*pool.Conn, error) {
  525. return c.newConn(context.TODO())
  526. },
  527. closeConn: c.connPool.CloseConn,
  528. }
  529. pubsub.init()
  530. return pubsub
  531. }
  532. // Subscribe subscribes the client to the specified channels.
  533. // Channels can be omitted to create empty subscription.
  534. // Note that this method does not wait on a response from Redis, so the
  535. // subscription may not be active immediately. To force the connection to wait,
  536. // you may call the Receive() method on the returned *PubSub like so:
  537. //
  538. // sub := client.Subscribe(queryResp)
  539. // iface, err := sub.Receive()
  540. // if err != nil {
  541. // // handle error
  542. // }
  543. //
  544. // // Should be *Subscription, but others are possible if other actions have been
  545. // // taken on sub since it was created.
  546. // switch iface.(type) {
  547. // case *Subscription:
  548. // // subscribe succeeded
  549. // case *Message:
  550. // // received first message
  551. // case *Pong:
  552. // // pong received
  553. // default:
  554. // // handle error
  555. // }
  556. //
  557. // ch := sub.Channel()
  558. func (c *Client) Subscribe(channels ...string) *PubSub {
  559. pubsub := c.pubSub()
  560. if len(channels) > 0 {
  561. _ = pubsub.Subscribe(channels...)
  562. }
  563. return pubsub
  564. }
  565. // PSubscribe subscribes the client to the given patterns.
  566. // Patterns can be omitted to create empty subscription.
  567. func (c *Client) PSubscribe(channels ...string) *PubSub {
  568. pubsub := c.pubSub()
  569. if len(channels) > 0 {
  570. _ = pubsub.PSubscribe(channels...)
  571. }
  572. return pubsub
  573. }
  574. //------------------------------------------------------------------------------
  575. type conn struct {
  576. baseClient
  577. cmdable
  578. statefulCmdable
  579. }
  580. // Conn is like Client, but its pool contains single connection.
  581. type Conn struct {
  582. *conn
  583. ctx context.Context
  584. }
  585. func newConn(ctx context.Context, opt *Options, connPool pool.Pooler) *Conn {
  586. c := Conn{
  587. conn: &conn{
  588. baseClient: baseClient{
  589. opt: opt,
  590. connPool: connPool,
  591. },
  592. },
  593. ctx: ctx,
  594. }
  595. c.cmdable = c.Process
  596. c.statefulCmdable = c.Process
  597. return &c
  598. }
  599. func (c *Conn) Process(cmd Cmder) error {
  600. return c.ProcessContext(c.ctx, cmd)
  601. }
  602. func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
  603. return c.baseClient.process(ctx, cmd)
  604. }
  605. func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  606. return c.Pipeline().Pipelined(fn)
  607. }
  608. func (c *Conn) Pipeline() Pipeliner {
  609. pipe := Pipeline{
  610. ctx: c.ctx,
  611. exec: c.processPipeline,
  612. }
  613. pipe.init()
  614. return &pipe
  615. }
  616. func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  617. return c.TxPipeline().Pipelined(fn)
  618. }
  619. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  620. func (c *Conn) TxPipeline() Pipeliner {
  621. pipe := Pipeline{
  622. ctx: c.ctx,
  623. exec: c.processTxPipeline,
  624. }
  625. pipe.init()
  626. return &pipe
  627. }