frame.go 8.0 KB


  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. )
  8. const (
  9. protoRequest byte = 0x02
  10. protoResponse byte = 0x82
  11. opError byte = 0x00
  12. opStartup byte = 0x01
  13. opReady byte = 0x02
  14. opAuthenticate byte = 0x03
  15. opOptions byte = 0x05
  16. opSupported byte = 0x06
  17. opQuery byte = 0x07
  18. opResult byte = 0x08
  19. opPrepare byte = 0x09
  20. opExecute byte = 0x0A
  21. opRegister byte = 0x0B
  22. opEvent byte = 0x0C
  23. opBatch byte = 0x0D
  24. opAuthChallenge byte = 0x0E
  25. opAuthResponse byte = 0x0F
  26. opAuthSuccess byte = 0x10
  27. resultKindVoid = 1
  28. resultKindRows = 2
  29. resultKindKeyspace = 3
  30. resultKindPrepared = 4
  31. resultKindSchemaChanged = 5
  32. flagQueryValues uint8 = 1
  33. flagCompress uint8 = 1
  34. flagPageSize uint8 = 4
  35. flagPageState uint8 = 8
  36. flagHasMore uint8 = 2
  37. headerSize = 8
  38. )
// frame is a byte buffer holding a protocol frame. The write* methods append
// encoded values to its end (via grow), the read* methods consume values from
// its front, and setHeader/setLength/Length access the first eight bytes.
type frame []byte
  40. func (f *frame) writeInt(v int32) {
  41. p := f.grow(4)
  42. (*f)[p] = byte(v >> 24)
  43. (*f)[p+1] = byte(v >> 16)
  44. (*f)[p+2] = byte(v >> 8)
  45. (*f)[p+3] = byte(v)
  46. }
  47. func (f *frame) writeShort(v uint16) {
  48. p := f.grow(2)
  49. (*f)[p] = byte(v >> 8)
  50. (*f)[p+1] = byte(v)
  51. }
  52. func (f *frame) writeString(v string) {
  53. f.writeShort(uint16(len(v)))
  54. p := f.grow(len(v))
  55. copy((*f)[p:], v)
  56. }
  57. func (f *frame) writeLongString(v string) {
  58. f.writeInt(int32(len(v)))
  59. p := f.grow(len(v))
  60. copy((*f)[p:], v)
  61. }
  62. func (f *frame) writeUUID() {
  63. }
  64. func (f *frame) writeStringList(v []string) {
  65. f.writeShort(uint16(len(v)))
  66. for i := range v {
  67. f.writeString(v[i])
  68. }
  69. }
  70. func (f *frame) writeByte(v byte) {
  71. p := f.grow(1)
  72. (*f)[p] = v
  73. }
  74. func (f *frame) writeBytes(v []byte) {
  75. if v == nil {
  76. f.writeInt(-1)
  77. return
  78. }
  79. f.writeInt(int32(len(v)))
  80. p := f.grow(len(v))
  81. copy((*f)[p:], v)
  82. }
  83. func (f *frame) writeShortBytes(v []byte) {
  84. f.writeShort(uint16(len(v)))
  85. p := f.grow(len(v))
  86. copy((*f)[p:], v)
  87. }
  88. func (f *frame) writeInet(ip net.IP, port int) {
  89. p := f.grow(1 + len(ip))
  90. (*f)[p] = byte(len(ip))
  91. copy((*f)[p+1:], ip)
  92. f.writeInt(int32(port))
  93. }
  94. func (f *frame) writeStringMap(v map[string]string) {
  95. f.writeShort(uint16(len(v)))
  96. for key, value := range v {
  97. f.writeString(key)
  98. f.writeString(value)
  99. }
  100. }
  101. func (f *frame) writeStringMultimap(v map[string][]string) {
  102. f.writeShort(uint16(len(v)))
  103. for key, values := range v {
  104. f.writeString(key)
  105. f.writeStringList(values)
  106. }
  107. }
  108. func (f *frame) setHeader(version, flags, stream, opcode uint8) {
  109. (*f)[0] = version
  110. (*f)[1] = flags
  111. (*f)[2] = stream
  112. (*f)[3] = opcode
  113. }
  114. func (f *frame) setLength(length int) {
  115. (*f)[4] = byte(length >> 24)
  116. (*f)[5] = byte(length >> 16)
  117. (*f)[6] = byte(length >> 8)
  118. (*f)[7] = byte(length)
  119. }
  120. func (f *frame) Length() int {
  121. return int((*f)[4])<<24 | int((*f)[5])<<16 | int((*f)[6])<<8 | int((*f)[7])
  122. }
  123. func (f *frame) grow(n int) int {
  124. if len(*f)+n >= cap(*f) {
  125. buf := make(frame, len(*f), len(*f)*2+n)
  126. copy(buf, *f)
  127. *f = buf
  128. }
  129. p := len(*f)
  130. *f = (*f)[:p+n]
  131. return p
  132. }
  133. func (f *frame) skipHeader() {
  134. *f = (*f)[headerSize:]
  135. }
  136. func (f *frame) readInt() int {
  137. if len(*f) < 4 {
  138. panic(ErrProtocol)
  139. }
  140. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  141. *f = (*f)[4:]
  142. return int(int32(v))
  143. }
  144. func (f *frame) readShort() uint16 {
  145. if len(*f) < 2 {
  146. panic(ErrProtocol)
  147. }
  148. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  149. *f = (*f)[2:]
  150. return v
  151. }
  152. func (f *frame) readString() string {
  153. n := int(f.readShort())
  154. if len(*f) < n {
  155. panic(ErrProtocol)
  156. }
  157. v := string((*f)[:n])
  158. *f = (*f)[n:]
  159. return v
  160. }
  161. func (f *frame) readLongString() string {
  162. n := f.readInt()
  163. if len(*f) < n {
  164. panic(ErrProtocol)
  165. }
  166. v := string((*f)[:n])
  167. *f = (*f)[n:]
  168. return v
  169. }
  170. func (f *frame) readBytes() []byte {
  171. n := f.readInt()
  172. if n < 0 {
  173. return nil
  174. }
  175. if len(*f) < n {
  176. panic(ErrProtocol)
  177. }
  178. v := (*f)[:n]
  179. *f = (*f)[n:]
  180. return v
  181. }
  182. func (f *frame) readShortBytes() []byte {
  183. n := int(f.readShort())
  184. if len(*f) < n {
  185. panic(ErrProtocol)
  186. }
  187. v := (*f)[:n]
  188. *f = (*f)[n:]
  189. return v
  190. }
  191. func (f *frame) readTypeInfo() *TypeInfo {
  192. x := f.readShort()
  193. typ := &TypeInfo{Type: Type(x)}
  194. switch typ.Type {
  195. case TypeCustom:
  196. typ.Custom = f.readString()
  197. case TypeMap:
  198. typ.Key = f.readTypeInfo()
  199. fallthrough
  200. case TypeList, TypeSet:
  201. typ.Elem = f.readTypeInfo()
  202. }
  203. return typ
  204. }
  205. func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
  206. flags := f.readInt()
  207. numColumns := f.readInt()
  208. var pageState []byte
  209. if flags&2 != 0 {
  210. pageState = f.readBytes()
  211. }
  212. globalKeyspace := ""
  213. globalTable := ""
  214. if flags&1 != 0 {
  215. globalKeyspace = f.readString()
  216. globalTable = f.readString()
  217. }
  218. columns := make([]ColumnInfo, numColumns)
  219. for i := 0; i < numColumns; i++ {
  220. columns[i].Keyspace = globalKeyspace
  221. columns[i].Table = globalTable
  222. if flags&1 == 0 {
  223. columns[i].Keyspace = f.readString()
  224. columns[i].Table = f.readString()
  225. }
  226. columns[i].Name = f.readString()
  227. columns[i].TypeInfo = f.readTypeInfo()
  228. }
  229. return columns, pageState
  230. }
  231. func (f *frame) writeConsistency(c Consistency) {
  232. f.writeShort(consistencyCodes[c])
  233. }
  234. var consistencyCodes = []uint16{
  235. Any: 0x0000,
  236. One: 0x0001,
  237. Two: 0x0002,
  238. Three: 0x0003,
  239. Quorum: 0x0004,
  240. All: 0x0005,
  241. LocalQuorum: 0x0006,
  242. EachQuorum: 0x0007,
  243. Serial: 0x0008,
  244. LocalSerial: 0x0009,
  245. }
  246. type readyFrame struct{}
  247. type resultVoidFrame struct{}
  248. type resultRowsFrame struct {
  249. Columns []ColumnInfo
  250. Rows [][][]byte
  251. PagingState []byte
  252. }
  253. type resultKeyspaceFrame struct {
  254. Keyspace string
  255. }
  256. type resultPreparedFrame struct {
  257. PreparedId []byte
  258. Values []ColumnInfo
  259. }
  260. type errorFrame struct {
  261. Code int
  262. Message string
  263. }
  264. func (e errorFrame) Error() string {
  265. return e.Message
  266. }
