redis.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709
  1. package redis
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/2637309949/dolphin/packages/redis/internal"
  8. "github.com/2637309949/dolphin/packages/redis/internal/pool"
  9. "github.com/2637309949/dolphin/packages/redis/internal/proto"
  10. )
  11. // Nil reply returned by Redis when key does not exist.
  12. const Nil = proto.Nil
  13. func SetLogger(logger *log.Logger) {
  14. internal.Logger = logger
  15. }
  16. //------------------------------------------------------------------------------
  17. type Hook interface {
  18. BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
  19. AfterProcess(ctx context.Context, cmd Cmder) error
  20. BeforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error)
  21. AfterProcessPipeline(ctx context.Context, cmds []Cmder) error
  22. }
  23. type hooks struct {
  24. hooks []Hook
  25. }
  26. func (hs hooks) Lock() {
  27. hs.hooks = hs.hooks[:len(hs.hooks):len(hs.hooks)]
  28. }
  29. func (hs *hooks) AddHook(hook Hook) {
  30. hs.hooks = append(hs.hooks, hook)
  31. }
  32. func (hs hooks) process(
  33. ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
  34. ) error {
  35. ctx, err := hs.beforeProcess(ctx, cmd)
  36. if err != nil {
  37. return err
  38. }
  39. cmdErr := fn(ctx, cmd)
  40. err = hs.afterProcess(ctx, cmd)
  41. if err != nil {
  42. return err
  43. }
  44. return cmdErr
  45. }
  46. func (hs hooks) beforeProcess(ctx context.Context, cmd Cmder) (context.Context, error) {
  47. for _, h := range hs.hooks {
  48. var err error
  49. ctx, err = h.BeforeProcess(ctx, cmd)
  50. if err != nil {
  51. return nil, err
  52. }
  53. }
  54. return ctx, nil
  55. }
  56. func (hs hooks) afterProcess(ctx context.Context, cmd Cmder) error {
  57. var firstErr error
  58. for _, h := range hs.hooks {
  59. err := h.AfterProcess(ctx, cmd)
  60. if err != nil && firstErr == nil {
  61. firstErr = err
  62. }
  63. }
  64. return firstErr
  65. }
  66. func (hs hooks) processPipeline(
  67. ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
  68. ) error {
  69. ctx, err := hs.beforeProcessPipeline(ctx, cmds)
  70. if err != nil {
  71. return err
  72. }
  73. cmdsErr := fn(ctx, cmds)
  74. err = hs.afterProcessPipeline(ctx, cmds)
  75. if err != nil {
  76. return err
  77. }
  78. return cmdsErr
  79. }
  80. func (hs hooks) beforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error) {
  81. for _, h := range hs.hooks {
  82. var err error
  83. ctx, err = h.BeforeProcessPipeline(ctx, cmds)
  84. if err != nil {
  85. return nil, err
  86. }
  87. }
  88. return ctx, nil
  89. }
  90. func (hs hooks) afterProcessPipeline(ctx context.Context, cmds []Cmder) error {
  91. var firstErr error
  92. for _, h := range hs.hooks {
  93. err := h.AfterProcessPipeline(ctx, cmds)
  94. if err != nil && firstErr == nil {
  95. firstErr = err
  96. }
  97. }
  98. return firstErr
  99. }
  100. //------------------------------------------------------------------------------
  101. type baseClient struct {
  102. opt *Options
  103. connPool pool.Pooler
  104. limiter Limiter
  105. onClose func() error // hook called when client is closed
  106. }
  107. func (c *baseClient) String() string {
  108. return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
  109. }
  110. func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
  111. cn, err := c.connPool.NewConn(ctx)
  112. if err != nil {
  113. return nil, err
  114. }
  115. err = c.initConn(ctx, cn)
  116. if err != nil {
  117. _ = c.connPool.CloseConn(cn)
  118. return nil, err
  119. }
  120. return cn, nil
  121. }
  122. func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
  123. if c.limiter != nil {
  124. err := c.limiter.Allow()
  125. if err != nil {
  126. return nil, err
  127. }
  128. }
  129. cn, err := c._getConn(ctx)
  130. if err != nil {
  131. if c.limiter != nil {
  132. c.limiter.ReportResult(err)
  133. }
  134. return nil, err
  135. }
  136. return cn, nil
  137. }
  138. func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
  139. cn, err := c.connPool.Get(ctx)
  140. if err != nil {
  141. return nil, err
  142. }
  143. err = c.initConn(ctx, cn)
  144. if err != nil {
  145. c.connPool.Remove(cn, err)
  146. if err := internal.Unwrap(err); err != nil {
  147. return nil, err
  148. }
  149. return nil, err
  150. }
  151. return cn, nil
  152. }
  153. func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
  154. if cn.Inited {
  155. return nil
  156. }
  157. cn.Inited = true
  158. if c.opt.Password == "" &&
  159. c.opt.DB == 0 &&
  160. !c.opt.readOnly &&
  161. c.opt.OnConnect == nil {
  162. return nil
  163. }
  164. connPool := pool.NewSingleConnPool(nil)
  165. connPool.SetConn(cn)
  166. conn := newConn(ctx, c.opt, connPool)
  167. _, err := conn.Pipelined(func(pipe Pipeliner) error {
  168. if c.opt.Password != "" {
  169. pipe.Auth(c.opt.Password)
  170. }
  171. if c.opt.DB > 0 {
  172. pipe.Select(c.opt.DB)
  173. }
  174. if c.opt.readOnly {
  175. pipe.ReadOnly()
  176. }
  177. return nil
  178. })
  179. if err != nil {
  180. return err
  181. }
  182. if c.opt.OnConnect != nil {
  183. return c.opt.OnConnect(conn)
  184. }
  185. return nil
  186. }
  187. func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
  188. if c.limiter != nil {
  189. c.limiter.ReportResult(err)
  190. }
  191. if isBadConn(err, false) {
  192. c.connPool.Remove(cn, err)
  193. } else {
  194. c.connPool.Put(cn)
  195. }
  196. }
  197. func (c *baseClient) withConn(
  198. ctx context.Context, fn func(context.Context, *pool.Conn) error,
  199. ) error {
  200. cn, err := c.getConn(ctx)
  201. if err != nil {
  202. return err
  203. }
  204. defer func() {
  205. c.releaseConn(cn, err)
  206. }()
  207. err = fn(ctx, cn)
  208. return err
  209. }
  210. func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
  211. err := c._process(ctx, cmd)
  212. if err != nil {
  213. cmd.setErr(err)
  214. return err
  215. }
  216. return nil
  217. }
  218. func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
  219. var lastErr error
  220. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  221. if attempt > 0 {
  222. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  223. return err
  224. }
  225. }
  226. retryTimeout := true
  227. lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
  228. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  229. return writeCmd(wr, cmd)
  230. })
  231. if err != nil {
  232. return err
  233. }
  234. err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
  235. if err != nil {
  236. retryTimeout = cmd.readTimeout() == nil
  237. return err
  238. }
  239. return nil
  240. })
  241. if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
  242. return lastErr
  243. }
  244. }
  245. return lastErr
  246. }
  247. func (c *baseClient) retryBackoff(attempt int) time.Duration {
  248. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  249. }
  250. func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
  251. if timeout := cmd.readTimeout(); timeout != nil {
  252. t := *timeout
  253. if t == 0 {
  254. return 0
  255. }
  256. return t + 10*time.Second
  257. }
  258. return c.opt.ReadTimeout
  259. }
  260. // Close closes the client, releasing any open resources.
  261. //
  262. // It is rare to Close a Client, as the Client is meant to be
  263. // long-lived and shared between many goroutines.
  264. func (c *baseClient) Close() error {
  265. var firstErr error
  266. if c.onClose != nil {
  267. if err := c.onClose(); err != nil {
  268. firstErr = err
  269. }
  270. }
  271. if err := c.connPool.Close(); err != nil && firstErr == nil {
  272. firstErr = err
  273. }
  274. return firstErr
  275. }
  276. func (c *baseClient) getAddr() string {
  277. return c.opt.Addr
  278. }
  279. func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
  280. return c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds)
  281. }
  282. func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  283. return c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds)
  284. }
  285. type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
  286. func (c *baseClient) generalProcessPipeline(
  287. ctx context.Context, cmds []Cmder, p pipelineProcessor,
  288. ) error {
  289. err := c._generalProcessPipeline(ctx, cmds, p)
  290. if err != nil {
  291. setCmdsErr(cmds, err)
  292. return err
  293. }
  294. return cmdsFirstErr(cmds)
  295. }
  296. func (c *baseClient) _generalProcessPipeline(
  297. ctx context.Context, cmds []Cmder, p pipelineProcessor,
  298. ) error {
  299. var lastErr error
  300. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  301. if attempt > 0 {
  302. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  303. return err
  304. }
  305. }
  306. var canRetry bool
  307. lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
  308. var err error
  309. canRetry, err = p(ctx, cn, cmds)
  310. return err
  311. })
  312. if lastErr == nil || !canRetry || !isRetryableError(lastErr, true) {
  313. return lastErr
  314. }
  315. }
  316. return lastErr
  317. }
  318. func (c *baseClient) pipelineProcessCmds(
  319. ctx context.Context, cn *pool.Conn, cmds []Cmder,
  320. ) (bool, error) {
  321. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  322. return writeCmd(wr, cmds...)
  323. })
  324. if err != nil {
  325. return true, err
  326. }
  327. err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
  328. return pipelineReadCmds(rd, cmds)
  329. })
  330. return true, err
  331. }
  332. func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
  333. for _, cmd := range cmds {
  334. err := cmd.readReply(rd)
  335. if err != nil && !isRedisError(err) {
  336. return err
  337. }
  338. }
  339. return nil
  340. }
  341. func (c *baseClient) txPipelineProcessCmds(
  342. ctx context.Context, cn *pool.Conn, cmds []Cmder,
  343. ) (bool, error) {
  344. err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
  345. return txPipelineWriteMulti(wr, cmds)
  346. })
  347. if err != nil {
  348. return true, err
  349. }
  350. err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
  351. err := txPipelineReadQueued(rd, cmds)
  352. if err != nil {
  353. return err
  354. }
  355. return pipelineReadCmds(rd, cmds)
  356. })
  357. return false, err
  358. }
  359. func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
  360. multiExec := make([]Cmder, 0, len(cmds)+2)
  361. multiExec = append(multiExec, NewStatusCmd("MULTI"))
  362. multiExec = append(multiExec, cmds...)
  363. multiExec = append(multiExec, NewSliceCmd("EXEC"))
  364. return writeCmd(wr, multiExec...)
  365. }
  366. func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
  367. // Parse queued replies.
  368. var statusCmd StatusCmd
  369. err := statusCmd.readReply(rd)
  370. if err != nil {
  371. return err
  372. }
  373. for range cmds {
  374. err = statusCmd.readReply(rd)
  375. if err != nil && !isRedisError(err) {
  376. return err
  377. }
  378. }
  379. // Parse number of replies.
  380. line, err := rd.ReadLine()
  381. if err != nil {
  382. if err == Nil {
  383. err = TxFailedErr
  384. }
  385. return err
  386. }
  387. switch line[0] {
  388. case proto.ErrorReply:
  389. return proto.ParseErrorReply(line)
  390. case proto.ArrayReply:
  391. // ok
  392. default:
  393. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  394. return err
  395. }
  396. return nil
  397. }
  398. //------------------------------------------------------------------------------
  399. // Client is a Redis client representing a pool of zero or more
  400. // underlying connections. It's safe for concurrent use by multiple
  401. // goroutines.
  402. type Client struct {
  403. baseClient
  404. cmdable
  405. hooks
  406. ctx context.Context
  407. }
  408. // NewClient returns a client to the Redis Server specified by Options.
  409. func NewClient(opt *Options) *Client {
  410. opt.init()
  411. c := Client{
  412. baseClient: baseClient{
  413. opt: opt,
  414. connPool: newConnPool(opt),
  415. },
  416. ctx: context.Background(),
  417. }
  418. c.cmdable = c.Process
  419. return &c
  420. }
  421. func (c *Client) Context() context.Context {
  422. return c.ctx
  423. }
  424. func (c *Client) WithContext(ctx context.Context) *Client {
  425. if ctx == nil {
  426. panic("nil context")
  427. }
  428. clone := *c
  429. clone.cmdable = clone.Process
  430. clone.hooks.Lock()
  431. clone.ctx = ctx
  432. return &clone
  433. }
  434. func (c *Client) Conn() *Conn {
  435. return newConn(c.ctx, c.opt, pool.NewSingleConnPool(c.connPool))
  436. }
  437. // Do creates a Cmd from the args and processes the cmd.
  438. func (c *Client) Do(args ...interface{}) *Cmd {
  439. return c.DoContext(c.ctx, args...)
  440. }
  441. func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd {
  442. cmd := NewCmd(args...)
  443. _ = c.ProcessContext(ctx, cmd)
  444. return cmd
  445. }
  446. func (c *Client) Process(cmd Cmder) error {
  447. return c.ProcessContext(c.ctx, cmd)
  448. }
  449. func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error {
  450. return c.hooks.process(ctx, cmd, c.baseClient.process)
  451. }
  452. func (c *Client) processPipeline(ctx context.Context, cmds []Cmder) error {
  453. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
  454. }
  455. func (c *Client) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  456. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
  457. }
  458. // Options returns read-only Options that were used to create the client.
  459. func (c *Client) Options() *Options {
  460. return c.opt
  461. }
  462. func (c *Client) SetLimiter(l Limiter) *Client {
  463. c.limiter = l
  464. return c
  465. }
  466. type PoolStats pool.Stats
  467. // PoolStats returns connection pool stats.
  468. func (c *Client) PoolStats() *PoolStats {
  469. stats := c.connPool.Stats()
  470. return (*PoolStats)(stats)
  471. }
  472. func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  473. return c.Pipeline().Pipelined(fn)
  474. }
  475. func (c *Client) Pipeline() Pipeliner {
  476. pipe := Pipeline{
  477. ctx: c.ctx,
  478. exec: c.processPipeline,
  479. }
  480. pipe.init()
  481. return &pipe
  482. }
  483. func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  484. return c.TxPipeline().Pipelined(fn)
  485. }
  486. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  487. func (c *Client) TxPipeline() Pipeliner {
  488. pipe := Pipeline{
  489. ctx: c.ctx,
  490. exec: c.processTxPipeline,
  491. }
  492. pipe.init()
  493. return &pipe
  494. }
  495. func (c *Client) pubSub() *PubSub {
  496. pubsub := &PubSub{
  497. opt: c.opt,
  498. newConn: func(channels []string) (*pool.Conn, error) {
  499. return c.newConn(context.TODO())
  500. },
  501. closeConn: c.connPool.CloseConn,
  502. }
  503. pubsub.init()
  504. return pubsub
  505. }
  506. // Subscribe subscribes the client to the specified channels.
  507. // Channels can be omitted to create empty subscription.
  508. // Note that this method does not wait on a response from Redis, so the
  509. // subscription may not be active immediately. To force the connection to wait,
  510. // you may call the Receive() method on the returned *PubSub like so:
  511. //
  512. // sub := client.Subscribe(queryResp)
  513. // iface, err := sub.Receive()
  514. // if err != nil {
  515. // // handle error
  516. // }
  517. //
  518. // // Should be *Subscription, but others are possible if other actions have been
  519. // // taken on sub since it was created.
  520. // switch iface.(type) {
  521. // case *Subscription:
  522. // // subscribe succeeded
  523. // case *Message:
  524. // // received first message
  525. // case *Pong:
  526. // // pong received
  527. // default:
  528. // // handle error
  529. // }
  530. //
  531. // ch := sub.Channel()
  532. func (c *Client) Subscribe(channels ...string) *PubSub {
  533. pubsub := c.pubSub()
  534. if len(channels) > 0 {
  535. _ = pubsub.Subscribe(channels...)
  536. }
  537. return pubsub
  538. }
  539. // PSubscribe subscribes the client to the given patterns.
  540. // Patterns can be omitted to create empty subscription.
  541. func (c *Client) PSubscribe(channels ...string) *PubSub {
  542. pubsub := c.pubSub()
  543. if len(channels) > 0 {
  544. _ = pubsub.PSubscribe(channels...)
  545. }
  546. return pubsub
  547. }
  548. //------------------------------------------------------------------------------
  549. type conn struct {
  550. baseClient
  551. cmdable
  552. statefulCmdable
  553. }
  554. // Conn is like Client, but its pool contains single connection.
  555. type Conn struct {
  556. *conn
  557. ctx context.Context
  558. }
  559. func newConn(ctx context.Context, opt *Options, connPool pool.Pooler) *Conn {
  560. c := Conn{
  561. conn: &conn{
  562. baseClient: baseClient{
  563. opt: opt,
  564. connPool: connPool,
  565. },
  566. },
  567. ctx: ctx,
  568. }
  569. c.cmdable = c.Process
  570. c.statefulCmdable = c.Process
  571. return &c
  572. }
  573. func (c *Conn) Process(cmd Cmder) error {
  574. return c.ProcessContext(c.ctx, cmd)
  575. }
  576. func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
  577. return c.baseClient.process(ctx, cmd)
  578. }
  579. func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  580. return c.Pipeline().Pipelined(fn)
  581. }
  582. func (c *Conn) Pipeline() Pipeliner {
  583. pipe := Pipeline{
  584. ctx: c.ctx,
  585. exec: c.processPipeline,
  586. }
  587. pipe.init()
  588. return &pipe
  589. }
  590. func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  591. return c.TxPipeline().Pipelined(fn)
  592. }
  593. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  594. func (c *Conn) TxPipeline() Pipeliner {
  595. pipe := Pipeline{
  596. ctx: c.ctx,
  597. exec: c.processTxPipeline,
  598. }
  599. pipe.init()
  600. return &pipe
  601. }