frame.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagTrace uint8 = 2
  35. flagPageSize uint8 = 4
  36. flagPageState uint8 = 8
  37. flagHasMore uint8 = 2
  38. errServer = 0x0000
  39. errProtocol = 0x000A
  40. errCredentials = 0x0100
  41. errUnavailable = 0x1000
  42. errOverloaded = 0x1001
  43. errBootstrapping = 0x1002
  44. errTruncate = 0x1003
  45. errWriteTimeout = 0x1100
  46. errReadTimeout = 0x1200
  47. errSyntax = 0x2000
  48. errUnauthorized = 0x2100
  49. errInvalid = 0x2200
  50. errConfig = 0x2300
  51. errAlreadyExists = 0x2400
  52. errUnprepared = 0x2500
  53. headerSize = 8
  54. apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
  55. )
// frame is a byte buffer holding one protocol frame. The write* methods
// append to it through grow; the read* methods consume bytes from its front
// by re-slicing.
type frame []byte
  57. func (f *frame) writeInt(v int32) {
  58. p := f.grow(4)
  59. (*f)[p] = byte(v >> 24)
  60. (*f)[p+1] = byte(v >> 16)
  61. (*f)[p+2] = byte(v >> 8)
  62. (*f)[p+3] = byte(v)
  63. }
  64. func (f *frame) writeShort(v uint16) {
  65. p := f.grow(2)
  66. (*f)[p] = byte(v >> 8)
  67. (*f)[p+1] = byte(v)
  68. }
  69. func (f *frame) writeString(v string) {
  70. f.writeShort(uint16(len(v)))
  71. p := f.grow(len(v))
  72. copy((*f)[p:], v)
  73. }
  74. func (f *frame) writeLongString(v string) {
  75. f.writeInt(int32(len(v)))
  76. p := f.grow(len(v))
  77. copy((*f)[p:], v)
  78. }
