frame.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "fmt"
  7. "net"
  8. )
  9. const (
  10. protoDirectionMask = 0x80
  11. protoVersionMask = 0x7F
  12. protoVersion1 = 0x01
  13. protoVersion2 = 0x02
  14. protoVersion3 = 0x03
  15. opError byte = 0x00
  16. opStartup byte = 0x01
  17. opReady byte = 0x02
  18. opAuthenticate byte = 0x03
  19. opOptions byte = 0x05
  20. opSupported byte = 0x06
  21. opQuery byte = 0x07
  22. opResult byte = 0x08
  23. opPrepare byte = 0x09
  24. opExecute byte = 0x0A
  25. opRegister byte = 0x0B
  26. opEvent byte = 0x0C
  27. opBatch byte = 0x0D
  28. opAuthChallenge byte = 0x0E
  29. opAuthResponse byte = 0x0F
  30. opAuthSuccess byte = 0x10
  31. resultKindVoid = 1
  32. resultKindRows = 2
  33. resultKindKeyspace = 3
  34. resultKindPrepared = 4
  35. resultKindSchemaChanged = 5
  36. flagQueryValues uint8 = 1
  37. flagCompress uint8 = 1
  38. flagTrace uint8 = 2
  39. flagPageSize uint8 = 4
  40. flagPageState uint8 = 8
  41. flagHasMore uint8 = 2
  42. apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
  43. )
  44. var headerProtoSize = [...]int{
  45. protoVersion1: 8,
  46. protoVersion2: 8,
  47. protoVersion3: 9,
  48. }
  49. // TODO: replace with a struct which has a header and a body buffer,
  50. // header just has methods like, set/get the options in its backing array
  51. // then in a writeTo we write the header then the body.
  52. type frame []byte
  53. func newFrame(version uint8) frame {
  54. // TODO: pool these at the session level incase anyone is using different
  55. // clusters with different versions in the same application.
  56. return make(frame, headerProtoSize[version], defaultFrameSize)
  57. }
  58. func (f *frame) writeInt(v int32) {
  59. p := f.grow(4)
  60. (*f)[p] = byte(v >> 24)
  61. (*f)[p+1] = byte(v >> 16)
  62. (*f)[p+2] = byte(v >> 8)
  63. (*f)[p+3] = byte(v)
  64. }
  65. func (f *frame) writeShort(v uint16) {
  66. p := f.grow(2)
  67. (*f)[p] = byte(v >> 8)
  68. (*f)[p+1] = byte(v)
  69. }
  70. func (f *frame) writeString(v string) {
  71. f.writeShort(uint16(len(v)))
  72. p := f.grow(len(v))
  73. copy((*f)[p:], v)
  74. }
  75. func (f *frame) writeLongString(v string) {
  76. f.writeInt(int32(len(v)))
  77. p := f.grow(len(v))
  78. copy((*f)[p:], v)
  79. }
  80. func (f *frame) writeUUID() {
  81. }
  82. func (f *frame) writeStringList(v []string) {
  83. f.writeShort(uint16(len(v)))
  84. for i := range v {
  85. f.writeString(v[i])
  86. }
  87. }
  88. func (f *frame) writeByte(v byte) {
  89. p := f.grow(1)
  90. (*f)[p] = v
  91. }
  92. func (f *frame) writeBytes(v []byte) {
  93. if v == nil {
  94. f.writeInt(-1)
  95. return
  96. }
  97. f.writeInt(int32(len(v)))
  98. p := f.grow(len(v))
  99. copy((*f)[p:], v)
  100. }
  101. func (f *frame) writeShortBytes(v []byte) {
  102. f.writeShort(uint16(len(v)))
  103. p := f.grow(len(v))
  104. copy((*f)[p:], v)
  105. }
  106. func (f *frame) writeInet(ip net.IP, port int) {
  107. p := f.grow(1 + len(ip))
  108. (*f)[p] = byte(len(ip))
  109. copy((*f)[p+1:], ip)
  110. f.writeInt(int32(port))
  111. }
  112. func (f *frame) writeStringMap(v map[string]string) {
  113. f.writeShort(uint16(len(v)))
  114. for key, value := range v {
  115. f.writeString(key)
  116. f.writeString(value)
  117. }
  118. }
  119. func (f *frame) writeStringMultimap(v map[string][]string) {
  120. f.writeShort(uint16(len(v)))
  121. for key, values := range v {
  122. f.writeString(key)
  123. f.writeStringList(values)
  124. }
  125. }
  126. func (f *frame) setHeader(version, flags uint8, stream int, opcode uint8) {
  127. (*f)[0] = version
  128. (*f)[1] = flags
  129. p := 2
  130. if version&maskVersion > protoVersion2 {
  131. (*f)[2] = byte(stream >> 8)
  132. (*f)[3] = byte(stream)
  133. p += 2
  134. } else {
  135. (*f)[2] = byte(stream & 0xFF)
  136. p++
  137. }
  138. (*f)[p] = opcode
  139. }
  140. func (f *frame) setStream(stream int, version uint8) {
  141. if version > protoVersion2 {
  142. (*f)[2] = byte(stream >> 8)
  143. (*f)[3] = byte(stream)
  144. } else {
  145. (*f)[2] = byte(stream)
  146. }
  147. }
  148. func (f *frame) Stream(version uint8) (n int) {
  149. if version > protoVersion2 {
  150. n = int((*f)[2])<<8 | int((*f)[3])
  151. } else {
  152. n = int((*f)[2])
  153. }
  154. return
  155. }
  156. func (f *frame) setLength(length int, version uint8) {
  157. p := 4
  158. if version > protoVersion2 {
  159. p = 5
  160. }
  161. (*f)[p] = byte(length >> 24)
  162. (*f)[p+1] = byte(length >> 16)
  163. (*f)[p+2] = byte(length >> 8)
  164. (*f)[p+3] = byte(length)
  165. }
  166. func (f *frame) Op(version uint8) byte {
  167. if version > protoVersion2 {
  168. return (*f)[4]
  169. } else {
  170. return (*f)[3]
  171. }
  172. }
  173. func (f *frame) Length(version uint8) int {
  174. p := 4
  175. if version > protoVersion2 {
  176. p = 5
  177. }
  178. return int((*f)[p])<<24 | int((*f)[p+1])<<16 | int((*f)[p+2])<<8 | int((*f)[p+3])
  179. }
  180. func (f *frame) grow(n int) int {
  181. if len(*f)+n >= cap(*f) {
  182. buf := make(frame, len(*f), len(*f)*2+n)
  183. copy(buf, *f)
  184. *f = buf
  185. }
  186. p := len(*f)
  187. *f = (*f)[:p+n]
  188. return p
  189. }
  190. func (f *frame) skipHeader(version uint8) {
  191. *f = (*f)[headerProtoSize[version]:]
  192. }
  193. func (f *frame) readInt() int {
  194. if len(*f) < 4 {
  195. panic(NewErrProtocol("Trying to read an int while <4 bytes in the buffer"))
  196. }
  197. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  198. *f = (*f)[4:]
  199. return int(int32(v))
  200. }
  201. func (f *frame) readShort() uint16 {
  202. if len(*f) < 2 {
  203. panic(NewErrProtocol("Trying to read a short while <2 bytes in the buffer"))
  204. }
  205. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  206. *f = (*f)[2:]
  207. return v
  208. }
  209. func (f *frame) readString() string {
  210. n := int(f.readShort())
  211. if len(*f) < n {
  212. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  213. }
  214. v := string((*f)[:n])
  215. *f = (*f)[n:]
  216. return v
  217. }
  218. func (f *frame) readLongString() string {
  219. n := f.readInt()
  220. if len(*f) < n {
  221. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  222. }
  223. v := string((*f)[:n])
  224. *f = (*f)[n:]
  225. return v
  226. }
  227. func (f *frame) readBytes() []byte {
  228. n := f.readInt()
  229. if n < 0 {
  230. return nil
  231. }
  232. if len(*f) < n {
  233. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  234. }
  235. v := (*f)[:n]
  236. *f = (*f)[n:]
  237. return v
  238. }
  239. func (f *frame) readShortBytes() []byte {
  240. n := int(f.readShort())
  241. if len(*f) < n {
  242. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  243. }
  244. v := (*f)[:n]
  245. *f = (*f)[n:]
  246. return v
  247. }
  248. func (f *frame) readTypeInfo() *TypeInfo {
  249. x := f.readShort()
  250. typ := &TypeInfo{Type: Type(x)}
  251. switch typ.Type {
  252. case TypeCustom:
  253. typ.Custom = f.readString()
  254. if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
  255. typ = &TypeInfo{Type: cassType}
  256. switch typ.Type {
  257. case TypeMap:
  258. typ.Key = f.readTypeInfo()
  259. fallthrough
  260. case TypeList, TypeSet:
  261. typ.Elem = f.readTypeInfo()
  262. }
  263. }
  264. case TypeMap:
  265. typ.Key = f.readTypeInfo()
  266. fallthrough
  267. case TypeList, TypeSet:
  268. typ.Elem = f.readTypeInfo()
  269. }
  270. return typ
  271. }
  272. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  273. flags := f.readInt()
  274. numColumns := f.readInt()
  275. var pageState []byte
  276. if flags&2 != 0 {
  277. pageState = f.readBytes()
  278. }
  279. globalKeyspace := ""
  280. globalTable := ""
  281. if flags&1 != 0 {
  282. globalKeyspace = f.readString()
  283. globalTable = f.readString()
  284. }
  285. columns := make([]ColumnInfo, numColumns)
  286. for i := 0; i < numColumns; i++ {
  287. columns[i].Keyspace = globalKeyspace
  288. columns[i].Table = globalTable
  289. if flags&1 == 0 {
  290. columns[i].Keyspace = f.readString()
  291. columns[i].Table = f.readString()
  292. }
  293. columns[i].Name = f.readString()
  294. columns[i].TypeInfo = f.readTypeInfo()
  295. }
  296. return columns, pageState
  297. }
  298. func (f *frame) readError() RequestError {
  299. code := f.readInt()
  300. msg := f.readString()
  301. errD := errorFrame{code, msg}
  302. switch code {
  303. case errUnavailable:
  304. cl := Consistency(f.readShort())
  305. required := f.readInt()
  306. alive := f.readInt()
  307. return RequestErrUnavailable{errorFrame: errD,
  308. Consistency: cl,
  309. Required: required,
  310. Alive: alive}
  311. case errWriteTimeout:
  312. cl := Consistency(f.readShort())
  313. received := f.readInt()
  314. blockfor := f.readInt()
  315. writeType := f.readString()
  316. return RequestErrWriteTimeout{errorFrame: errD,
  317. Consistency: cl,
  318. Received: received,
  319. BlockFor: blockfor,
  320. WriteType: writeType,
  321. }
  322. case errReadTimeout:
  323. cl := Consistency(f.readShort())
  324. received := f.readInt()
  325. blockfor := f.readInt()
  326. dataPresent := (*f)[0]
  327. *f = (*f)[1:]
  328. return RequestErrReadTimeout{errorFrame: errD,
  329. Consistency: cl,
  330. Received: received,
  331. BlockFor: blockfor,
  332. DataPresent: dataPresent,
  333. }
  334. case errAlreadyExists:
  335. ks := f.readString()
  336. table := f.readString()
  337. return RequestErrAlreadyExists{errorFrame: errD,
  338. Keyspace: ks,
  339. Table: table,
  340. }
  341. case errUnprepared:
  342. stmtId := f.readShortBytes()
  343. return RequestErrUnprepared{errorFrame: errD,
  344. StatementId: stmtId,
  345. }
  346. default:
  347. return errD
  348. }
  349. }
  350. func (f *frame) writeConsistency(c Consistency) {
  351. f.writeShort(consistencyCodes[c])
  352. }
  353. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  354. return f, nil
  355. }
  356. var consistencyCodes = []uint16{
  357. Any: 0x0000,
  358. One: 0x0001,
  359. Two: 0x0002,
  360. Three: 0x0003,
  361. Quorum: 0x0004,
  362. All: 0x0005,
  363. LocalQuorum: 0x0006,
  364. EachQuorum: 0x0007,
  365. Serial: 0x0008,
  366. LocalSerial: 0x0009,
  367. LocalOne: 0x000A,
  368. }
  369. type readyFrame struct{}
  370. type supportedFrame struct{}
  371. type resultVoidFrame struct{}
  372. type resultRowsFrame struct {
  373. Columns []ColumnInfo
  374. Rows [][][]byte
  375. PagingState []byte
  376. }
  377. type resultKeyspaceFrame struct {
  378. Keyspace string
  379. }
  380. type resultPreparedFrame struct {
  381. PreparedId []byte
  382. Arguments []ColumnInfo
  383. ReturnValues []ColumnInfo
  384. }
  385. type operation interface {
  386. encodeFrame(version uint8, dst frame) (frame, error)
  387. }
  388. type startupFrame struct {
  389. CQLVersion string
  390. Compression string
  391. }
  392. func (op *startupFrame) String() string {
  393. return fmt.Sprintf("[startup cqlversion=%q compression=%q]", op.CQLVersion, op.Compression)
  394. }
  395. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  396. if f == nil {
  397. f = newFrame(version)
  398. }
  399. f.setHeader(version, 0, 0, opStartup)
  400. // TODO: fix this, this is actually a StringMap
  401. var size uint16 = 1
  402. if op.Compression != "" {
  403. size++
  404. }
  405. f.writeShort(size)
  406. f.writeString("CQL_VERSION")
  407. f.writeString(op.CQLVersion)
  408. if op.Compression != "" {
  409. f.writeString("COMPRESSION")
  410. f.writeString(op.Compression)
  411. }
  412. return f, nil
  413. }
  414. type queryFrame struct {
  415. Stmt string
  416. Prepared []byte
  417. Cons Consistency
  418. Values [][]byte
  419. PageSize int
  420. PageState []byte
  421. }
  422. func (op *queryFrame) String() string {
  423. return fmt.Sprintf("[query statement=%q prepared=%x cons=%v ...]", op.Stmt, op.Prepared, op.Cons)
  424. }
  425. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  426. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  427. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  428. return nil, ErrUnsupported
  429. }
  430. if f == nil {
  431. f = newFrame(version)
  432. }
  433. if len(op.Prepared) > 0 {
  434. f.setHeader(version, 0, 0, opExecute)
  435. f.writeShortBytes(op.Prepared)
  436. } else {
  437. f.setHeader(version, 0, 0, opQuery)
  438. f.writeLongString(op.Stmt)
  439. }
  440. if version >= 2 {
  441. f.writeConsistency(op.Cons)
  442. flagPos := len(f)
  443. f.writeByte(0)
  444. if len(op.Values) > 0 {
  445. f[flagPos] |= flagQueryValues
  446. f.writeShort(uint16(len(op.Values)))
  447. for _, value := range op.Values {
  448. f.writeBytes(value)
  449. }
  450. }
  451. if op.PageSize > 0 {
  452. f[flagPos] |= flagPageSize
  453. f.writeInt(int32(op.PageSize))
  454. }
  455. if len(op.PageState) > 0 {
  456. f[flagPos] |= flagPageState
  457. f.writeBytes(op.PageState)
  458. }
  459. } else if version == 1 {
  460. if len(op.Prepared) > 0 {
  461. f.writeShort(uint16(len(op.Values)))
  462. for _, value := range op.Values {
  463. f.writeBytes(value)
  464. }
  465. }
  466. f.writeConsistency(op.Cons)
  467. }
  468. return f, nil
  469. }
  470. type prepareFrame struct {
  471. Stmt string
  472. }
  473. func (op *prepareFrame) String() string {
  474. return fmt.Sprintf("[prepare statement=%q]", op.Stmt)
  475. }
  476. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  477. if f == nil {
  478. f = newFrame(version)
  479. }
  480. f.setHeader(version, 0, 0, opPrepare)
  481. f.writeLongString(op.Stmt)
  482. return f, nil
  483. }
  484. type optionsFrame struct{}
  485. func (op *optionsFrame) String() string {
  486. return "[options]"
  487. }
  488. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  489. if f == nil {
  490. f = newFrame(version)
  491. }
  492. f.setHeader(version, 0, 0, opOptions)
  493. return f, nil
  494. }
  495. type authenticateFrame struct {
  496. Authenticator string
  497. }
  498. func (op *authenticateFrame) String() string {
  499. return fmt.Sprintf("[authenticate authenticator=%q]", op.Authenticator)
  500. }
  501. type authResponseFrame struct {
  502. Data []byte
  503. }
  504. func (op *authResponseFrame) String() string {
  505. return fmt.Sprintf("[auth_response data=%q]", op.Data)
  506. }
  507. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  508. if f == nil {
  509. f = newFrame(version)
  510. }
  511. f.setHeader(version, 0, 0, opAuthResponse)
  512. f.writeBytes(op.Data)
  513. return f, nil
  514. }
  515. type authSuccessFrame struct {
  516. Data []byte
  517. }
  518. func (op *authSuccessFrame) String() string {
  519. return fmt.Sprintf("[auth_success data=%q]", op.Data)
  520. }
  521. type authChallengeFrame struct {
  522. Data []byte
  523. }
  524. func (op *authChallengeFrame) String() string {
  525. return fmt.Sprintf("[auth_challenge data=%q]", op.Data)
  526. }