frame.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagTrace uint8 = 2
  35. flagPageSize uint8 = 4
  36. flagPageState uint8 = 8
  37. flagHasMore uint8 = 2
  38. headerSize = 8
  39. )
  40. type frame []byte
  41. func (f *frame) writeInt(v int32) {
  42. p := f.grow(4)
  43. (*f)[p] = byte(v >> 24)
  44. (*f)[p+1] = byte(v >> 16)
  45. (*f)[p+2] = byte(v >> 8)
  46. (*f)[p+3] = byte(v)
  47. }
  48. func (f *frame) writeShort(v uint16) {
  49. p := f.grow(2)
  50. (*f)[p] = byte(v >> 8)
  51. (*f)[p+1] = byte(v)
  52. }
  53. func (f *frame) writeString(v string) {
  54. f.writeShort(uint16(len(v)))
  55. p := f.grow(len(v))
  56. copy((*f)[p:], v)
  57. }
  58. func (f *frame) writeLongString(v string) {
  59. f.writeInt(int32(len(v)))
  60. p := f.grow(len(v))
  61. copy((*f)[p:], v)
  62. }
  63. func (f *frame) writeUUID() {
  64. }
  65. func (f *frame) writeStringList(v []string) {
  66. f.writeShort(uint16(len(v)))
  67. for i := range v {
  68. f.writeString(v[i])
  69. }
  70. }
  71. func (f *frame) writeByte(v byte) {
  72. p := f.grow(1)
  73. (*f)[p] = v
  74. }
  75. func (f *frame) writeBytes(v []byte) {
  76. if v == nil {
  77. f.writeInt(-1)
  78. return
  79. }
  80. f.writeInt(int32(len(v)))
  81. p := f.grow(len(v))
  82. copy((*f)[p:], v)
  83. }
  84. func (f *frame) writeShortBytes(v []byte) {
  85. f.writeShort(uint16(len(v)))
  86. p := f.grow(len(v))
  87. copy((*f)[p:], v)
  88. }
  89. func (f *frame) writeInet(ip net.IP, port int) {
  90. p := f.grow(1 + len(ip))
  91. (*f)[p] = byte(len(ip))
  92. copy((*f)[p+1:], ip)
  93. f.writeInt(int32(port))
  94. }
  95. func (f *frame) writeStringMap(v map[string]string) {
  96. f.writeShort(uint16(len(v)))
  97. for key, value := range v {
  98. f.writeString(key)
  99. f.writeString(value)
  100. }
  101. }
  102. func (f *frame) writeStringMultimap(v map[string][]string) {
  103. f.writeShort(uint16(len(v)))
  104. for key, values := range v {
  105. f.writeString(key)
  106. f.writeStringList(values)
  107. }
  108. }
  109. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  110. (*f)[0] = version
  111. (*f)[1] = flags
  112. (*f)[2] = stream
  113. (*f)[3] = opcode
  114. }
  115. func (f *frame) setLength(length int) {
  116. (*f)[4] = byte(length >> 24)
  117. (*f)[5] = byte(length >> 16)
  118. (*f)[6] = byte(length >> 8)
  119. (*f)[7] = byte(length)
  120. }
  121. func (f *frame) Length() int {
  122. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  123. }
  124. func (f *frame) grow(n int) int {
  125. if len(*f)+n >= cap(*f) {
  126. buf := make(frame, len(*f), len(*f)*2+n)
  127. copy(buf, *f)
  128. *f = buf
  129. }
  130. p := len(*f)
  131. *f = (*f)[:p+n]
  132. return p
  133. }
  134. func (f *frame) skipHeader() {
  135. *f = (*f)[headerSize:]
  136. }
  137. func (f *frame) readInt() int {
  138. if len(*f) < 4 {
  139. panic(ErrProtocol)
  140. }
  141. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  142. *f = (*f)[4:]
  143. return int(int32(v))
  144. }
  145. func (f *frame) readShort() uint16 {
  146. if len(*f) < 2 {
  147. panic(ErrProtocol)
  148. }
  149. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  150. *f = (*f)[2:]
  151. return v
  152. }
  153. func (f *frame) readString() string {
  154. n := int(f.readShort())
  155. if len(*f) < n {
  156. panic(ErrProtocol)
  157. }
  158. v := string((*f)[:n])
  159. *f = (*f)[n:]
  160. return v
  161. }
  162. func (f *frame) readLongString() string {
  163. n := f.readInt()
  164. if len(*f) < n {
  165. panic(ErrProtocol)
  166. }
  167. v := string((*f)[:n])
  168. *f = (*f)[n:]
  169. return v
  170. }
  171. func (f *frame) readBytes() []byte {
  172. n := f.readInt()
  173. if n < 0 {
  174. return nil
  175. }
  176. if len(*f) < n {
  177. panic(ErrProtocol)
  178. }
  179. v := (*f)[:n]
  180. *f = (*f)[n:]
  181. return v
  182. }
  183. func (f *frame) readShortBytes() []byte {
  184. n := int(f.readShort())
  185. if len(*f) < n {
  186. panic(ErrProtocol)
  187. }
  188. v := (*f)[:n]
  189. *f = (*f)[n:]
  190. return v
  191. }
  192. func (f *frame) readTypeInfo() *TypeInfo {
  193. x := f.readShort()
  194. typ := &TypeInfo{Type: Type(x)}
  195. switch typ.Type {
  196. case TypeCustom:
  197. typ.Custom = f.readString()
  198. case TypeMap:
  199. typ.Key = f.readTypeInfo()
  200. fallthrough
  201. case TypeList, TypeSet:
  202. typ.Elem = f.readTypeInfo()
  203. }
  204. return typ
  205. }
  206. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  207. flags := f.readInt()
  208. numColumns := f.readInt()
  209. var pageState []byte
  210. if flags&2 != 0 {
  211. pageState = f.readBytes()
  212. }
  213. globalKeyspace := ""
  214. globalTable := ""
  215. if flags&1 != 0 {
  216. globalKeyspace = f.readString()
  217. globalTable = f.readString()
  218. }
  219. columns := make([]ColumnInfo, numColumns)
  220. for i := 0; i < numColumns; i++ {
  221. columns[i].Keyspace = globalKeyspace
  222. columns[i].Table = globalTable
  223. if flags&1 == 0 {
  224. columns[i].Keyspace = f.readString()
  225. columns[i].Table = f.readString()
  226. }
  227. columns[i].Name = f.readString()
  228. columns[i].TypeInfo = f.readTypeInfo()
  229. }
  230. return columns, pageState
  231. }
  232. func (f *frame) writeConsistency(c Consistency) {
  233. f.writeShort(consistencyCodes[c])
  234. }
  235. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  236. return f, nil
  237. }
  238. var consistencyCodes = []uint16{
  239. Any: 0x0000,
  240. One: 0x0001,
  241. Two: 0x0002,
  242. Three: 0x0003,
  243. Quorum: 0x0004,
  244. All: 0x0005,
  245. LocalQuorum: 0x0006,
  246. EachQuorum: 0x0007,
  247. Serial: 0x0008,
  248. LocalSerial: 0x0009,
  249. }
  250. type readyFrame struct{}
  251. type supportedFrame struct{}
  252. type resultVoidFrame struct{}
  253. type resultRowsFrame struct {
  254. Columns []ColumnInfo
  255. Rows [][][]byte
  256. PagingState []byte
  257. }
  258. type resultKeyspaceFrame struct {
  259. Keyspace string
  260. }
  261. type resultPreparedFrame struct {
  262. PreparedId []byte
  263. Values []ColumnInfo
  264. }
  265. type errorFrame struct {
  266. Code int
  267. Message string
  268. }
  269. func (e errorFrame) Error() string {
  270. return e.Message
  271. }
  272. type operation interface {
  273. encodeFrame(version uint8, dst frame) (frame, error)
  274. }
  275. type startupFrame struct {
  276. CQLVersion string
  277. Compression string
  278. }
  279. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  280. if f == nil {
  281. f = make(frame, headerSize, defaultFrameSize)
  282. }
  283. f.setHeader(version, 0, 0, opStartup)
  284. f.writeShort(1)
  285. f.writeString("CQL_VERSION")
  286. f.writeString(op.CQLVersion)
  287. if op.Compression != "" {
  288. f[headerSize+1] += 1
  289. f.writeString("COMPRESSION")
  290. f.writeString(op.Compression)
  291. }
  292. return f, nil
  293. }
  294. type queryFrame struct {
  295. Stmt string
  296. Prepared []byte
  297. Cons Consistency
  298. Values [][]byte
  299. PageSize int
  300. PageState []byte
  301. }
  302. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  303. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  304. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  305. return nil, ErrUnsupported
  306. }
  307. if f == nil {
  308. f = make(frame, headerSize, defaultFrameSize)
  309. }
  310. if len(op.Prepared) > 0 {
  311. f.setHeader(version, 0, 0, opExecute)
  312. f.writeShortBytes(op.Prepared)
  313. } else {
  314. f.setHeader(version, 0, 0, opQuery)
  315. f.writeLongString(op.Stmt)
  316. }
  317. if version >= 2 {
  318. f.writeConsistency(op.Cons)
  319. flagPos := len(f)
  320. f.writeByte(0)
  321. if len(op.Values) > 0 {
  322. f[flagPos] |= flagQueryValues
  323. f.writeShort(uint16(len(op.Values)))
  324. for _, value := range op.Values {
  325. f.writeBytes(value)
  326. }
  327. }
  328. if op.PageSize > 0 {
  329. f[flagPos] |= flagPageSize
  330. f.writeInt(int32(op.PageSize))
  331. }
  332. if len(op.PageState) > 0 {
  333. f[flagPos] |= flagPageState
  334. f.writeBytes(op.PageState)
  335. }
  336. } else if version == 1 {
  337. if len(op.Prepared) > 0 {
  338. f.writeShort(uint16(len(op.Values)))
  339. for _, value := range op.Values {
  340. f.writeBytes(value)
  341. }
  342. }
  343. f.writeConsistency(op.Cons)
  344. }
  345. return f, nil
  346. }
  347. type prepareFrame struct {
  348. Stmt string
  349. }
  350. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  351. if f == nil {
  352. f = make(frame, headerSize, defaultFrameSize)
  353. }
  354. f.setHeader(version, 0, 0, opPrepare)
  355. f.writeLongString(op.Stmt)
  356. return f, nil
  357. }
  358. type optionsFrame struct{}
  359. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  360. if f == nil {
  361. f = make(frame, headerSize, defaultFrameSize)
  362. }
  363. f.setHeader(version, 0, 0, opOptions)
  364. return f, nil
  365. }