// writeUUID is an empty stub: it takes no value and writes nothing to the
// frame.
// TODO: not implemented.
func (f *frame) writeUUID() {
}
  81. func (f *frame) writeStringList(v []string) {
  82. f.writeShort(uint16(len(v)))
  83. for i := range v {
  84. f.writeString(v[i])
  85. }
  86. }
  87. func (f *frame) writeByte(v byte) {
  88. p := f.grow(1)
  89. (*f)[p] = v
  90. }
  91. func (f *frame) writeBytes(v []byte) {
  92. if v == nil {
  93. f.writeInt(-1)
  94. return
  95. }
  96. f.writeInt(int32(len(v)))
  97. p := f.grow(len(v))
  98. copy((*f)[p:], v)
  99. }
  100. func (f *frame) writeShortBytes(v []byte) {
  101. f.writeShort(uint16(len(v)))
  102. p := f.grow(len(v))
  103. copy((*f)[p:], v)
  104. }
  105. func (f *frame) writeInet(ip net.IP, port int) {
  106. p := f.grow(1 + len(ip))
  107. (*f)[p] = byte(len(ip))
  108. copy((*f)[p+1:], ip)
  109. f.writeInt(int32(port))
  110. }
  111. func (f *frame) writeStringMap(v map[string]string) {
  112. f.writeShort(uint16(len(v)))
  113. for key, value := range v {
  114. f.writeString(key)
  115. f.writeString(value)
  116. }
  117. }
  118. func (f *frame) writeStringMultimap(v map[string][]string) {
  119. f.writeShort(uint16(len(v)))
  120. for key, values := range v {
  121. f.writeString(key)
  122. f.writeStringList(values)
  123. }
  124. }
  125. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  126. (*f)[0] = version
  127. (*f)[1] = flags
  128. (*f)[2] = stream
  129. (*f)[3] = opcode
  130. }
  131. func (f *frame) setLength(length int) {
  132. (*f)[4] = byte(length >> 24)
  133. (*f)[5] = byte(length >> 16)
  134. (*f)[6] = byte(length >> 8)
  135. (*f)[7] = byte(length)
  136. }
  137. func (f *frame) Length() int {
  138. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  139. }
  140. func (f *frame) grow(n int) int {
  141. if len(*f)+n >= cap(*f) {
  142. buf := make(frame, len(*f), len(*f)*2+n)
  143. copy(buf, *f)
  144. *f = buf
  145. }
  146. p := len(*f)
  147. *f = (*f)[:p+n]
  148. return p
  149. }
  150. func (f *frame) skipHeader() {
  151. *f = (*f)[headerSize:]
  152. }
  153. func (f *frame) readInt() int {
  154. if len(*f) < 4 {
  155. panic(NewErrProtocol("Trying to read an int while >4 bytes in the buffer"))
  156. }
  157. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  158. *f = (*f)[4:]
  159. return int(int32(v))
  160. }
  161. func (f *frame) readShort() uint16 {
  162. if len(*f) < 2 {
  163. panic(NewErrProtocol("Trying to read a short while >2 bytes in the buffer"))
  164. }
  165. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  166. *f = (*f)[2:]
  167. return v
  168. }
  169. func (f *frame) readString() string {
  170. n := int(f.readShort())
  171. if len(*f) < n {
  172. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  173. }
  174. v := string((*f)[:n])
  175. *f = (*f)[n:]
  176. return v
  177. }
  178. func (f *frame) readLongString() string {
  179. n := f.readInt()
  180. if len(*f) < n {
  181. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  182. }
  183. v := string((*f)[:n])
  184. *f = (*f)[n:]
  185. return v
  186. }
  187. func (f *frame) readBytes() []byte {
  188. n := f.readInt()
  189. if n < 0 {
  190. return nil
  191. }
  192. if len(*f) < n {
  193. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  194. }
  195. v := (*f)[:n]
  196. *f = (*f)[n:]
  197. return v
  198. }
  199. func (f *frame) readShortBytes() []byte {
  200. n := int(f.readShort())
  201. if len(*f) < n {
  202. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  203. }
  204. v := (*f)[:n]
  205. *f = (*f)[n:]
  206. return v
  207. }
  208. func (f *frame) readTypeInfo() *TypeInfo {
  209. x := f.readShort()
  210. typ := &TypeInfo{Type: Type(x)}
  211. switch typ.Type {
  212. case TypeCustom:
  213. typ.Custom = f.readString()
  214. if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
  215. typ = &TypeInfo{Type: cassType}
  216. switch typ.Type {
  217. case TypeMap:
  218. typ.Key = f.readTypeInfo()
  219. fallthrough
  220. case TypeList, TypeSet:
  221. typ.Elem = f.readTypeInfo()
  222. }
  223. }
  224. case TypeMap:
  225. typ.Key = f.readTypeInfo()
  226. fallthrough
  227. case TypeList, TypeSet:
  228. typ.Elem = f.readTypeInfo()
  229. }
  230. return typ
  231. }
  232. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  233. flags := f.readInt()
  234. numColumns := f.readInt()
  235. var pageState []byte
  236. if flags&2 != 0 {
  237. pageState = f.readBytes()
  238. }
  239. globalKeyspace := ""
  240. globalTable := ""
  241. if flags&1 != 0 {
  242. globalKeyspace = f.readString()
  243. globalTable = f.readString()
  244. }
  245. columns := make([]ColumnInfo, numColumns)
  246. for i := 0; i < numColumns; i++ {
  247. columns[i].Keyspace = globalKeyspace
  248. columns[i].Table = globalTable
  249. if flags&1 == 0 {
  250. columns[i].Keyspace = f.readString()
  251. columns[i].Table = f.readString()
  252. }
  253. columns[i].Name = f.readString()
  254. columns[i].TypeInfo = f.readTypeInfo()
  255. }
  256. return columns, pageState
  257. }
  258. func (f *frame) writeConsistency(c Consistency) {
  259. f.writeShort(consistencyCodes[c])
  260. }
// encodeFrame returns the receiver unchanged with a nil error, so that a
// frame value has the same encodeFrame method as the operation types.
// Both version and dst are ignored.
func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
	return f, nil
}
// consistencyCodes maps a Consistency value (used as the slice index) to the
// 16-bit code that writeConsistency puts on the wire.
var consistencyCodes = []uint16{
	Any:         0x0000,
	One:         0x0001,
	Two:         0x0002,
	Three:       0x0003,
	Quorum:      0x0004,
	All:         0x0005,
	LocalQuorum: 0x0006,
	EachQuorum:  0x0007,
	Serial:      0x0008,
	LocalSerial: 0x0009,
}
  276. type readyFrame struct{}
  277. type supportedFrame struct{}
  278. type resultVoidFrame struct{}
// resultRowsFrame holds a rows result.
type resultRowsFrame struct {
	// Columns describes the result columns.
	Columns []ColumnInfo
	// Rows holds the row data.
	// NOTE(review): presumably indexed [row][column] -> raw value bytes; confirm.
	Rows [][][]byte
	// PagingState is an opaque paging token.
	// NOTE(review): presumably the value returned by readMetaData; confirm.
	PagingState []byte
}
// resultKeyspaceFrame holds a keyspace result: a single keyspace name.
type resultKeyspaceFrame struct {
	Keyspace string
}
// resultPreparedFrame holds a prepared-statement result.
type resultPreparedFrame struct {
	// PreparedId is the statement identifier.
	// NOTE(review): presumably the id later sent as queryFrame.Prepared; confirm.
	PreparedId []byte
	// Values describes columns associated with the statement.
	// NOTE(review): presumably the bind markers; confirm.
	Values []ColumnInfo
}
  291. type errorFrame struct {
  292. Code int
  293. Message string
  294. }
  295. func (e errorFrame) Error() string {
  296. return e.Message
  297. }
