frame.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagTrace uint8 = 2
  35. flagPageSize uint8 = 4
  36. flagPageState uint8 = 8
  37. flagHasMore uint8 = 2
  38. errServer = 0x0000
  39. errProtocol = 0x000A
  40. errCredentials = 0x0100
  41. errUnavailable = 0x1000
  42. errOverloaded = 0x1001
  43. errBootstrapping = 0x1002
  44. errTruncate = 0x1003
  45. errWriteTimeout = 0x1100
  46. errReadTimeout = 0x1200
  47. errSyntax = 0x2000
  48. errUnauthorized = 0x2100
  49. errInvalid = 0x2200
  50. errConfig = 0x2300
  51. errAlreadyExists = 0x2400
  52. errUnprepared = 0x2500
  53. headerSize = 8
  54. )
// frame is a byte slice holding one protocol frame. The write* methods append
// encoded values to its end (via grow), the read* methods consume values from
// its front by reslicing, and setHeader/setLength fill in its first
// headerSize bytes.
type frame []byte
  56. func (f *frame) writeInt(v int32) {
  57. p := f.grow(4)
  58. (*f)[p] = byte(v >> 24)
  59. (*f)[p+1] = byte(v >> 16)
  60. (*f)[p+2] = byte(v >> 8)
  61. (*f)[p+3] = byte(v)
  62. }
  63. func (f *frame) writeShort(v uint16) {
  64. p := f.grow(2)
  65. (*f)[p] = byte(v >> 8)
  66. (*f)[p+1] = byte(v)
  67. }
  68. func (f *frame) writeString(v string) {
  69. f.writeShort(uint16(len(v)))
  70. p := f.grow(len(v))
  71. copy((*f)[p:], v)
  72. }
  73. func (f *frame) writeLongString(v string) {
  74. f.writeInt(int32(len(v)))
  75. p := f.grow(len(v))
  76. copy((*f)[p:], v)
  77. }
