frame.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "fmt"
  7. "net"
  8. )
  9. const (
  10. protoDirectionMask = 0x80
  11. protoVersionMask = 0x7F
  12. protoVersion1 = 0x01
  13. protoVersion2 = 0x02
  14. protoVersion3 = 0x03
  15. opError byte = 0x00
  16. opStartup byte = 0x01
  17. opReady byte = 0x02
  18. opAuthenticate byte = 0x03
  19. opOptions byte = 0x05
  20. opSupported byte = 0x06
  21. opQuery byte = 0x07
  22. opResult byte = 0x08
  23. opPrepare byte = 0x09
  24. opExecute byte = 0x0A
  25. opRegister byte = 0x0B
  26. opEvent byte = 0x0C
  27. opBatch byte = 0x0D
  28. opAuthChallenge byte = 0x0E
  29. opAuthResponse byte = 0x0F
  30. opAuthSuccess byte = 0x10
  31. resultKindVoid = 1
  32. resultKindRows = 2
  33. resultKindKeyspace = 3
  34. resultKindPrepared = 4
  35. resultKindSchemaChanged = 5
  36. flagQueryValues uint8 = 1
  37. flagCompress uint8 = 1
  38. flagTrace uint8 = 2
  39. flagPageSize uint8 = 4
  40. flagPageState uint8 = 8
  41. flagHasMore uint8 = 2
  42. apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
  43. )
  44. var headerProtoSize = [...]int{
  45. protoVersion1: 8,
  46. protoVersion2: 8,
  47. protoVersion3: 9,
  48. }
  49. // TODO: replace with a struct which has a header and a body buffer,
  50. // header just has methods like, set/get the options in its backing array
  51. // then in a writeTo we write the header then the body.
  52. type frame []byte
  53. func newFrame(version uint8) frame {
  54. // TODO: pool these at the session level incase anyone is using different
  55. // clusters with different versions in the same application.
  56. return make(frame, headerProtoSize[version], defaultFrameSize)
  57. }
  58. func (f *frame) writeInt(v int32) {
  59. p := f.grow(4)
  60. (*f)[p] = byte(v >> 24)
  61. (*f)[p+1] = byte(v >> 16)
  62. (*f)[p+2] = byte(v >> 8)
  63. (*f)[p+3] = byte(v)
  64. }
  65. func (f *frame) writeShort(v uint16) {
  66. p := f.grow(2)
  67. (*f)[p] = byte(v >> 8)
  68. (*f)[p+1] = byte(v)
  69. }
  70. func (f *frame) writeString(v string) {
  71. f.writeShort(uint16(len(v)))
  72. p := f.grow(len(v))
  73. copy((*f)[p:], v)
  74. }
  75. func (f *frame) writeLongString(v string) {
  76. f.writeInt(int32(len(v)))
  77. p := f.grow(len(v))
  78. copy((*f)[p:], v)
  79. }
  80. func (f *frame) writeUUID() {
  81. }
  82. func (f *frame) writeStringList(v []string) {
  83. f.writeShort(uint16(len(v)))
  84. for i := range v {
  85. f.writeString(v[i])
  86. }
  87. }
  88. func (f *frame) writeByte(v byte) {
  89. p := f.grow(1)
  90. (*f)[p] = v
  91. }
  92. func (f *frame) writeBytes(v []byte) {
  93. if v == nil {
  94. f.writeInt(-1)
  95. return
  96. }
  97. f.writeInt(int32(len(v)))
  98. p := f.grow(len(v))
  99. copy((*f)[p:], v)
  100. }
  101. func (f *frame) writeShortBytes(v []byte) {
  102. f.writeShort(uint16(len(v)))
  103. p := f.grow(len(v))
  104. copy((*f)[p:], v)
  105. }
  106. func (f *frame) writeInet(ip net.IP, port int) {
  107. p := f.grow(1 + len(ip))
  108. (*f)[p] = byte(len(ip))
  109. copy((*f)[p+1:], ip)
  110. f.writeInt(int32(port))
  111. }
  112. func (f *frame) writeStringMap(v map[string]string) {
  113. f.writeShort(uint16(len(v)))
  114. for key, value := range v {
  115. f.writeString(key)
  116. f.writeString(value)
  117. }
  118. }
  119. func (f *frame) writeStringMultimap(v map[string][]string) {
  120. f.writeShort(uint16(len(v)))
  121. for key, values := range v {
  122. f.writeString(key)
  123. f.writeStringList(values)
  124. }
  125. }
  126. func (f *frame) setHeader(version, flags uint8, stream int, opcode uint8) {
  127. (*f)[0] = version
  128. (*f)[1] = flags
  129. p := 2
  130. if version&maskVersion > protoVersion2 {
  131. (*f)[2] = byte(stream >> 8)
  132. (*f)[3] = byte(stream)
  133. p += 2
  134. } else {
  135. (*f)[2] = byte(stream & 0xFF)
  136. p++
  137. }
  138. (*f)[p] = opcode
  139. }
  140. func (f *frame) setStream(stream int, version uint8) {
  141. if version > protoVersion2 {
  142. (*f)[2] = byte(stream >> 8)
  143. (*f)[3] = byte(stream)
  144. } else {
  145. (*f)[2] = byte(stream)
  146. }
  147. }
  148. func (f *frame) Stream(version uint8) (n int) {
  149. if version > protoVersion2 {
  150. n = int((*f)[2])<<8 | int((*f)[3])
  151. } else {
  152. n = int((*f)[2])
  153. }
  154. return
  155. }
  156. func (f *frame) setLength(length int, version uint8) {
  157. p := 4
  158. if version > protoVersion2 {
  159. p = 5
  160. }
  161. (*f)[p] = byte(length >> 24)
  162. (*f)[p+1] = byte(length >> 16)
  163. (*f)[p+2] = byte(length >> 8)
  164. (*f)[p+3] = byte(length)
  165. }
  166. func (f *frame) Op(version uint8) byte {
  167. if version > protoVersion2 {
  168. return (*f)[4]
  169. } else {
  170. return (*f)[3]
  171. }
  172. }
  173. func (f *frame) Length(version uint8) int {
  174. p := 4
  175. if version > protoVersion2 {
  176. p = 5
  177. }
  178. return int((*f)[p])<<24 | int((*f)[p+1])<<16 | int((*f)[p+2])<<8 | int((*f)[p+3])
  179. }
  180. func (f *frame) grow(n int) int {
  181. if len(*f)+n >= cap(*f) {
  182. buf := make(frame, len(*f), len(*f)*2+n)
  183. copy(buf, *f)
  184. *f = buf
  185. }
  186. p := len(*f)
  187. *f = (*f)[:p+n]
  188. return p
  189. }
  190. func (f *frame) skipHeader(version uint8) {
  191. *f = (*f)[headerProtoSize[version]:]
  192. }
  193. func (f *frame) readInt() int {
  194. if len(*f) < 4 {
  195. panic(NewErrProtocol("Trying to read an int while <4 bytes in the buffer"))
  196. }
  197. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  198. *f = (*f)[4:]
  199. return int(int32(v))
  200. }
  201. func (f *frame) readShort() uint16 {
  202. if len(*f) < 2 {
  203. panic(NewErrProtocol("Trying to read a short while <2 bytes in the buffer"))
  204. }
  205. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  206. *f = (*f)[2:]
  207. return v
  208. }
  209. func (f *frame) readString() string {
  210. n := int(f.readShort())
  211. if len(*f) < n {
  212. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  213. }
  214. v := string((*f)[:n])
  215. *f = (*f)[n:]
  216. return v
  217. }
  218. func (f *frame) readLongString() string {
  219. n := f.readInt()
  220. if len(*f) < n {
  221. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  222. }
  223. v := string((*f)[:n])
  224. *f = (*f)[n:]
  225. return v
  226. }
  227. func (f *frame) readBytes() []byte {
  228. n := f.readInt()
  229. if n < 0 {
  230. return nil
  231. }
  232. if len(*f) < n {
  233. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  234. }
  235. v := (*f)[:n]
  236. *f = (*f)[n:]
  237. return v
  238. }
  239. func (f *frame) readShortBytes() []byte {
  240. n := int(f.readShort())
  241. if len(*f) < n {
  242. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  243. }
  244. v := (*f)[:n]
  245. *f = (*f)[n:]
  246. return v
  247. }
  248. func (f *frame) readTypeInfo(version uint8) *TypeInfo {
  249. x := f.readShort()
  250. typ := &TypeInfo{
  251. Proto: version,
  252. Type: Type(x),
  253. }
  254. switch typ.Type {
  255. case TypeCustom:
  256. typ.Custom = f.readString()
  257. if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
  258. typ = &TypeInfo{Type: cassType}
  259. switch typ.Type {
  260. case TypeMap:
  261. typ.Key = f.readTypeInfo(version)
  262. fallthrough
  263. case TypeList, TypeSet:
  264. typ.Elem = f.readTypeInfo(version)
  265. }
  266. }
  267. case TypeMap:
  268. typ.Key = f.readTypeInfo(version)
  269. fallthrough
  270. case TypeList, TypeSet:
  271. typ.Elem = f.readTypeInfo(version)
  272. }
  273. return typ
  274. }
  275. func (f *frame) readMetaData(version uint8) ([]ColumnInfo, []byte) {
  276. flags := f.readInt()
  277. numColumns := f.readInt()
  278. var pageState []byte
  279. if flags&2 != 0 {
  280. pageState = f.readBytes()
  281. }
  282. globalKeyspace := ""
  283. globalTable := ""
  284. if flags&1 != 0 {
  285. globalKeyspace = f.readString()
  286. globalTable = f.readString()
  287. }
  288. columns := make([]ColumnInfo, numColumns)
  289. for i := 0; i < numColumns; i++ {
  290. columns[i].Keyspace = globalKeyspace
  291. columns[i].Table = globalTable
  292. if flags&1 == 0 {
  293. columns[i].Keyspace = f.readString()
  294. columns[i].Table = f.readString()
  295. }
  296. columns[i].Name = f.readString()
  297. columns[i].TypeInfo = f.readTypeInfo(version)
  298. }
  299. return columns, pageState
  300. }
  301. func (f *frame) readError() RequestError {
  302. code := f.readInt()
  303. msg := f.readString()
  304. errD := errorFrame{code, msg}
  305. switch code {
  306. case errUnavailable:
  307. cl := Consistency(f.readShort())
  308. required := f.readInt()
  309. alive := f.readInt()
  310. return RequestErrUnavailable{errorFrame: errD,
  311. Consistency: cl,
  312. Required: required,
  313. Alive: alive}
  314. case errWriteTimeout:
  315. cl := Consistency(f.readShort())
  316. received := f.readInt()
  317. blockfor := f.readInt()
  318. writeType := f.readString()
  319. return RequestErrWriteTimeout{errorFrame: errD,
  320. Consistency: cl,
  321. Received: received,
  322. BlockFor: blockfor,
  323. WriteType: writeType,
  324. }
  325. case errReadTimeout:
  326. cl := Consistency(f.readShort())
  327. received := f.readInt()
  328. blockfor := f.readInt()
  329. dataPresent := (*f)[0]
  330. *f = (*f)[1:]
  331. return RequestErrReadTimeout{errorFrame: errD,
  332. Consistency: cl,
  333. Received: received,
  334. BlockFor: blockfor,
  335. DataPresent: dataPresent,
  336. }
  337. case errAlreadyExists:
  338. ks := f.readString()
  339. table := f.readString()
  340. return RequestErrAlreadyExists{errorFrame: errD,
  341. Keyspace: ks,
  342. Table: table,
  343. }
  344. case errUnprepared:
  345. stmtId := f.readShortBytes()
  346. return RequestErrUnprepared{errorFrame: errD,
  347. StatementId: stmtId,
  348. }
  349. default:
  350. return errD
  351. }
  352. }
  353. func (f *frame) writeConsistency(c Consistency) {
  354. f.writeShort(consistencyCodes[c])
  355. }
  356. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  357. return f, nil
  358. }
  359. var consistencyCodes = []uint16{
  360. Any: 0x0000,
  361. One: 0x0001,
  362. Two: 0x0002,
  363. Three: 0x0003,
  364. Quorum: 0x0004,
  365. All: 0x0005,
  366. LocalQuorum: 0x0006,
  367. EachQuorum: 0x0007,
  368. Serial: 0x0008,
  369. LocalSerial: 0x0009,
  370. LocalOne: 0x000A,
  371. }
  372. type readyFrame struct{}
  373. type supportedFrame struct{}
  374. type resultVoidFrame struct{}
  375. type resultRowsFrame struct {
  376. Columns []ColumnInfo
  377. Rows [][][]byte
  378. PagingState []byte
  379. }
  380. type resultKeyspaceFrame struct {
  381. Keyspace string
  382. }
  383. type resultPreparedFrame struct {
  384. PreparedId []byte
  385. Arguments []ColumnInfo
  386. ReturnValues []ColumnInfo
  387. }
  388. type operation interface {
  389. encodeFrame(version uint8, dst frame) (frame, error)
  390. }
  391. type startupFrame struct {
  392. CQLVersion string
  393. Compression string
  394. }
  395. func (op *startupFrame) String() string {
  396. return fmt.Sprintf("[startup cqlversion=%q compression=%q]", op.CQLVersion, op.Compression)
  397. }
  398. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  399. if f == nil {
  400. f = newFrame(version)
  401. }
  402. f.setHeader(version, 0, 0, opStartup)
  403. // TODO: fix this, this is actually a StringMap
  404. var size uint16 = 1
  405. if op.Compression != "" {
  406. size++
  407. }
  408. f.writeShort(size)
  409. f.writeString("CQL_VERSION")
  410. f.writeString(op.CQLVersion)
  411. if op.Compression != "" {
  412. f.writeString("COMPRESSION")
  413. f.writeString(op.Compression)
  414. }
  415. return f, nil
  416. }
  417. type queryFrame struct {
  418. Stmt string
  419. Prepared []byte
  420. Cons Consistency
  421. Values [][]byte
  422. PageSize int
  423. PageState []byte
  424. }
  425. func (op *queryFrame) String() string {
  426. return fmt.Sprintf("[query statement=%q prepared=%x cons=%v ...]", op.Stmt, op.Prepared, op.Cons)
  427. }
  428. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  429. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  430. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  431. return nil, ErrUnsupported
  432. }
  433. if f == nil {
  434. f = newFrame(version)
  435. }
  436. if len(op.Prepared) > 0 {
  437. f.setHeader(version, 0, 0, opExecute)
  438. f.writeShortBytes(op.Prepared)
  439. } else {
  440. f.setHeader(version, 0, 0, opQuery)
  441. f.writeLongString(op.Stmt)
  442. }
  443. if version >= 2 {
  444. f.writeConsistency(op.Cons)
  445. flagPos := len(f)
  446. f.writeByte(0)
  447. if len(op.Values) > 0 {
  448. f[flagPos] |= flagQueryValues
  449. f.writeShort(uint16(len(op.Values)))
  450. for _, value := range op.Values {
  451. f.writeBytes(value)
  452. }
  453. }
  454. if op.PageSize > 0 {
  455. f[flagPos] |= flagPageSize
  456. f.writeInt(int32(op.PageSize))
  457. }
  458. if len(op.PageState) > 0 {
  459. f[flagPos] |= flagPageState
  460. f.writeBytes(op.PageState)
  461. }
  462. } else if version == 1 {
  463. if len(op.Prepared) > 0 {
  464. f.writeShort(uint16(len(op.Values)))
  465. for _, value := range op.Values {
  466. f.writeBytes(value)
  467. }
  468. }
  469. f.writeConsistency(op.Cons)
  470. }
  471. return f, nil
  472. }
  473. type prepareFrame struct {
  474. Stmt string
  475. }
  476. func (op *prepareFrame) String() string {
  477. return fmt.Sprintf("[prepare statement=%q]", op.Stmt)
  478. }
  479. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  480. if f == nil {
  481. f = newFrame(version)
  482. }
  483. f.setHeader(version, 0, 0, opPrepare)
  484. f.writeLongString(op.Stmt)
  485. return f, nil
  486. }
  487. type optionsFrame struct{}
  488. func (op *optionsFrame) String() string {
  489. return "[options]"
  490. }
  491. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  492. if f == nil {
  493. f = newFrame(version)
  494. }
  495. f.setHeader(version, 0, 0, opOptions)
  496. return f, nil
  497. }
  498. type authenticateFrame struct {
  499. Authenticator string
  500. }
  501. func (op *authenticateFrame) String() string {
  502. return fmt.Sprintf("[authenticate authenticator=%q]", op.Authenticator)
  503. }
  504. type authResponseFrame struct {
  505. Data []byte
  506. }
  507. func (op *authResponseFrame) String() string {
  508. return fmt.Sprintf("[auth_response data=%q]", op.Data)
  509. }
  510. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  511. if f == nil {
  512. f = newFrame(version)
  513. }
  514. f.setHeader(version, 0, 0, opAuthResponse)
  515. f.writeBytes(op.Data)
  516. return f, nil
  517. }
  518. type authSuccessFrame struct {
  519. Data []byte
  520. }
  521. func (op *authSuccessFrame) String() string {
  522. return fmt.Sprintf("[auth_success data=%q]", op.Data)
  523. }
  524. type authChallengeFrame struct {
  525. Data []byte
  526. }
  527. func (op *authChallengeFrame) String() string {
  528. return fmt.Sprintf("[auth_challenge data=%q]", op.Data)
  529. }