// operation is implemented by every request type that can serialize itself
// into a frame.
type operation interface {
	// encodeFrame writes the request for the given protocol version into
	// dst and returns the resulting frame. The implementations in this file
	// allocate a new frame when dst is nil.
	encodeFrame(version uint8, dst frame) (frame, error)
}
  301. type startupFrame struct {
  302. CQLVersion string
  303. Compression string
  304. }
  305. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  306. if f == nil {
  307. f = make(frame, headerSize, defaultFrameSize)
  308. }
  309. f.setHeader(version, 0, 0, opStartup)
  310. f.writeShort(1)
  311. f.writeString("CQL_VERSION")
  312. f.writeString(op.CQLVersion)
  313. if op.Compression != "" {
  314. f[headerSize+1] += 1
  315. f.writeString("COMPRESSION")
  316. f.writeString(op.Compression)
  317. }
  318. return f, nil
  319. }
// queryFrame describes a statement to run, either as plain text or through
// a previously prepared statement id.
type queryFrame struct {
	// Stmt is the statement text; encodeFrame sends it only when Prepared
	// is empty.
	Stmt string
	// Prepared is a prepared statement id; when non-empty encodeFrame emits
	// an execute request instead of a query request.
	Prepared []byte
	// Cons is the consistency level.
	Cons Consistency
	// Values are the encoded bound values; a nil element is written with
	// length -1 by writeBytes.
	Values [][]byte
	// PageSize is sent only when greater than zero.
	PageSize int
	// PageState is sent only when non-empty.
	PageState []byte
}
  328. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  329. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  330. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  331. return nil, ErrUnsupported
  332. }
  333. if f == nil {
  334. f = make(frame, headerSize, defaultFrameSize)
  335. }
  336. if len(op.Prepared) > 0 {
  337. f.setHeader(version, 0, 0, opExecute)
  338. f.writeShortBytes(op.Prepared)
  339. } else {
  340. f.setHeader(version, 0, 0, opQuery)
  341. f.writeLongString(op.Stmt)
  342. }
  343. if version >= 2 {
  344. f.writeConsistency(op.Cons)
  345. flagPos := len(f)
  346. f.writeByte(0)
  347. if len(op.Values) > 0 {
  348. f[flagPos] |= flagQueryValues
  349. f.writeShort(uint16(len(op.Values)))
  350. for _, value := range op.Values {
  351. f.writeBytes(value)
  352. }
  353. }
  354. if op.PageSize > 0 {
  355. f[flagPos] |= flagPageSize
  356. f.writeInt(int32(op.PageSize))
  357. }
  358. if len(op.PageState) > 0 {
  359. f[flagPos] |= flagPageState
  360. f.writeBytes(op.PageState)
  361. }
  362. } else if version == 1 {
  363. if len(op.Prepared) > 0 {
  364. f.writeShort(uint16(len(op.Values)))
  365. for _, value := range op.Values {
  366. f.writeBytes(value)
  367. }
  368. }
  369. f.writeConsistency(op.Cons)
  370. }
  371. return f, nil
  372. }
  373. type prepareFrame struct {
  374. Stmt string
  375. }
  376. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  377. if f == nil {
  378. f = make(frame, headerSize, defaultFrameSize)
  379. }
  380. f.setHeader(version, 0, 0, opPrepare)
  381. f.writeLongString(op.Stmt)
  382. return f, nil
  383. }
  384. type optionsFrame struct{}
  385. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  386. if f == nil {
  387. f = make(frame, headerSize, defaultFrameSize)
  388. }
  389. f.setHeader(version, 0, 0, opOptions)
  390. return f, nil
  391. }
// authenticateFrame carries an authenticator name.
// NOTE(review): presumably the decoded AUTHENTICATE response naming the
// server's authenticator class; confirm where it is constructed.
type authenticateFrame struct {
	Authenticator string
}
  395. type authResponseFrame struct {
  396. Data []byte
  397. }
  398. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  399. if f == nil {
  400. f = make(frame, headerSize, defaultFrameSize)
  401. }
  402. f.setHeader(version, 0, 0, opAuthResponse)
  403. f.writeBytes(op.Data)
  404. return f, nil
  405. }
  406. type authSuccessFrame struct {
  407. Data []byte
  408. }
  409. type authChallengeFrame struct {
  410. Data []byte
  411. }