frame.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagTrace uint8 = 2
  35. flagPageSize uint8 = 4
  36. flagPageState uint8 = 8
  37. flagHasMore uint8 = 2
  38. headerSize = 8
  39. )
  40. type frame []byte
  41. func (f *frame) writeInt(v int32) {
  42. p := f.grow(4)
  43. (*f)[p] = byte(v >> 24)
  44. (*f)[p+1] = byte(v >> 16)
  45. (*f)[p+2] = byte(v >> 8)
  46. (*f)[p+3] = byte(v)
  47. }
  48. func (f *frame) writeShort(v uint16) {
  49. p := f.grow(2)
  50. (*f)[p] = byte(v >> 8)
  51. (*f)[p+1] = byte(v)
  52. }
  53. func (f *frame) writeString(v string) {
  54. f.writeShort(uint16(len(v)))
  55. p := f.grow(len(v))
  56. copy((*f)[p:], v)
  57. }
  58. func (f *frame) writeLongString(v string) {
  59. f.writeInt(int32(len(v)))
  60. p := f.grow(len(v))
  61. copy((*f)[p:], v)
  62. }
  63. func (f *frame) writeUUID() {
  64. }
  65. func (f *frame) writeStringList(v []string) {
  66. f.writeShort(uint16(len(v)))
  67. for i := range v {
  68. f.writeString(v[i])
  69. }
  70. }
  71. func (f *frame) writeByte(v byte) {
  72. p := f.grow(1)
  73. (*f)[p] = v
  74. }
  75. func (f *frame) writeBytes(v []byte) {
  76. if v == nil {
  77. f.writeInt(-1)
  78. return
  79. }
  80. f.writeInt(int32(len(v)))
  81. p := f.grow(len(v))
  82. copy((*f)[p:], v)
  83. }
  84. func (f *frame) writeShortBytes(v []byte) {
  85. f.writeShort(uint16(len(v)))
  86. p := f.grow(len(v))
  87. copy((*f)[p:], v)
  88. }
  89. func (f *frame) writeInet(ip net.IP, port int) {
  90. p := f.grow(1 + len(ip))
  91. (*f)[p] = byte(len(ip))
  92. copy((*f)[p+1:], ip)
  93. f.writeInt(int32(port))
  94. }
  95. func (f *frame) writeStringMap(v map[string]string) {
  96. f.writeShort(uint16(len(v)))
  97. for key, value := range v {
  98. f.writeString(key)
  99. f.writeString(value)
  100. }
  101. }
  102. func (f *frame) writeStringMultimap(v map[string][]string) {
  103. f.writeShort(uint16(len(v)))
  104. for key, values := range v {
  105. f.writeString(key)
  106. f.writeStringList(values)
  107. }
  108. }
  109. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  110. (*f)[0] = version
  111. (*f)[1] = flags
  112. (*f)[2] = stream
  113. (*f)[3] = opcode
  114. }
  115. func (f *frame) setLength(length int) {
  116. (*f)[4] = byte(length >> 24)
  117. (*f)[5] = byte(length >> 16)
  118. (*f)[6] = byte(length >> 8)
  119. (*f)[7] = byte(length)
  120. }
  121. func (f *frame) Length() int {
  122. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  123. }
  124. func (f *frame) grow(n int) int {
  125. if len(*f)+n >= cap(*f) {
  126. buf := make(frame, len(*f), len(*f)*2+n)
  127. copy(buf, *f)
  128. *f = buf
  129. }
  130. p := len(*f)
  131. *f = (*f)[:p+n]
  132. return p
  133. }
  134. func (f *frame) skipHeader() {
  135. *f = (*f)[headerSize:]
  136. }
  137. func (f *frame) readInt() int {
  138. if len(*f) < 4 {
  139. panic(NewErrProtocol("Trying to read an int while >4 bytes in the buffer"))
  140. }
  141. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  142. *f = (*f)[4:]
  143. return int(int32(v))
  144. }
  145. func (f *frame) readShort() uint16 {
  146. if len(*f) < 2 {
  147. panic(NewErrProtocol("Trying to read a short while >2 bytes in the buffer"))
  148. }
  149. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  150. *f = (*f)[2:]
  151. return v
  152. }
  153. func (f *frame) readString() string {
  154. n := int(f.readShort())
  155. if len(*f) < n {
  156. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  157. }
  158. v := string((*f)[:n])
  159. *f = (*f)[n:]
  160. return v
  161. }
  162. func (f *frame) readLongString() string {
  163. n := f.readInt()
  164. if len(*f) < n {
  165. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  166. }
  167. v := string((*f)[:n])
  168. *f = (*f)[n:]
  169. return v
  170. }
  171. func (f *frame) readBytes() []byte {
  172. n := f.readInt()
  173. if n < 0 {
  174. return nil
  175. }
  176. if len(*f) < n {
  177. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  178. }
  179. v := (*f)[:n]
  180. *f = (*f)[n:]
  181. return v
  182. }
  183. func (f *frame) readShortBytes() []byte {
  184. n := int(f.readShort())
  185. if len(*f) < n {
  186. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  187. }
  188. v := (*f)[:n]
  189. *f = (*f)[n:]
  190. return v
  191. }
  192. func (f *frame) readTypeInfo() *TypeInfo {
  193. x := f.readShort()
  194. typ := &TypeInfo{Type: Type(x)}
  195. switch typ.Type {
  196. case TypeCustom:
  197. typ.Custom = f.readString()
  198. case TypeMap:
  199. typ.Key = f.readTypeInfo()
  200. fallthrough
  201. case TypeList, TypeSet:
  202. typ.Elem = f.readTypeInfo()
  203. }
  204. return typ
  205. }
  206. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  207. flags := f.readInt()
  208. numColumns := f.readInt()
  209. var pageState []byte
  210. if flags&2 != 0 {
  211. pageState = f.readBytes()
  212. }
  213. globalKeyspace := ""
  214. globalTable := ""
  215. if flags&1 != 0 {
  216. globalKeyspace = f.readString()
  217. globalTable = f.readString()
  218. }
  219. columns := make([]ColumnInfo, numColumns)
  220. for i := 0; i < numColumns; i++ {
  221. columns[i].Keyspace = globalKeyspace
  222. columns[i].Table = globalTable
  223. if flags&1 == 0 {
  224. columns[i].Keyspace = f.readString()
  225. columns[i].Table = f.readString()
  226. }
  227. columns[i].Name = f.readString()
  228. columns[i].TypeInfo = f.readTypeInfo()
  229. }
  230. return columns, pageState
  231. }
  232. func (f *frame) readError() errorFrame {
  233. code := f.readInt()
  234. msg := f.readString()
  235. errD := errorResponse{code, msg}
  236. switch code {
  237. case errUnavailable:
  238. cl := Consistency(f.readShort())
  239. required := f.readInt()
  240. alive := f.readInt()
  241. return errRespUnavailable{errorResponse: errD,
  242. Consistency: cl,
  243. Required: required,
  244. Alive: alive}
  245. case errWriteTimeout:
  246. cl := Consistency(f.readShort())
  247. received := f.readInt()
  248. blockfor := f.readInt()
  249. writeType := f.readString()
  250. return errRespWriteTimeout{errorResponse: errD,
  251. Consistency: cl,
  252. Received: received,
  253. BlockFor: blockfor,
  254. WriteType: writeType,
  255. }
  256. case errReadTimeout:
  257. cl := Consistency(f.readShort())
  258. received := f.readInt()
  259. blockfor := f.readInt()
  260. dataPresent := (*f)[0]
  261. *f = (*f)[1:]
  262. return errRespReadTimeout{errorResponse: errD,
  263. Consistency: cl,
  264. Received: received,
  265. BlockFor: blockfor,
  266. DataPresent: dataPresent,
  267. }
  268. case errAlreadyExists:
  269. ks := f.readString()
  270. table := f.readString()
  271. return errRespAlreadyExists{errorResponse: errD,
  272. Keyspace: ks,
  273. Table: table,
  274. }
  275. case errUnprepared:
  276. stmtId := f.readShortBytes()
  277. return errRespUnprepared{errorResponse: errD,
  278. StatementId: stmtId,
  279. }
  280. default:
  281. return errD
  282. }
  283. }
  284. func (f *frame) writeConsistency(c Consistency) {
  285. f.writeShort(consistencyCodes[c])
  286. }
  287. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  288. return f, nil
  289. }
  290. var consistencyCodes = []uint16{
  291. Any: 0x0000,
  292. One: 0x0001,
  293. Two: 0x0002,
  294. Three: 0x0003,
  295. Quorum: 0x0004,
  296. All: 0x0005,
  297. LocalQuorum: 0x0006,
  298. EachQuorum: 0x0007,
  299. Serial: 0x0008,
  300. LocalSerial: 0x0009,
  301. }
  302. type readyFrame struct{}
  303. type supportedFrame struct{}
  304. type resultVoidFrame struct{}
  305. type resultRowsFrame struct {
  306. Columns []ColumnInfo
  307. Rows [][][]byte
  308. PagingState []byte
  309. }
  310. type resultKeyspaceFrame struct {
  311. Keyspace string
  312. }
  313. type resultPreparedFrame struct {
  314. PreparedId []byte
  315. Values []ColumnInfo
  316. }
  317. type operation interface {
  318. encodeFrame(version uint8, dst frame) (frame, error)
  319. }
  320. type startupFrame struct {
  321. CQLVersion string
  322. Compression string
  323. }
  324. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  325. if f == nil {
  326. f = make(frame, headerSize, defaultFrameSize)
  327. }
  328. f.setHeader(version, 0, 0, opStartup)
  329. f.writeShort(1)
  330. f.writeString("CQL_VERSION")
  331. f.writeString(op.CQLVersion)
  332. if op.Compression != "" {
  333. f[headerSize+1] += 1
  334. f.writeString("COMPRESSION")
  335. f.writeString(op.Compression)
  336. }
  337. return f, nil
  338. }
  339. type queryFrame struct {
  340. Stmt string
  341. Prepared []byte
  342. Cons Consistency
  343. Values [][]byte
  344. PageSize int
  345. PageState []byte
  346. }
  347. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  348. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  349. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  350. return nil, ErrUnsupported
  351. }
  352. if f == nil {
  353. f = make(frame, headerSize, defaultFrameSize)
  354. }
  355. if len(op.Prepared) > 0 {
  356. f.setHeader(version, 0, 0, opExecute)
  357. f.writeShortBytes(op.Prepared)
  358. } else {
  359. f.setHeader(version, 0, 0, opQuery)
  360. f.writeLongString(op.Stmt)
  361. }
  362. if version >= 2 {
  363. f.writeConsistency(op.Cons)
  364. flagPos := len(f)
  365. f.writeByte(0)
  366. if len(op.Values) > 0 {
  367. f[flagPos] |= flagQueryValues
  368. f.writeShort(uint16(len(op.Values)))
  369. for _, value := range op.Values {
  370. f.writeBytes(value)
  371. }
  372. }
  373. if op.PageSize > 0 {
  374. f[flagPos] |= flagPageSize
  375. f.writeInt(int32(op.PageSize))
  376. }
  377. if len(op.PageState) > 0 {
  378. f[flagPos] |= flagPageState
  379. f.writeBytes(op.PageState)
  380. }
  381. } else if version == 1 {
  382. if len(op.Prepared) > 0 {
  383. f.writeShort(uint16(len(op.Values)))
  384. for _, value := range op.Values {
  385. f.writeBytes(value)
  386. }
  387. }
  388. f.writeConsistency(op.Cons)
  389. }
  390. return f, nil
  391. }
  392. type prepareFrame struct {
  393. Stmt string
  394. }
  395. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  396. if f == nil {
  397. f = make(frame, headerSize, defaultFrameSize)
  398. }
  399. f.setHeader(version, 0, 0, opPrepare)
  400. f.writeLongString(op.Stmt)
  401. return f, nil
  402. }
  403. type optionsFrame struct{}
  404. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  405. if f == nil {
  406. f = make(frame, headerSize, defaultFrameSize)
  407. }
  408. f.setHeader(version, 0, 0, opOptions)
  409. return f, nil
  410. }
  411. type authenticateFrame struct {
  412. Authenticator string
  413. }
  414. type authResponseFrame struct {
  415. Data []byte
  416. }
  417. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  418. if f == nil {
  419. f = make(frame, headerSize, defaultFrameSize)
  420. }
  421. f.setHeader(version, 0, 0, opAuthResponse)
  422. f.writeBytes(op.Data)
  423. return f, nil
  424. }
  425. type authSuccessFrame struct {
  426. Data []byte
  427. }
  428. type authChallengeFrame struct {
  429. Data []byte
  430. }