redisblockingnode.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package redis
  2. import (
  3. "fmt"
  4. red "github.com/go-redis/redis"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. )
  7. // ClosableNode interface represents a closable redis node.
  8. type ClosableNode interface {
  9. RedisNode
  10. Close()
  11. }
  12. // CreateBlockingNode returns a ClosableNode.
  13. func CreateBlockingNode(r *Redis) (ClosableNode, error) {
  14. timeout := readWriteTimeout + blockingQueryTimeout
  15. switch r.Type {
  16. case NodeType:
  17. client := red.NewClient(&red.Options{
  18. Addr: r.Addr,
  19. Password: r.Pass,
  20. DB: defaultDatabase,
  21. MaxRetries: maxRetries,
  22. PoolSize: 1,
  23. MinIdleConns: 1,
  24. ReadTimeout: timeout,
  25. })
  26. return &clientBridge{client}, nil
  27. case ClusterType:
  28. client := red.NewClusterClient(&red.ClusterOptions{
  29. Addrs: []string{r.Addr},
  30. Password: r.Pass,
  31. MaxRetries: maxRetries,
  32. PoolSize: 1,
  33. MinIdleConns: 1,
  34. ReadTimeout: timeout,
  35. })
  36. return &clusterBridge{client}, nil
  37. default:
  38. return nil, fmt.Errorf("unknown redis type: %s", r.Type)
  39. }
  40. }
  41. type (
  42. clientBridge struct {
  43. *red.Client
  44. }
  45. clusterBridge struct {
  46. *red.ClusterClient
  47. }
  48. )
  49. func (bridge *clientBridge) Close() {
  50. if err := bridge.Client.Close(); err != nil {
  51. logx.Errorf("Error occurred on close redis client: %s", err)
  52. }
  53. }
  54. func (bridge *clusterBridge) Close() {
  55. if err := bridge.ClusterClient.Close(); err != nil {
  56. logx.Errorf("Error occurred on close redis cluster: %s", err)
  57. }
  58. }