// writeUUID is currently an empty stub: it accepts no value and leaves the
// frame unchanged.
func (f *frame) writeUUID() {
}
  80. func (f *frame) writeStringList(v []string) {
  81. f.writeShort(uint16(len(v)))
  82. for i := range v {
  83. f.writeString(v[i])
  84. }
  85. }
  86. func (f *frame) writeByte(v byte) {
  87. p := f.grow(1)
  88. (*f)[p] = v
  89. }
  90. func (f *frame) writeBytes(v []byte) {
  91. if v == nil {
  92. f.writeInt(-1)
  93. return
  94. }
  95. f.writeInt(int32(len(v)))
  96. p := f.grow(len(v))
  97. copy((*f)[p:], v)
  98. }
  99. func (f *frame) writeShortBytes(v []byte) {
  100. f.writeShort(uint16(len(v)))
  101. p := f.grow(len(v))
  102. copy((*f)[p:], v)
  103. }
  104. func (f *frame) writeInet(ip net.IP, port int) {
  105. p := f.grow(1 + len(ip))
  106. (*f)[p] = byte(len(ip))
  107. copy((*f)[p+1:], ip)
  108. f.writeInt(int32(port))
  109. }
  110. func (f *frame) writeStringMap(v map[string]string) {
  111. f.writeShort(uint16(len(v)))
  112. for key, value := range v {
  113. f.writeString(key)
  114. f.writeString(value)
  115. }
  116. }
  117. func (f *frame) writeStringMultimap(v map[string][]string) {
  118. f.writeShort(uint16(len(v)))
  119. for key, values := range v {
  120. f.writeString(key)
  121. f.writeStringList(values)
  122. }
  123. }
  124. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  125. (*f)[0] = version
  126. (*f)[1] = flags
  127. (*f)[2] = stream
  128. (*f)[3] = opcode
  129. }
  130. func (f *frame) setLength(length int) {
  131. (*f)[4] = byte(length >> 24)
  132. (*f)[5] = byte(length >> 16)
  133. (*f)[6] = byte(length >> 8)
  134. (*f)[7] = byte(length)
  135. }
  136. func (f *frame) Length() int {
  137. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  138. }
  139. func (f *frame) grow(n int) int {
  140. if len(*f)+n >= cap(*f) {
  141. buf := make(frame, len(*f), len(*f)*2+n)
  142. copy(buf, *f)
  143. *f = buf
  144. }
  145. p := len(*f)
  146. *f = (*f)[:p+n]
  147. return p
  148. }
  149. func (f *frame) skipHeader() {
  150. *f = (*f)[headerSize:]
  151. }
  152. func (f *frame) readInt() int {
  153. if len(*f) < 4 {
  154. panic(ErrProtocol)
  155. }
  156. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  157. *f = (*f)[4:]
  158. return int(int32(v))
  159. }
  160. func (f *frame) readShort() uint16 {
  161. if len(*f) < 2 {
  162. panic(ErrProtocol)
  163. }
  164. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  165. *f = (*f)[2:]
  166. return v
  167. }
  168. func (f *frame) readString() string {
  169. n := int(f.readShort())
  170. if len(*f) < n {
  171. panic(ErrProtocol)
  172. }
  173. v := string((*f)[:n])
  174. *f = (*f)[n:]
  175. return v
  176. }
  177. func (f *frame) readLongString() string {
  178. n := f.readInt()
  179. if len(*f) < n {
  180. panic(ErrProtocol)
  181. }
  182. v := string((*f)[:n])
  183. *f = (*f)[n:]
  184. return v
  185. }
  186. func (f *frame) readBytes() []byte {
  187. n := f.readInt()
  188. if n < 0 {
  189. return nil
  190. }
  191. if len(*f) < n {
  192. panic(ErrProtocol)
  193. }
  194. v := (*f)[:n]
  195. *f = (*f)[n:]
  196. return v
  197. }
  198. func (f *frame) readShortBytes() []byte {
  199. n := int(f.readShort())
  200. if len(*f) < n {
  201. panic(ErrProtocol)
  202. }
  203. v := (*f)[:n]
  204. *f = (*f)[n:]
  205. return v
  206. }
  207. func (f *frame) readTypeInfo() *TypeInfo {
  208. x := f.readShort()
  209. typ := &TypeInfo{Type: Type(x)}
  210. switch typ.Type {
  211. case TypeCustom:
  212. typ.Custom = f.readString()
  213. case TypeMap:
  214. typ.Key = f.readTypeInfo()
  215. fallthrough
  216. case TypeList, TypeSet:
  217. typ.Elem = f.readTypeInfo()
  218. }
  219. return typ
  220. }
  221. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  222. flags := f.readInt()
  223. numColumns := f.readInt()
  224. var pageState []byte
  225. if flags&2 != 0 {
  226. pageState = f.readBytes()
  227. }
  228. globalKeyspace := ""
  229. globalTable := ""
  230. if flags&1 != 0 {
  231. globalKeyspace = f.readString()
  232. globalTable = f.readString()
  233. }
  234. columns := make([]ColumnInfo, numColumns)
  235. for i := 0; i < numColumns; i++ {
  236. columns[i].Keyspace = globalKeyspace
  237. columns[i].Table = globalTable
  238. if flags&1 == 0 {
  239. columns[i].Keyspace = f.readString()
  240. columns[i].Table = f.readString()
  241. }
  242. columns[i].Name = f.readString()
  243. columns[i].TypeInfo = f.readTypeInfo()
  244. }
  245. return columns, pageState
  246. }
  247. func (f *frame) writeConsistency(c Consistency) {
  248. f.writeShort(consistencyCodes[c])
  249. }
// encodeFrame returns the receiver unchanged with a nil error; version and
// dst are ignored. Its signature matches the encodeFrame method of the
// operation interface.
func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
	return f, nil
}
// consistencyCodes maps a Consistency value, used as the slice index, to the
// two-byte code that writeConsistency puts on the wire.
var consistencyCodes = []uint16{
	Any:         0x0000,
	One:         0x0001,
	Two:         0x0002,
	Three:       0x0003,
	Quorum:      0x0004,
	All:         0x0005,
	LocalQuorum: 0x0006,
	EachQuorum:  0x0007,
	Serial:      0x0008,
	LocalSerial: 0x0009,
}
  265. type readyFrame struct{}
  266. type supportedFrame struct{}
  267. type resultVoidFrame struct{}
  268. type resultRowsFrame struct {
  269. Columns []ColumnInfo
  270. Rows [][][]byte
  271. PagingState []byte
  272. }
  273. type resultKeyspaceFrame struct {
  274. Keyspace string
  275. }
  276. type resultPreparedFrame struct {
  277. PreparedId []byte
  278. Values []ColumnInfo
  279. }