// operation is implemented by request types that can serialise themselves
// into a frame. encodeFrame receives the protocol version and a destination
// frame and returns the encoded frame or an error. The implementations in
// this file allocate a fresh frame when the destination is nil and do not
// fill in the header's length field.
type operation interface {
	encodeFrame(version uint8, dst frame) (frame, error)
}
  270. type startupFrame struct {
  271. CQLVersion string
  272. Compression string
  273. }
  274. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  275. if f == nil {
  276. f = make(frame, headerSize, defaultFrameSize)
  277. }
  278. f.setHeader(version, 0, 0, opStartup)
  279. f.writeShort(1)
  280. f.writeString("CQL_VERSION")
  281. f.writeString(op.CQLVersion)
  282. if op.Compression != "" {
  283. f[headerSize+1] += 1
  284. f.writeString("COMPRESSION")
  285. f.writeString(op.Compression)
  286. }
  287. return f, nil
  288. }
  289. type queryFrame struct {
  290. Stmt string
  291. Prepared []byte
  292. Cons Consistency
  293. Values [][]byte
  294. PageSize int
  295. PageState []byte
  296. }
  297. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  298. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  299. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  300. return nil, ErrUnsupported
  301. }
  302. if f == nil {
  303. f = make(frame, headerSize, defaultFrameSize)
  304. }
  305. if len(op.Prepared) > 0 {
  306. f.setHeader(version, 0, 0, opExecute)
  307. f.writeShortBytes(op.Prepared)
  308. } else {
  309. f.setHeader(version, 0, 0, opQuery)
  310. f.writeLongString(op.Stmt)
  311. }
  312. if version >= 2 {
  313. f.writeConsistency(op.Cons)
  314. flagPos := len(f)
  315. f.writeByte(0)
  316. if len(op.Values) > 0 {
  317. f[flagPos] |= flagQueryValues
  318. f.writeShort(uint16(len(op.Values)))
  319. for _, value := range op.Values {
  320. f.writeBytes(value)
  321. }
  322. }
  323. if op.PageSize > 0 {
  324. f[flagPos] |= flagPageSize
  325. f.writeInt(int32(op.PageSize))
  326. }
  327. if len(op.PageState) > 0 {
  328. f[flagPos] |= flagPageState
  329. f.writeBytes(op.PageState)
  330. }
  331. } else if version == 1 {
  332. if len(op.Prepared) > 0 {
  333. f.writeShort(uint16(len(op.Values)))
  334. for _, value := range op.Values {
  335. f.writeBytes(value)
  336. }
  337. }
  338. f.writeConsistency(op.Cons)
  339. }
  340. return f, nil
  341. }
  342. type prepareFrame struct {
  343. Stmt string
  344. }
  345. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  346. if f == nil {
  347. f = make(frame, headerSize, defaultFrameSize)
  348. }
  349. f.setHeader(version, 0, 0, opPrepare)
  350. f.writeLongString(op.Stmt)
  351. return f, nil
  352. }
  353. type optionsFrame struct{}
  354. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  355. if f == nil {
  356. f = make(frame, headerSize, defaultFrameSize)
  357. }
  358. f.setHeader(version, 0, 0, opOptions)
  359. return f, nil
  360. }