reader.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package proto
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "github.com/go-redis/redis/internal/util"
  7. )
  8. const (
  9. ErrorReply = '-'
  10. StatusReply = '+'
  11. IntReply = ':'
  12. StringReply = '$'
  13. ArrayReply = '*'
  14. )
  15. //------------------------------------------------------------------------------
  16. const Nil = RedisError("redis: nil")
  17. type RedisError string
  18. func (e RedisError) Error() string { return string(e) }
  19. //------------------------------------------------------------------------------
  20. type MultiBulkParse func(*Reader, int64) (interface{}, error)
  21. type Reader struct {
  22. rd *bufio.Reader
  23. _buf []byte
  24. }
  25. func NewReader(rd io.Reader) *Reader {
  26. return &Reader{
  27. rd: bufio.NewReader(rd),
  28. _buf: make([]byte, 64),
  29. }
  30. }
  31. func (r *Reader) Buffered() int {
  32. return r.rd.Buffered()
  33. }
  34. func (r *Reader) Peek(n int) ([]byte, error) {
  35. return r.rd.Peek(n)
  36. }
  37. func (r *Reader) Reset(rd io.Reader) {
  38. r.rd.Reset(rd)
  39. }
  40. func (r *Reader) ReadLine() ([]byte, error) {
  41. line, err := r.readLine()
  42. if err != nil {
  43. return nil, err
  44. }
  45. if isNilReply(line) {
  46. return nil, Nil
  47. }
  48. return line, nil
  49. }
  50. // readLine that returns an error if:
  51. // - there is a pending read error;
  52. // - or line does not end with \r\n.
  53. func (r *Reader) readLine() ([]byte, error) {
  54. b, err := r.rd.ReadSlice('\n')
  55. if err != nil {
  56. return nil, err
  57. }
  58. if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
  59. return nil, fmt.Errorf("redis: invalid reply: %q", b)
  60. }
  61. b = b[:len(b)-2]
  62. return b, nil
  63. }
  64. func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
  65. line, err := r.ReadLine()
  66. if err != nil {
  67. return nil, err
  68. }
  69. switch line[0] {
  70. case ErrorReply:
  71. return nil, ParseErrorReply(line)
  72. case StatusReply:
  73. return string(line[1:]), nil
  74. case IntReply:
  75. return util.ParseInt(line[1:], 10, 64)
  76. case StringReply:
  77. return r.readStringReply(line)
  78. case ArrayReply:
  79. n, err := parseArrayLen(line)
  80. if err != nil {
  81. return nil, err
  82. }
  83. if m == nil {
  84. err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line)
  85. return nil, err
  86. }
  87. return m(r, n)
  88. }
  89. return nil, fmt.Errorf("redis: can't parse %.100q", line)
  90. }
  91. func (r *Reader) ReadIntReply() (int64, error) {
  92. line, err := r.ReadLine()
  93. if err != nil {
  94. return 0, err
  95. }
  96. switch line[0] {
  97. case ErrorReply:
  98. return 0, ParseErrorReply(line)
  99. case IntReply:
  100. return util.ParseInt(line[1:], 10, 64)
  101. default:
  102. return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
  103. }
  104. }
  105. func (r *Reader) ReadString() (string, error) {
  106. line, err := r.ReadLine()
  107. if err != nil {
  108. return "", err
  109. }
  110. switch line[0] {
  111. case ErrorReply:
  112. return "", ParseErrorReply(line)
  113. case StringReply:
  114. return r.readStringReply(line)
  115. case StatusReply:
  116. return string(line[1:]), nil
  117. case IntReply:
  118. return string(line[1:]), nil
  119. default:
  120. return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
  121. }
  122. }
  123. func (r *Reader) readStringReply(line []byte) (string, error) {
  124. if isNilReply(line) {
  125. return "", Nil
  126. }
  127. replyLen, err := util.Atoi(line[1:])
  128. if err != nil {
  129. return "", err
  130. }
  131. b := make([]byte, replyLen+2)
  132. _, err = io.ReadFull(r.rd, b)
  133. if err != nil {
  134. return "", err
  135. }
  136. return util.BytesToString(b[:replyLen]), nil
  137. }
  138. func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
  139. line, err := r.ReadLine()
  140. if err != nil {
  141. return nil, err
  142. }
  143. switch line[0] {
  144. case ErrorReply:
  145. return nil, ParseErrorReply(line)
  146. case ArrayReply:
  147. n, err := parseArrayLen(line)
  148. if err != nil {
  149. return nil, err
  150. }
  151. return m(r, n)
  152. default:
  153. return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  154. }
  155. }
  156. func (r *Reader) ReadArrayLen() (int64, error) {
  157. line, err := r.ReadLine()
  158. if err != nil {
  159. return 0, err
  160. }
  161. switch line[0] {
  162. case ErrorReply:
  163. return 0, ParseErrorReply(line)
  164. case ArrayReply:
  165. return parseArrayLen(line)
  166. default:
  167. return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  168. }
  169. }
  170. func (r *Reader) ReadScanReply() ([]string, uint64, error) {
  171. n, err := r.ReadArrayLen()
  172. if err != nil {
  173. return nil, 0, err
  174. }
  175. if n != 2 {
  176. return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
  177. }
  178. cursor, err := r.ReadUint()
  179. if err != nil {
  180. return nil, 0, err
  181. }
  182. n, err = r.ReadArrayLen()
  183. if err != nil {
  184. return nil, 0, err
  185. }
  186. keys := make([]string, n)
  187. for i := int64(0); i < n; i++ {
  188. key, err := r.ReadString()
  189. if err != nil {
  190. return nil, 0, err
  191. }
  192. keys[i] = key
  193. }
  194. return keys, cursor, err
  195. }
  196. func (r *Reader) ReadInt() (int64, error) {
  197. b, err := r.readTmpBytesReply()
  198. if err != nil {
  199. return 0, err
  200. }
  201. return util.ParseInt(b, 10, 64)
  202. }
  203. func (r *Reader) ReadUint() (uint64, error) {
  204. b, err := r.readTmpBytesReply()
  205. if err != nil {
  206. return 0, err
  207. }
  208. return util.ParseUint(b, 10, 64)
  209. }
  210. func (r *Reader) ReadFloatReply() (float64, error) {
  211. b, err := r.readTmpBytesReply()
  212. if err != nil {
  213. return 0, err
  214. }
  215. return util.ParseFloat(b, 64)
  216. }
  217. func (r *Reader) readTmpBytesReply() ([]byte, error) {
  218. line, err := r.ReadLine()
  219. if err != nil {
  220. return nil, err
  221. }
  222. switch line[0] {
  223. case ErrorReply:
  224. return nil, ParseErrorReply(line)
  225. case StringReply:
  226. return r._readTmpBytesReply(line)
  227. case StatusReply:
  228. return line[1:], nil
  229. default:
  230. return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
  231. }
  232. }
  233. func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
  234. if isNilReply(line) {
  235. return nil, Nil
  236. }
  237. replyLen, err := util.Atoi(line[1:])
  238. if err != nil {
  239. return nil, err
  240. }
  241. buf := r.buf(replyLen + 2)
  242. _, err = io.ReadFull(r.rd, buf)
  243. if err != nil {
  244. return nil, err
  245. }
  246. return buf[:replyLen], nil
  247. }
  248. func (r *Reader) buf(n int) []byte {
  249. if n <= cap(r._buf) {
  250. return r._buf[:n]
  251. }
  252. d := n - cap(r._buf)
  253. r._buf = append(r._buf, make([]byte, d)...)
  254. return r._buf
  255. }
  256. func isNilReply(b []byte) bool {
  257. return len(b) == 3 &&
  258. (b[0] == StringReply || b[0] == ArrayReply) &&
  259. b[1] == '-' && b[2] == '1'
  260. }
  261. func ParseErrorReply(line []byte) error {
  262. return RedisError(string(line[1:]))
  263. }
  264. func parseArrayLen(line []byte) (int64, error) {
  265. if isNilReply(line) {
  266. return 0, Nil
  267. }
  268. return util.ParseInt(line[1:], 10, 64)
  269. }