store.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. package kv
  2. import (
  3. "errors"
  4. "log"
  5. "github.com/tal-tech/go-zero/core/errorx"
  6. "github.com/tal-tech/go-zero/core/hash"
  7. "github.com/tal-tech/go-zero/core/stores/cache"
  8. "github.com/tal-tech/go-zero/core/stores/redis"
  9. )
  10. // ErrNoRedisNode is an error that indicates no redis node.
  11. var ErrNoRedisNode = errors.New("no redis node")
  12. type (
  13. // Store interface represents a KV store.
  14. Store interface {
  15. Del(keys ...string) (int, error)
  16. Eval(script string, key string, args ...interface{}) (interface{}, error)
  17. Exists(key string) (bool, error)
  18. Expire(key string, seconds int) error
  19. Expireat(key string, expireTime int64) error
  20. Get(key string) (string, error)
  21. Hdel(key, field string) (bool, error)
  22. Hexists(key, field string) (bool, error)
  23. Hget(key, field string) (string, error)
  24. Hgetall(key string) (map[string]string, error)
  25. Hincrby(key, field string, increment int) (int, error)
  26. Hkeys(key string) ([]string, error)
  27. Hlen(key string) (int, error)
  28. Hmget(key string, fields ...string) ([]string, error)
  29. Hset(key, field, value string) error
  30. Hsetnx(key, field, value string) (bool, error)
  31. Hmset(key string, fieldsAndValues map[string]string) error
  32. Hvals(key string) ([]string, error)
  33. Incr(key string) (int64, error)
  34. Incrby(key string, increment int64) (int64, error)
  35. Llen(key string) (int, error)
  36. Lpop(key string) (string, error)
  37. Lpush(key string, values ...interface{}) (int, error)
  38. Lrange(key string, start int, stop int) ([]string, error)
  39. Lrem(key string, count int, value string) (int, error)
  40. Persist(key string) (bool, error)
  41. Pfadd(key string, values ...interface{}) (bool, error)
  42. Pfcount(key string) (int64, error)
  43. Rpush(key string, values ...interface{}) (int, error)
  44. Sadd(key string, values ...interface{}) (int, error)
  45. Scard(key string) (int64, error)
  46. Set(key string, value string) error
  47. Setex(key, value string, seconds int) error
  48. Setnx(key, value string) (bool, error)
  49. SetnxEx(key, value string, seconds int) (bool, error)
  50. Sismember(key string, value interface{}) (bool, error)
  51. Smembers(key string) ([]string, error)
  52. Spop(key string) (string, error)
  53. Srandmember(key string, count int) ([]string, error)
  54. Srem(key string, values ...interface{}) (int, error)
  55. Sscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error)
  56. Ttl(key string) (int, error)
  57. Zadd(key string, score int64, value string) (bool, error)
  58. Zadds(key string, ps ...redis.Pair) (int64, error)
  59. Zcard(key string) (int, error)
  60. Zcount(key string, start, stop int64) (int, error)
  61. Zincrby(key string, increment int64, field string) (int64, error)
  62. Zrange(key string, start, stop int64) ([]string, error)
  63. ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error)
  64. ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  65. ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  66. Zrank(key, field string) (int64, error)
  67. Zrem(key string, values ...interface{}) (int, error)
  68. Zremrangebyrank(key string, start, stop int64) (int, error)
  69. Zremrangebyscore(key string, start, stop int64) (int, error)
  70. Zrevrange(key string, start, stop int64) ([]string, error)
  71. ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  72. ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  73. Zscore(key string, value string) (int64, error)
  74. Zrevrank(key, field string) (int64, error)
  75. }
  76. clusterStore struct {
  77. dispatcher *hash.ConsistentHash
  78. }
  79. )
  80. // NewStore returns a Store.
  81. func NewStore(c KvConf) Store {
  82. if len(c) == 0 || cache.TotalWeights(c) <= 0 {
  83. log.Fatal("no cache nodes")
  84. }
  85. // even if only one node, we chose to use consistent hash,
  86. // because Store and redis.Redis has different methods.
  87. dispatcher := hash.NewConsistentHash()
  88. for _, node := range c {
  89. cn := node.NewRedis()
  90. dispatcher.AddWithWeight(cn, node.Weight)
  91. }
  92. return clusterStore{
  93. dispatcher: dispatcher,
  94. }
  95. }
  96. func (cs clusterStore) Del(keys ...string) (int, error) {
  97. var val int
  98. var be errorx.BatchError
  99. for _, key := range keys {
  100. node, e := cs.getRedis(key)
  101. if e != nil {
  102. be.Add(e)
  103. continue
  104. }
  105. if v, e := node.Del(key); e != nil {
  106. be.Add(e)
  107. } else {
  108. val += v
  109. }
  110. }
  111. return val, be.Err()
  112. }
  113. func (cs clusterStore) Eval(script string, key string, args ...interface{}) (interface{}, error) {
  114. node, err := cs.getRedis(key)
  115. if err != nil {
  116. return nil, err
  117. }
  118. return node.Eval(script, []string{key}, args...)
  119. }
  120. func (cs clusterStore) Exists(key string) (bool, error) {
  121. node, err := cs.getRedis(key)
  122. if err != nil {
  123. return false, err
  124. }
  125. return node.Exists(key)
  126. }
  127. func (cs clusterStore) Expire(key string, seconds int) error {
  128. node, err := cs.getRedis(key)
  129. if err != nil {
  130. return err
  131. }
  132. return node.Expire(key, seconds)
  133. }
  134. func (cs clusterStore) Expireat(key string, expireTime int64) error {
  135. node, err := cs.getRedis(key)
  136. if err != nil {
  137. return err
  138. }
  139. return node.Expireat(key, expireTime)
  140. }
  141. func (cs clusterStore) Get(key string) (string, error) {
  142. node, err := cs.getRedis(key)
  143. if err != nil {
  144. return "", err
  145. }
  146. return node.Get(key)
  147. }
  148. func (cs clusterStore) Hdel(key, field string) (bool, error) {
  149. node, err := cs.getRedis(key)
  150. if err != nil {
  151. return false, err
  152. }
  153. return node.Hdel(key, field)
  154. }
  155. func (cs clusterStore) Hexists(key, field string) (bool, error) {
  156. node, err := cs.getRedis(key)
  157. if err != nil {
  158. return false, err
  159. }
  160. return node.Hexists(key, field)
  161. }
  162. func (cs clusterStore) Hget(key, field string) (string, error) {
  163. node, err := cs.getRedis(key)
  164. if err != nil {
  165. return "", err
  166. }
  167. return node.Hget(key, field)
  168. }
  169. func (cs clusterStore) Hgetall(key string) (map[string]string, error) {
  170. node, err := cs.getRedis(key)
  171. if err != nil {
  172. return nil, err
  173. }
  174. return node.Hgetall(key)
  175. }
  176. func (cs clusterStore) Hincrby(key, field string, increment int) (int, error) {
  177. node, err := cs.getRedis(key)
  178. if err != nil {
  179. return 0, err
  180. }
  181. return node.Hincrby(key, field, increment)
  182. }
  183. func (cs clusterStore) Hkeys(key string) ([]string, error) {
  184. node, err := cs.getRedis(key)
  185. if err != nil {
  186. return nil, err
  187. }
  188. return node.Hkeys(key)
  189. }
  190. func (cs clusterStore) Hlen(key string) (int, error) {
  191. node, err := cs.getRedis(key)
  192. if err != nil {
  193. return 0, err
  194. }
  195. return node.Hlen(key)
  196. }
  197. func (cs clusterStore) Hmget(key string, fields ...string) ([]string, error) {
  198. node, err := cs.getRedis(key)
  199. if err != nil {
  200. return nil, err
  201. }
  202. return node.Hmget(key, fields...)
  203. }
  204. func (cs clusterStore) Hset(key, field, value string) error {
  205. node, err := cs.getRedis(key)
  206. if err != nil {
  207. return err
  208. }
  209. return node.Hset(key, field, value)
  210. }
  211. func (cs clusterStore) Hsetnx(key, field, value string) (bool, error) {
  212. node, err := cs.getRedis(key)
  213. if err != nil {
  214. return false, err
  215. }
  216. return node.Hsetnx(key, field, value)
  217. }
  218. func (cs clusterStore) Hmset(key string, fieldsAndValues map[string]string) error {
  219. node, err := cs.getRedis(key)
  220. if err != nil {
  221. return err
  222. }
  223. return node.Hmset(key, fieldsAndValues)
  224. }
  225. func (cs clusterStore) Hvals(key string) ([]string, error) {
  226. node, err := cs.getRedis(key)
  227. if err != nil {
  228. return nil, err
  229. }
  230. return node.Hvals(key)
  231. }
  232. func (cs clusterStore) Incr(key string) (int64, error) {
  233. node, err := cs.getRedis(key)
  234. if err != nil {
  235. return 0, err
  236. }
  237. return node.Incr(key)
  238. }
  239. func (cs clusterStore) Incrby(key string, increment int64) (int64, error) {
  240. node, err := cs.getRedis(key)
  241. if err != nil {
  242. return 0, err
  243. }
  244. return node.Incrby(key, increment)
  245. }
  246. func (cs clusterStore) Llen(key string) (int, error) {
  247. node, err := cs.getRedis(key)
  248. if err != nil {
  249. return 0, err
  250. }
  251. return node.Llen(key)
  252. }
  253. func (cs clusterStore) Lpop(key string) (string, error) {
  254. node, err := cs.getRedis(key)
  255. if err != nil {
  256. return "", err
  257. }
  258. return node.Lpop(key)
  259. }
  260. func (cs clusterStore) Lpush(key string, values ...interface{}) (int, error) {
  261. node, err := cs.getRedis(key)
  262. if err != nil {
  263. return 0, err
  264. }
  265. return node.Lpush(key, values...)
  266. }
  267. func (cs clusterStore) Lrange(key string, start int, stop int) ([]string, error) {
  268. node, err := cs.getRedis(key)
  269. if err != nil {
  270. return nil, err
  271. }
  272. return node.Lrange(key, start, stop)
  273. }
  274. func (cs clusterStore) Lrem(key string, count int, value string) (int, error) {
  275. node, err := cs.getRedis(key)
  276. if err != nil {
  277. return 0, err
  278. }
  279. return node.Lrem(key, count, value)
  280. }
  281. func (cs clusterStore) Persist(key string) (bool, error) {
  282. node, err := cs.getRedis(key)
  283. if err != nil {
  284. return false, err
  285. }
  286. return node.Persist(key)
  287. }
  288. func (cs clusterStore) Pfadd(key string, values ...interface{}) (bool, error) {
  289. node, err := cs.getRedis(key)
  290. if err != nil {
  291. return false, err
  292. }
  293. return node.Pfadd(key, values...)
  294. }
  295. func (cs clusterStore) Pfcount(key string) (int64, error) {
  296. node, err := cs.getRedis(key)
  297. if err != nil {
  298. return 0, err
  299. }
  300. return node.Pfcount(key)
  301. }
  302. func (cs clusterStore) Rpush(key string, values ...interface{}) (int, error) {
  303. node, err := cs.getRedis(key)
  304. if err != nil {
  305. return 0, err
  306. }
  307. return node.Rpush(key, values...)
  308. }
  309. func (cs clusterStore) Sadd(key string, values ...interface{}) (int, error) {
  310. node, err := cs.getRedis(key)
  311. if err != nil {
  312. return 0, err
  313. }
  314. return node.Sadd(key, values...)
  315. }
  316. func (cs clusterStore) Scard(key string) (int64, error) {
  317. node, err := cs.getRedis(key)
  318. if err != nil {
  319. return 0, err
  320. }
  321. return node.Scard(key)
  322. }
  323. func (cs clusterStore) Set(key string, value string) error {
  324. node, err := cs.getRedis(key)
  325. if err != nil {
  326. return err
  327. }
  328. return node.Set(key, value)
  329. }
  330. func (cs clusterStore) Setex(key, value string, seconds int) error {
  331. node, err := cs.getRedis(key)
  332. if err != nil {
  333. return err
  334. }
  335. return node.Setex(key, value, seconds)
  336. }
  337. func (cs clusterStore) Setnx(key, value string) (bool, error) {
  338. node, err := cs.getRedis(key)
  339. if err != nil {
  340. return false, err
  341. }
  342. return node.Setnx(key, value)
  343. }
  344. func (cs clusterStore) SetnxEx(key, value string, seconds int) (bool, error) {
  345. node, err := cs.getRedis(key)
  346. if err != nil {
  347. return false, err
  348. }
  349. return node.SetnxEx(key, value, seconds)
  350. }
  351. func (cs clusterStore) Sismember(key string, value interface{}) (bool, error) {
  352. node, err := cs.getRedis(key)
  353. if err != nil {
  354. return false, err
  355. }
  356. return node.Sismember(key, value)
  357. }
  358. func (cs clusterStore) Smembers(key string) ([]string, error) {
  359. node, err := cs.getRedis(key)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return node.Smembers(key)
  364. }
  365. func (cs clusterStore) Spop(key string) (string, error) {
  366. node, err := cs.getRedis(key)
  367. if err != nil {
  368. return "", err
  369. }
  370. return node.Spop(key)
  371. }
  372. func (cs clusterStore) Srandmember(key string, count int) ([]string, error) {
  373. node, err := cs.getRedis(key)
  374. if err != nil {
  375. return nil, err
  376. }
  377. return node.Srandmember(key, count)
  378. }
  379. func (cs clusterStore) Srem(key string, values ...interface{}) (int, error) {
  380. node, err := cs.getRedis(key)
  381. if err != nil {
  382. return 0, err
  383. }
  384. return node.Srem(key, values...)
  385. }
  386. func (cs clusterStore) Sscan(key string, cursor uint64, match string, count int64) (
  387. keys []string, cur uint64, err error) {
  388. node, err := cs.getRedis(key)
  389. if err != nil {
  390. return nil, 0, err
  391. }
  392. return node.Sscan(key, cursor, match, count)
  393. }
  394. func (cs clusterStore) Ttl(key string) (int, error) {
  395. node, err := cs.getRedis(key)
  396. if err != nil {
  397. return 0, err
  398. }
  399. return node.Ttl(key)
  400. }
  401. func (cs clusterStore) Zadd(key string, score int64, value string) (bool, error) {
  402. node, err := cs.getRedis(key)
  403. if err != nil {
  404. return false, err
  405. }
  406. return node.Zadd(key, score, value)
  407. }
  408. func (cs clusterStore) Zadds(key string, ps ...redis.Pair) (int64, error) {
  409. node, err := cs.getRedis(key)
  410. if err != nil {
  411. return 0, err
  412. }
  413. return node.Zadds(key, ps...)
  414. }
  415. func (cs clusterStore) Zcard(key string) (int, error) {
  416. node, err := cs.getRedis(key)
  417. if err != nil {
  418. return 0, err
  419. }
  420. return node.Zcard(key)
  421. }
  422. func (cs clusterStore) Zcount(key string, start, stop int64) (int, error) {
  423. node, err := cs.getRedis(key)
  424. if err != nil {
  425. return 0, err
  426. }
  427. return node.Zcount(key, start, stop)
  428. }
  429. func (cs clusterStore) Zincrby(key string, increment int64, field string) (int64, error) {
  430. node, err := cs.getRedis(key)
  431. if err != nil {
  432. return 0, err
  433. }
  434. return node.Zincrby(key, increment, field)
  435. }
  436. func (cs clusterStore) Zrank(key, field string) (int64, error) {
  437. node, err := cs.getRedis(key)
  438. if err != nil {
  439. return 0, err
  440. }
  441. return node.Zrank(key, field)
  442. }
  443. func (cs clusterStore) Zrange(key string, start, stop int64) ([]string, error) {
  444. node, err := cs.getRedis(key)
  445. if err != nil {
  446. return nil, err
  447. }
  448. return node.Zrange(key, start, stop)
  449. }
  450. func (cs clusterStore) ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  451. node, err := cs.getRedis(key)
  452. if err != nil {
  453. return nil, err
  454. }
  455. return node.ZrangeWithScores(key, start, stop)
  456. }
  457. func (cs clusterStore) ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  458. node, err := cs.getRedis(key)
  459. if err != nil {
  460. return nil, err
  461. }
  462. return node.ZrangebyscoreWithScores(key, start, stop)
  463. }
  464. func (cs clusterStore) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  465. []redis.Pair, error) {
  466. node, err := cs.getRedis(key)
  467. if err != nil {
  468. return nil, err
  469. }
  470. return node.ZrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  471. }
  472. func (cs clusterStore) Zrem(key string, values ...interface{}) (int, error) {
  473. node, err := cs.getRedis(key)
  474. if err != nil {
  475. return 0, err
  476. }
  477. return node.Zrem(key, values...)
  478. }
  479. func (cs clusterStore) Zremrangebyrank(key string, start, stop int64) (int, error) {
  480. node, err := cs.getRedis(key)
  481. if err != nil {
  482. return 0, err
  483. }
  484. return node.Zremrangebyrank(key, start, stop)
  485. }
  486. func (cs clusterStore) Zremrangebyscore(key string, start, stop int64) (int, error) {
  487. node, err := cs.getRedis(key)
  488. if err != nil {
  489. return 0, err
  490. }
  491. return node.Zremrangebyscore(key, start, stop)
  492. }
  493. func (cs clusterStore) Zrevrange(key string, start, stop int64) ([]string, error) {
  494. node, err := cs.getRedis(key)
  495. if err != nil {
  496. return nil, err
  497. }
  498. return node.Zrevrange(key, start, stop)
  499. }
  500. func (cs clusterStore) ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  501. node, err := cs.getRedis(key)
  502. if err != nil {
  503. return nil, err
  504. }
  505. return node.ZrevrangebyscoreWithScores(key, start, stop)
  506. }
  507. func (cs clusterStore) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  508. []redis.Pair, error) {
  509. node, err := cs.getRedis(key)
  510. if err != nil {
  511. return nil, err
  512. }
  513. return node.ZrevrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  514. }
  515. func (cs clusterStore) Zrevrank(key, field string) (int64, error) {
  516. node, err := cs.getRedis(key)
  517. if err != nil {
  518. return 0, err
  519. }
  520. return node.Zrevrank(key, field)
  521. }
  522. func (cs clusterStore) Zscore(key string, value string) (int64, error) {
  523. node, err := cs.getRedis(key)
  524. if err != nil {
  525. return 0, err
  526. }
  527. return node.Zscore(key, value)
  528. }
  529. func (cs clusterStore) getRedis(key string) (*redis.Redis, error) {
  530. val, ok := cs.dispatcher.Get(key)
  531. if !ok {
  532. return nil, ErrNoRedisNode
  533. }
  534. return val.(*redis.Redis), nil
  535. }