// errorFrame carries a numeric code and a message.
// NOTE(review): presumably the decoded form of an opError response, with Code
// being one of the err* constants; confirm against the decoder.
type errorFrame struct {
	Code    int
	Message string
}

// Error returns the message only (the code is not included), which makes
// errorFrame usable as an error value.
func (e errorFrame) Error() string {
	return e.Message
}
// operation is anything that can be serialized into a frame. In this file it
// is satisfied by frame itself and by pointers to startupFrame, queryFrame,
// prepareFrame and optionsFrame.
type operation interface {
	// encodeFrame serializes the operation for the given protocol version
	// and returns the resulting frame. The request types in this file write
	// into dst, allocating a new frame when dst is nil.
	encodeFrame(version uint8, dst frame) (frame, error)
}
  290. type startupFrame struct {
  291. CQLVersion string
  292. Compression string
  293. }
  294. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  295. if f == nil {
  296. f = make(frame, headerSize, defaultFrameSize)
  297. }
  298. f.setHeader(version, 0, 0, opStartup)
  299. f.writeShort(1)
  300. f.writeString("CQL_VERSION")
  301. f.writeString(op.CQLVersion)
  302. if op.Compression != "" {
  303. f[headerSize+1] += 1
  304. f.writeString("COMPRESSION")
  305. f.writeString(op.Compression)
  306. }
  307. return f, nil
  308. }
  309. type queryFrame struct {
  310. Stmt string
  311. Prepared []byte
  312. Cons Consistency
  313. Values [][]byte
  314. PageSize int
  315. PageState []byte
  316. }
  317. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  318. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  319. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  320. return nil, ErrUnsupported
  321. }
  322. if f == nil {
  323. f = make(frame, headerSize, defaultFrameSize)
  324. }
  325. if len(op.Prepared) > 0 {
  326. f.setHeader(version, 0, 0, opExecute)
  327. f.writeShortBytes(op.Prepared)
  328. } else {
  329. f.setHeader(version, 0, 0, opQuery)
  330. f.writeLongString(op.Stmt)
  331. }
  332. if version >= 2 {
  333. f.writeConsistency(op.Cons)
  334. flagPos := len(f)
  335. f.writeByte(0)
  336. if len(op.Values) > 0 {
  337. f[flagPos] |= flagQueryValues
  338. f.writeShort(uint16(len(op.Values)))
  339. for _, value := range op.Values {
  340. f.writeBytes(value)
  341. }
  342. }
  343. if op.PageSize > 0 {
  344. f[flagPos] |= flagPageSize
  345. f.writeInt(int32(op.PageSize))
  346. }
  347. if len(op.PageState) > 0 {
  348. f[flagPos] |= flagPageState
  349. f.writeBytes(op.PageState)
  350. }
  351. } else if version == 1 {
  352. if len(op.Prepared) > 0 {
  353. f.writeShort(uint16(len(op.Values)))
  354. for _, value := range op.Values {
  355. f.writeBytes(value)
  356. }
  357. }
  358. f.writeConsistency(op.Cons)
  359. }
  360. return f, nil
  361. }
  362. type prepareFrame struct {
  363. Stmt string
  364. }
  365. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  366. if f == nil {
  367. f = make(frame, headerSize, defaultFrameSize)
  368. }
  369. f.setHeader(version, 0, 0, opPrepare)
  370. f.writeLongString(op.Stmt)
  371. return f, nil
  372. }
  373. type optionsFrame struct{}
  374. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  375. if f == nil {
  376. f = make(frame, headerSize, defaultFrameSize)
  377. }
  378. f.setHeader(version, 0, 0, opOptions)
  379. return f, nil
  380. }