frame.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagTrace uint8 = 2
  35. flagPageSize uint8 = 4
  36. flagPageState uint8 = 8
  37. flagHasMore uint8 = 2
  38. headerSize = 8
  39. apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
  40. )
  41. type frame []byte
  42. func (f *frame) writeInt(v int32) {
  43. p := f.grow(4)
  44. (*f)[p] = byte(v >> 24)
  45. (*f)[p+1] = byte(v >> 16)
  46. (*f)[p+2] = byte(v >> 8)
  47. (*f)[p+3] = byte(v)
  48. }
  49. func (f *frame) writeShort(v uint16) {
  50. p := f.grow(2)
  51. (*f)[p] = byte(v >> 8)
  52. (*f)[p+1] = byte(v)
  53. }
  54. func (f *frame) writeString(v string) {
  55. f.writeShort(uint16(len(v)))
  56. p := f.grow(len(v))
  57. copy((*f)[p:], v)
  58. }
  59. func (f *frame) writeLongString(v string) {
  60. f.writeInt(int32(len(v)))
  61. p := f.grow(len(v))
  62. copy((*f)[p:], v)
  63. }
  64. func (f *frame) writeUUID() {
  65. }
  66. func (f *frame) writeStringList(v []string) {
  67. f.writeShort(uint16(len(v)))
  68. for i := range v {
  69. f.writeString(v[i])
  70. }
  71. }
  72. func (f *frame) writeByte(v byte) {
  73. p := f.grow(1)
  74. (*f)[p] = v
  75. }
  76. func (f *frame) writeBytes(v []byte) {
  77. if v == nil {
  78. f.writeInt(-1)
  79. return
  80. }
  81. f.writeInt(int32(len(v)))
  82. p := f.grow(len(v))
  83. copy((*f)[p:], v)
  84. }
  85. func (f *frame) writeShortBytes(v []byte) {
  86. f.writeShort(uint16(len(v)))
  87. p := f.grow(len(v))
  88. copy((*f)[p:], v)
  89. }
  90. func (f *frame) writeInet(ip net.IP, port int) {
  91. p := f.grow(1 + len(ip))
  92. (*f)[p] = byte(len(ip))
  93. copy((*f)[p+1:], ip)
  94. f.writeInt(int32(port))
  95. }
  96. func (f *frame) writeStringMap(v map[string]string) {
  97. f.writeShort(uint16(len(v)))
  98. for key, value := range v {
  99. f.writeString(key)
  100. f.writeString(value)
  101. }
  102. }
  103. func (f *frame) writeStringMultimap(v map[string][]string) {
  104. f.writeShort(uint16(len(v)))
  105. for key, values := range v {
  106. f.writeString(key)
  107. f.writeStringList(values)
  108. }
  109. }
  110. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  111. (*f)[0] = version
  112. (*f)[1] = flags
  113. (*f)[2] = stream
  114. (*f)[3] = opcode
  115. }
  116. func (f *frame) setLength(length int) {
  117. (*f)[4] = byte(length >> 24)
  118. (*f)[5] = byte(length >> 16)
  119. (*f)[6] = byte(length >> 8)
  120. (*f)[7] = byte(length)
  121. }
  122. func (f *frame) Length() int {
  123. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  124. }
  125. func (f *frame) grow(n int) int {
  126. if len(*f)+n >= cap(*f) {
  127. buf := make(frame, len(*f), len(*f)*2+n)
  128. copy(buf, *f)
  129. *f = buf
  130. }
  131. p := len(*f)
  132. *f = (*f)[:p+n]
  133. return p
  134. }
  135. func (f *frame) skipHeader() {
  136. *f = (*f)[headerSize:]
  137. }
  138. func (f *frame) readInt() int {
  139. if len(*f) < 4 {
  140. panic(NewErrProtocol("Trying to read an int while >4 bytes in the buffer"))
  141. }
  142. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  143. *f = (*f)[4:]
  144. return int(int32(v))
  145. }
  146. func (f *frame) readShort() uint16 {
  147. if len(*f) < 2 {
  148. panic(NewErrProtocol("Trying to read a short while >2 bytes in the buffer"))
  149. }
  150. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  151. *f = (*f)[2:]
  152. return v
  153. }
  154. func (f *frame) readString() string {
  155. n := int(f.readShort())
  156. if len(*f) < n {
  157. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  158. }
  159. v := string((*f)[:n])
  160. *f = (*f)[n:]
  161. return v
  162. }
  163. func (f *frame) readLongString() string {
  164. n := f.readInt()
  165. if len(*f) < n {
  166. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  167. }
  168. v := string((*f)[:n])
  169. *f = (*f)[n:]
  170. return v
  171. }
  172. func (f *frame) readBytes() []byte {
  173. n := f.readInt()
  174. if n < 0 {
  175. return nil
  176. }
  177. if len(*f) < n {
  178. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  179. }
  180. v := (*f)[:n]
  181. *f = (*f)[n:]
  182. return v
  183. }
  184. func (f *frame) readShortBytes() []byte {
  185. n := int(f.readShort())
  186. if len(*f) < n {
  187. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  188. }
  189. v := (*f)[:n]
  190. *f = (*f)[n:]
  191. return v
  192. }
  193. func (f *frame) readTypeInfo() *TypeInfo {
  194. x := f.readShort()
  195. typ := &TypeInfo{Type: Type(x)}
  196. switch typ.Type {
  197. case TypeCustom:
  198. typ.Custom = f.readString()
  199. if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
  200. typ = &TypeInfo{Type: cassType}
  201. switch typ.Type {
  202. case TypeMap:
  203. typ.Key = f.readTypeInfo()
  204. fallthrough
  205. case TypeList, TypeSet:
  206. typ.Elem = f.readTypeInfo()
  207. }
  208. }
  209. case TypeMap:
  210. typ.Key = f.readTypeInfo()
  211. fallthrough
  212. case TypeList, TypeSet:
  213. typ.Elem = f.readTypeInfo()
  214. }
  215. return typ
  216. }
  217. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  218. flags := f.readInt()
  219. numColumns := f.readInt()
  220. var pageState []byte
  221. if flags&2 != 0 {
  222. pageState = f.readBytes()
  223. }
  224. globalKeyspace := ""
  225. globalTable := ""
  226. if flags&1 != 0 {
  227. globalKeyspace = f.readString()
  228. globalTable = f.readString()
  229. }
  230. columns := make([]ColumnInfo, numColumns)
  231. for i := 0; i < numColumns; i++ {
  232. columns[i].Keyspace = globalKeyspace
  233. columns[i].Table = globalTable
  234. if flags&1 == 0 {
  235. columns[i].Keyspace = f.readString()
  236. columns[i].Table = f.readString()
  237. }
  238. columns[i].Name = f.readString()
  239. columns[i].TypeInfo = f.readTypeInfo()
  240. }
  241. return columns, pageState
  242. }
  243. func (f *frame) readError() RequestError {
  244. code := f.readInt()
  245. msg := f.readString()
  246. errD := errorFrame{code, msg}
  247. switch code {
  248. case errUnavailable:
  249. cl := Consistency(f.readShort())
  250. required := f.readInt()
  251. alive := f.readInt()
  252. return RequestErrUnavailable{errorFrame: errD,
  253. Consistency: cl,
  254. Required: required,
  255. Alive: alive}
  256. case errWriteTimeout:
  257. cl := Consistency(f.readShort())
  258. received := f.readInt()
  259. blockfor := f.readInt()
  260. writeType := f.readString()
  261. return RequestErrWriteTimeout{errorFrame: errD,
  262. Consistency: cl,
  263. Received: received,
  264. BlockFor: blockfor,
  265. WriteType: writeType,
  266. }
  267. case errReadTimeout:
  268. cl := Consistency(f.readShort())
  269. received := f.readInt()
  270. blockfor := f.readInt()
  271. dataPresent := (*f)[0]
  272. *f = (*f)[1:]
  273. return RequestErrReadTimeout{errorFrame: errD,
  274. Consistency: cl,
  275. Received: received,
  276. BlockFor: blockfor,
  277. DataPresent: dataPresent,
  278. }
  279. case errAlreadyExists:
  280. ks := f.readString()
  281. table := f.readString()
  282. return RequestErrAlreadyExists{errorFrame: errD,
  283. Keyspace: ks,
  284. Table: table,
  285. }
  286. case errUnprepared:
  287. stmtId := f.readShortBytes()
  288. return RequestErrUnprepared{errorFrame: errD,
  289. StatementId: stmtId,
  290. }
  291. default:
  292. return errD
  293. }
  294. }
  295. func (f *frame) writeConsistency(c Consistency) {
  296. f.writeShort(consistencyCodes[c])
  297. }
  298. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  299. return f, nil
  300. }
  301. var consistencyCodes = []uint16{
  302. Any: 0x0000,
  303. One: 0x0001,
  304. Two: 0x0002,
  305. Three: 0x0003,
  306. Quorum: 0x0004,
  307. All: 0x0005,
  308. LocalQuorum: 0x0006,
  309. EachQuorum: 0x0007,
  310. Serial: 0x0008,
  311. LocalSerial: 0x0009,
  312. LocalOne: 0x000A,
  313. }
  314. type readyFrame struct{}
  315. type supportedFrame struct{}
  316. type resultVoidFrame struct{}
  317. type resultRowsFrame struct {
  318. Columns []ColumnInfo
  319. Rows [][][]byte
  320. PagingState []byte
  321. }
  322. type resultKeyspaceFrame struct {
  323. Keyspace string
  324. }
  325. type resultPreparedFrame struct {
  326. PreparedId []byte
  327. Arguments []ColumnInfo
  328. ReturnValues []ColumnInfo
  329. }
  330. type operation interface {
  331. encodeFrame(version uint8, dst frame) (frame, error)
  332. }
  333. type startupFrame struct {
  334. CQLVersion string
  335. Compression string
  336. }
  337. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  338. if f == nil {
  339. f = make(frame, headerSize, defaultFrameSize)
  340. }
  341. f.setHeader(version, 0, 0, opStartup)
  342. f.writeShort(1)
  343. f.writeString("CQL_VERSION")
  344. f.writeString(op.CQLVersion)
  345. if op.Compression != "" {
  346. f[headerSize+1] += 1
  347. f.writeString("COMPRESSION")
  348. f.writeString(op.Compression)
  349. }
  350. return f, nil
  351. }
  352. type queryFrame struct {
  353. Stmt string
  354. Prepared []byte
  355. Cons Consistency
  356. Values [][]byte
  357. PageSize int
  358. PageState []byte
  359. }
  360. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  361. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  362. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  363. return nil, ErrUnsupported
  364. }
  365. if f == nil {
  366. f = make(frame, headerSize, defaultFrameSize)
  367. }
  368. if len(op.Prepared) > 0 {
  369. f.setHeader(version, 0, 0, opExecute)
  370. f.writeShortBytes(op.Prepared)
  371. } else {
  372. f.setHeader(version, 0, 0, opQuery)
  373. f.writeLongString(op.Stmt)
  374. }
  375. if version >= 2 {
  376. f.writeConsistency(op.Cons)
  377. flagPos := len(f)
  378. f.writeByte(0)
  379. if len(op.Values) > 0 {
  380. f[flagPos] |= flagQueryValues
  381. f.writeShort(uint16(len(op.Values)))
  382. for _, value := range op.Values {
  383. f.writeBytes(value)
  384. }
  385. }
  386. if op.PageSize > 0 {
  387. f[flagPos] |= flagPageSize
  388. f.writeInt(int32(op.PageSize))
  389. }
  390. if len(op.PageState) > 0 {
  391. f[flagPos] |= flagPageState
  392. f.writeBytes(op.PageState)
  393. }
  394. } else if version == 1 {
  395. if len(op.Prepared) > 0 {
  396. f.writeShort(uint16(len(op.Values)))
  397. for _, value := range op.Values {
  398. f.writeBytes(value)
  399. }
  400. }
  401. f.writeConsistency(op.Cons)
  402. }
  403. return f, nil
  404. }
  405. type prepareFrame struct {
  406. Stmt string
  407. }
  408. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  409. if f == nil {
  410. f = make(frame, headerSize, defaultFrameSize)
  411. }
  412. f.setHeader(version, 0, 0, opPrepare)
  413. f.writeLongString(op.Stmt)
  414. return f, nil
  415. }
  416. type optionsFrame struct{}
  417. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  418. if f == nil {
  419. f = make(frame, headerSize, defaultFrameSize)
  420. }
  421. f.setHeader(version, 0, 0, opOptions)
  422. return f, nil
  423. }
  424. type authenticateFrame struct {
  425. Authenticator string
  426. }
  427. type authResponseFrame struct {
  428. Data []byte
  429. }
  430. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  431. if f == nil {
  432. f = make(frame, headerSize, defaultFrameSize)
  433. }
  434. f.setHeader(version, 0, 0, opAuthResponse)
  435. f.writeBytes(op.Data)
  436. return f, nil
  437. }
  438. type authSuccessFrame struct {
  439. Data []byte
  440. }
  441. type authChallengeFrame struct {
  442. Data []byte
  443. }