session.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. "unicode"
  18. "github.com/gocql/gocql/internal/lru"
  19. )
  20. // Session is the interface used by users to interact with the database.
  21. //
  22. // It's safe for concurrent use by multiple goroutines and a typical usage
  23. // scenario is to have one global session object to interact with the
  24. // whole Cassandra cluster.
  25. //
  26. // This type extends the Node interface by adding a convinient query builder
  27. // and automatically sets a default consistency level on all operations
  28. // that do not have a consistency level set.
  29. type Session struct {
  30. cons Consistency
  31. pageSize int
  32. prefetch float64
  33. routingKeyInfoCache routingKeyInfoLRU
  34. schemaDescriber *schemaDescriber
  35. trace Tracer
  36. queryObserver QueryObserver
  37. batchObserver BatchObserver
  38. connectObserver ConnectObserver
  39. frameObserver FrameHeaderObserver
  40. hostSource *ringDescriber
  41. stmtsLRU *preparedLRU
  42. connCfg *ConnConfig
  43. executor *queryExecutor
  44. pool *policyConnPool
  45. policy HostSelectionPolicy
  46. ring ring
  47. metadata clusterMetadata
  48. mu sync.RWMutex
  49. control *controlConn
  50. // event handlers
  51. nodeEvents *eventDebouncer
  52. schemaEvents *eventDebouncer
  53. // ring metadata
  54. hosts []HostInfo
  55. useSystemSchema bool
  56. cfg ClusterConfig
  57. quit chan struct{}
  58. closeMu sync.RWMutex
  59. isClosed bool
  60. }
  61. var queryPool = &sync.Pool{
  62. New: func() interface{} {
  63. return new(Query)
  64. },
  65. }
  66. func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
  67. var hosts []*HostInfo
  68. for _, hostport := range addrs {
  69. resolvedHosts, err := hostInfo(hostport, defaultPort)
  70. if err != nil {
  71. // Try other hosts if unable to resolve DNS name
  72. if _, ok := err.(*net.DNSError); ok {
  73. Logger.Printf("gocql: dns error: %v\n", err)
  74. continue
  75. }
  76. return nil, err
  77. }
  78. hosts = append(hosts, resolvedHosts...)
  79. }
  80. if len(hosts) == 0 {
  81. return nil, errors.New("failed to resolve any of the provided hostnames")
  82. }
  83. return hosts, nil
  84. }
  85. // NewSession wraps an existing Node.
  86. func NewSession(cfg ClusterConfig) (*Session, error) {
  87. // Check that hosts in the ClusterConfig is not empty
  88. if len(cfg.Hosts) < 1 {
  89. return nil, ErrNoHosts
  90. }
  91. s := &Session{
  92. cons: cfg.Consistency,
  93. prefetch: 0.25,
  94. cfg: cfg,
  95. pageSize: cfg.PageSize,
  96. stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
  97. quit: make(chan struct{}),
  98. connectObserver: cfg.ConnectObserver,
  99. }
  100. s.schemaDescriber = newSchemaDescriber(s)
  101. s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent)
  102. s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent)
  103. s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
  104. s.hostSource = &ringDescriber{session: s}
  105. if cfg.PoolConfig.HostSelectionPolicy == nil {
  106. cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
  107. }
  108. s.pool = cfg.PoolConfig.buildPool(s)
  109. s.policy = cfg.PoolConfig.HostSelectionPolicy
  110. s.policy.Init(s)
  111. s.executor = &queryExecutor{
  112. pool: s.pool,
  113. policy: cfg.PoolConfig.HostSelectionPolicy,
  114. }
  115. s.queryObserver = cfg.QueryObserver
  116. s.batchObserver = cfg.BatchObserver
  117. s.connectObserver = cfg.ConnectObserver
  118. s.frameObserver = cfg.FrameHeaderObserver
  119. //Check the TLS Config before trying to connect to anything external
  120. connCfg, err := connConfig(&s.cfg)
  121. if err != nil {
  122. //TODO: Return a typed error
  123. return nil, fmt.Errorf("gocql: unable to create session: %v", err)
  124. }
  125. s.connCfg = connCfg
  126. if err := s.init(); err != nil {
  127. s.Close()
  128. if err == ErrNoConnectionsStarted {
  129. //This error used to be generated inside NewSession & returned directly
  130. //Forward it on up to be backwards compatible
  131. return nil, ErrNoConnectionsStarted
  132. } else {
  133. // TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error
  134. return nil, fmt.Errorf("gocql: unable to create session: %v", err)
  135. }
  136. }
  137. return s, nil
  138. }
  139. func (s *Session) init() error {
  140. hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port)
  141. if err != nil {
  142. return err
  143. }
  144. s.ring.endpoints = hosts
  145. if !s.cfg.disableControlConn {
  146. s.control = createControlConn(s)
  147. if s.cfg.ProtoVersion == 0 {
  148. proto, err := s.control.discoverProtocol(hosts)
  149. if err != nil {
  150. return fmt.Errorf("unable to discover protocol version: %v", err)
  151. } else if proto == 0 {
  152. return errors.New("unable to discovery protocol version")
  153. }
  154. // TODO(zariel): we really only need this in 1 place
  155. s.cfg.ProtoVersion = proto
  156. s.connCfg.ProtoVersion = proto
  157. }
  158. if err := s.control.connect(hosts); err != nil {
  159. return err
  160. }
  161. if !s.cfg.DisableInitialHostLookup {
  162. var partitioner string
  163. newHosts, partitioner, err := s.hostSource.GetHosts()
  164. if err != nil {
  165. return err
  166. }
  167. s.policy.SetPartitioner(partitioner)
  168. filteredHosts := make([]*HostInfo, 0, len(newHosts))
  169. for _, host := range newHosts {
  170. if !s.cfg.filterHost(host) {
  171. filteredHosts = append(filteredHosts, host)
  172. }
  173. }
  174. hosts = append(hosts, filteredHosts...)
  175. }
  176. }
  177. hostMap := make(map[string]*HostInfo, len(hosts))
  178. for _, host := range hosts {
  179. hostMap[host.ConnectAddress().String()] = host
  180. }
  181. for _, host := range hostMap {
  182. host = s.ring.addOrUpdate(host)
  183. s.addNewNode(host)
  184. }
  185. // TODO(zariel): we probably dont need this any more as we verify that we
  186. // can connect to one of the endpoints supplied by using the control conn.
  187. // See if there are any connections in the pool
  188. if s.cfg.ReconnectInterval > 0 {
  189. go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
  190. }
  191. // If we disable the initial host lookup, we need to still check if the
  192. // cluster is using the newer system schema or not... however, if control
  193. // connection is disable, we really have no choice, so we just make our
  194. // best guess...
  195. if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
  196. newer, _ := checkSystemSchema(s.control)
  197. s.useSystemSchema = newer
  198. } else {
  199. host := s.ring.rrHost()
  200. s.useSystemSchema = host.Version().Major >= 3
  201. }
  202. if s.pool.Size() == 0 {
  203. return ErrNoConnectionsStarted
  204. }
  205. return nil
  206. }
  207. func (s *Session) reconnectDownedHosts(intv time.Duration) {
  208. reconnectTicker := time.NewTicker(intv)
  209. defer reconnectTicker.Stop()
  210. for {
  211. select {
  212. case <-reconnectTicker.C:
  213. hosts := s.ring.allHosts()
  214. // Print session.ring for debug.
  215. if gocqlDebug {
  216. buf := bytes.NewBufferString("Session.ring:")
  217. for _, h := range hosts {
  218. buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
  219. }
  220. Logger.Println(buf.String())
  221. }
  222. for _, h := range hosts {
  223. if h.IsUp() {
  224. continue
  225. }
  226. s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
  227. }
  228. case <-s.quit:
  229. return
  230. }
  231. }
  232. }
  233. // SetConsistency sets the default consistency level for this session. This
  234. // setting can also be changed on a per-query basis and the default value
  235. // is Quorum.
  236. func (s *Session) SetConsistency(cons Consistency) {
  237. s.mu.Lock()
  238. s.cons = cons
  239. s.mu.Unlock()
  240. }
  241. // SetPageSize sets the default page size for this session. A value <= 0 will
  242. // disable paging. This setting can also be changed on a per-query basis.
  243. func (s *Session) SetPageSize(n int) {
  244. s.mu.Lock()
  245. s.pageSize = n
  246. s.mu.Unlock()
  247. }
  248. // SetPrefetch sets the default threshold for pre-fetching new pages. If
  249. // there are only p*pageSize rows remaining, the next page will be requested
  250. // automatically. This value can also be changed on a per-query basis and
  251. // the default value is 0.25.
  252. func (s *Session) SetPrefetch(p float64) {
  253. s.mu.Lock()
  254. s.prefetch = p
  255. s.mu.Unlock()
  256. }
  257. // SetTrace sets the default tracer for this session. This setting can also
  258. // be changed on a per-query basis.
  259. func (s *Session) SetTrace(trace Tracer) {
  260. s.mu.Lock()
  261. s.trace = trace
  262. s.mu.Unlock()
  263. }
  264. // Query generates a new query object for interacting with the database.
  265. // Further details of the query may be tweaked using the resulting query
  266. // value before the query is executed. Query is automatically prepared
  267. // if it has not previously been executed.
  268. func (s *Session) Query(stmt string, values ...interface{}) *Query {
  269. qry := queryPool.Get().(*Query)
  270. qry.session = s
  271. qry.stmt = stmt
  272. qry.values = values
  273. qry.defaultsFromSession()
  274. return qry
  275. }
  276. type QueryInfo struct {
  277. Id []byte
  278. Args []ColumnInfo
  279. Rval []ColumnInfo
  280. PKeyColumns []int
  281. }
  282. // Bind generates a new query object based on the query statement passed in.
  283. // The query is automatically prepared if it has not previously been executed.
  284. // The binding callback allows the application to define which query argument
  285. // values will be marshalled as part of the query execution.
  286. // During execution, the meta data of the prepared query will be routed to the
  287. // binding callback, which is responsible for producing the query argument values.
  288. func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query {
  289. qry := queryPool.Get().(*Query)
  290. qry.session = s
  291. qry.stmt = stmt
  292. qry.binding = b
  293. qry.defaultsFromSession()
  294. return qry
  295. }
  296. // Close closes all connections. The session is unusable after this
  297. // operation.
  298. func (s *Session) Close() {
  299. s.closeMu.Lock()
  300. defer s.closeMu.Unlock()
  301. if s.isClosed {
  302. return
  303. }
  304. s.isClosed = true
  305. if s.pool != nil {
  306. s.pool.Close()
  307. }
  308. if s.control != nil {
  309. s.control.close()
  310. }
  311. if s.nodeEvents != nil {
  312. s.nodeEvents.stop()
  313. }
  314. if s.schemaEvents != nil {
  315. s.schemaEvents.stop()
  316. }
  317. if s.quit != nil {
  318. close(s.quit)
  319. }
  320. }
  321. func (s *Session) Closed() bool {
  322. s.closeMu.RLock()
  323. closed := s.isClosed
  324. s.closeMu.RUnlock()
  325. return closed
  326. }
  327. func (s *Session) executeQuery(qry *Query) (it *Iter) {
  328. // fail fast
  329. if s.Closed() {
  330. return &Iter{err: ErrSessionClosed}
  331. }
  332. iter, err := s.executor.executeQuery(qry)
  333. if err != nil {
  334. return &Iter{err: err}
  335. }
  336. if iter == nil {
  337. panic("nil iter")
  338. }
  339. return iter
  340. }
  341. func (s *Session) removeHost(h *HostInfo) {
  342. s.policy.RemoveHost(h)
  343. s.pool.removeHost(h.ConnectAddress())
  344. s.ring.removeHost(h.ConnectAddress())
  345. }
  346. // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
  347. func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
  348. // fail fast
  349. if s.Closed() {
  350. return nil, ErrSessionClosed
  351. } else if keyspace == "" {
  352. return nil, ErrNoKeyspace
  353. }
  354. return s.schemaDescriber.getSchema(keyspace)
  355. }
  356. func (s *Session) getConn() *Conn {
  357. hosts := s.ring.allHosts()
  358. for _, host := range hosts {
  359. if !host.IsUp() {
  360. continue
  361. }
  362. pool, ok := s.pool.getPool(host)
  363. if !ok {
  364. continue
  365. } else if conn := pool.Pick(); conn != nil {
  366. return conn
  367. }
  368. }
  369. return nil
  370. }
  371. // returns routing key indexes and type info
  372. func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) {
  373. s.routingKeyInfoCache.mu.Lock()
  374. entry, cached := s.routingKeyInfoCache.lru.Get(stmt)
  375. if cached {
  376. // done accessing the cache
  377. s.routingKeyInfoCache.mu.Unlock()
  378. // the entry is an inflight struct similar to that used by
  379. // Conn to prepare statements
  380. inflight := entry.(*inflightCachedEntry)
  381. // wait for any inflight work
  382. inflight.wg.Wait()
  383. if inflight.err != nil {
  384. return nil, inflight.err
  385. }
  386. key, _ := inflight.value.(*routingKeyInfo)
  387. return key, nil
  388. }
  389. // create a new inflight entry while the data is created
  390. inflight := new(inflightCachedEntry)
  391. inflight.wg.Add(1)
  392. defer inflight.wg.Done()
  393. s.routingKeyInfoCache.lru.Add(stmt, inflight)
  394. s.routingKeyInfoCache.mu.Unlock()
  395. var (
  396. info *preparedStatment
  397. partitionKey []*ColumnMetadata
  398. )
  399. conn := s.getConn()
  400. if conn == nil {
  401. // TODO: better error?
  402. inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
  403. return nil, inflight.err
  404. }
  405. // get the query info for the statement
  406. info, inflight.err = conn.prepareStatement(ctx, stmt, nil)
  407. if inflight.err != nil {
  408. // don't cache this error
  409. s.routingKeyInfoCache.Remove(stmt)
  410. return nil, inflight.err
  411. }
  412. // TODO: it would be nice to mark hosts here but as we are not using the policies
  413. // to fetch hosts we cant
  414. if info.request.colCount == 0 {
  415. // no arguments, no routing key, and no error
  416. return nil, nil
  417. }
  418. if len(info.request.pkeyColumns) > 0 {
  419. // proto v4 dont need to calculate primary key columns
  420. types := make([]TypeInfo, len(info.request.pkeyColumns))
  421. for i, col := range info.request.pkeyColumns {
  422. types[i] = info.request.columns[col].TypeInfo
  423. }
  424. routingKeyInfo := &routingKeyInfo{
  425. indexes: info.request.pkeyColumns,
  426. types: types,
  427. }
  428. inflight.value = routingKeyInfo
  429. return routingKeyInfo, nil
  430. }
  431. // get the table metadata
  432. table := info.request.columns[0].Table
  433. var keyspaceMetadata *KeyspaceMetadata
  434. keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
  435. if inflight.err != nil {
  436. // don't cache this error
  437. s.routingKeyInfoCache.Remove(stmt)
  438. return nil, inflight.err
  439. }
  440. tableMetadata, found := keyspaceMetadata.Tables[table]
  441. if !found {
  442. // unlikely that the statement could be prepared and the metadata for
  443. // the table couldn't be found, but this may indicate either a bug
  444. // in the metadata code, or that the table was just dropped.
  445. inflight.err = ErrNoMetadata
  446. // don't cache this error
  447. s.routingKeyInfoCache.Remove(stmt)
  448. return nil, inflight.err
  449. }
  450. partitionKey = tableMetadata.PartitionKey
  451. size := len(partitionKey)
  452. routingKeyInfo := &routingKeyInfo{
  453. indexes: make([]int, size),
  454. types: make([]TypeInfo, size),
  455. }
  456. for keyIndex, keyColumn := range partitionKey {
  457. // set an indicator for checking if the mapping is missing
  458. routingKeyInfo.indexes[keyIndex] = -1
  459. // find the column in the query info
  460. for argIndex, boundColumn := range info.request.columns {
  461. if keyColumn.Name == boundColumn.Name {
  462. // there may be many such bound columns, pick the first
  463. routingKeyInfo.indexes[keyIndex] = argIndex
  464. routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
  465. break
  466. }
  467. }
  468. if routingKeyInfo.indexes[keyIndex] == -1 {
  469. // missing a routing key column mapping
  470. // no routing key, and no error
  471. return nil, nil
  472. }
  473. }
  474. // cache this result
  475. inflight.value = routingKeyInfo
  476. return routingKeyInfo, nil
  477. }
  478. func (b *Batch) execute(conn *Conn) *Iter {
  479. return conn.executeBatch(b)
  480. }
  481. func (s *Session) executeBatch(batch *Batch) *Iter {
  482. // fail fast
  483. if s.Closed() {
  484. return &Iter{err: ErrSessionClosed}
  485. }
  486. // Prevent the execution of the batch if greater than the limit
  487. // Currently batches have a limit of 65536 queries.
  488. // https://datastax-oss.atlassian.net/browse/JAVA-229
  489. if batch.Size() > BatchSizeMaximum {
  490. return &Iter{err: ErrTooManyStmts}
  491. }
  492. iter, err := s.executor.executeQuery(batch)
  493. if err != nil {
  494. return &Iter{err: err}
  495. }
  496. return iter
  497. }
  498. // ExecuteBatch executes a batch operation and returns nil if successful
  499. // otherwise an error is returned describing the failure.
  500. func (s *Session) ExecuteBatch(batch *Batch) error {
  501. iter := s.executeBatch(batch)
  502. return iter.Close()
  503. }
  504. // ExecuteBatchCAS executes a batch operation and returns true if successful and
  505. // an iterator (to scan aditional rows if more than one conditional statement)
  506. // was sent.
  507. // Further scans on the interator must also remember to include
  508. // the applied boolean as the first argument to *Iter.Scan
  509. func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) {
  510. iter = s.executeBatch(batch)
  511. if err := iter.checkErrAndNotFound(); err != nil {
  512. iter.Close()
  513. return false, nil, err
  514. }
  515. if len(iter.Columns()) > 1 {
  516. dest = append([]interface{}{&applied}, dest...)
  517. iter.Scan(dest...)
  518. } else {
  519. iter.Scan(&applied)
  520. }
  521. return applied, iter, nil
  522. }
  523. // MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS,
  524. // however it accepts a map rather than a list of arguments for the initial
  525. // scan.
  526. func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) {
  527. iter = s.executeBatch(batch)
  528. if err := iter.checkErrAndNotFound(); err != nil {
  529. iter.Close()
  530. return false, nil, err
  531. }
  532. iter.MapScan(dest)
  533. applied = dest["[applied]"].(bool)
  534. delete(dest, "[applied]")
  535. // we usually close here, but instead of closing, just returin an error
  536. // if MapScan failed. Although Close just returns err, using Close
  537. // here might be confusing as we are not actually closing the iter
  538. return applied, iter, iter.err
  539. }
  540. func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
  541. if s.connectObserver != nil {
  542. obs := ObservedConnect{
  543. Host: host,
  544. Start: time.Now(),
  545. }
  546. conn, err := s.dial(host, s.connCfg, errorHandler)
  547. obs.End = time.Now()
  548. obs.Err = err
  549. s.connectObserver.ObserveConnect(obs)
  550. return conn, err
  551. }
  552. return s.dial(host, s.connCfg, errorHandler)
  553. }
  // queryMetrics accumulates, for one host, how often a query was attempted
  // and the summed latency of those attempts in nanoseconds.
  type queryMetrics struct {
  	Attempts     int
  	TotalLatency int64
  }
  558. // Query represents a CQL statement that can be executed.
  559. type Query struct {
  560. stmt string
  561. values []interface{}
  562. cons Consistency
  563. pageSize int
  564. routingKey []byte
  565. routingKeyBuffer []byte
  566. pageState []byte
  567. prefetch float64
  568. trace Tracer
  569. observer QueryObserver
  570. session *Session
  571. rt RetryPolicy
  572. binding func(q *QueryInfo) ([]interface{}, error)
  573. serialCons SerialConsistency
  574. defaultTimestamp bool
  575. defaultTimestampValue int64
  576. disableSkipMetadata bool
  577. context context.Context
  578. cancelQuery func()
  579. idempotent bool
  580. metrics map[string]*queryMetrics
  581. disableAutoPage bool
  582. }
  583. func (q *Query) defaultsFromSession() {
  584. s := q.session
  585. s.mu.RLock()
  586. q.cons = s.cons
  587. q.pageSize = s.pageSize
  588. q.trace = s.trace
  589. q.observer = s.queryObserver
  590. q.prefetch = s.prefetch
  591. q.rt = s.cfg.RetryPolicy
  592. q.serialCons = s.cfg.SerialConsistency
  593. q.defaultTimestamp = s.cfg.DefaultTimestamp
  594. q.idempotent = s.cfg.DefaultIdempotence
  595. q.metrics = make(map[string]*queryMetrics)
  596. // Initiate an empty context with a cancel call
  597. q.WithContext(context.Background())
  598. s.mu.RUnlock()
  599. }
  600. func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
  601. hostMetrics, exists := q.metrics[host.ConnectAddress().String()]
  602. if !exists {
  603. // if the host is not in the map, it means it's been accessed for the first time
  604. hostMetrics = &queryMetrics{Attempts: 0, TotalLatency: 0}
  605. q.metrics[host.ConnectAddress().String()] = hostMetrics
  606. }
  607. return hostMetrics
  608. }
  609. // Statement returns the statement that was used to generate this query.
  610. func (q Query) Statement() string {
  611. return q.stmt
  612. }
  613. // String implements the stringer interface.
  614. func (q Query) String() string {
  615. return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons)
  616. }
  617. //Attempts returns the number of times the query was executed.
  618. func (q *Query) Attempts() int {
  619. attempts := 0
  620. for _, metric := range q.metrics {
  621. attempts += metric.Attempts
  622. }
  623. return attempts
  624. }
  625. //Latency returns the average amount of nanoseconds per attempt of the query.
  626. func (q *Query) Latency() int64 {
  627. var attempts int
  628. var latency int64
  629. for _, metric := range q.metrics {
  630. attempts += metric.Attempts
  631. latency += metric.TotalLatency
  632. }
  633. if attempts > 0 {
  634. return latency / int64(attempts)
  635. }
  636. return 0
  637. }
  638. // Consistency sets the consistency level for this query. If no consistency
  639. // level have been set, the default consistency level of the cluster
  640. // is used.
  641. func (q *Query) Consistency(c Consistency) *Query {
  642. q.cons = c
  643. return q
  644. }
  645. // GetConsistency returns the currently configured consistency level for
  646. // the query.
  647. func (q *Query) GetConsistency() Consistency {
  648. return q.cons
  649. }
  650. // Same as Consistency but without a return value
  651. func (q *Query) SetConsistency(c Consistency) {
  652. q.cons = c
  653. }
  654. // Trace enables tracing of this query. Look at the documentation of the
  655. // Tracer interface to learn more about tracing.
  656. func (q *Query) Trace(trace Tracer) *Query {
  657. q.trace = trace
  658. return q
  659. }
  660. // Observer enables query-level observer on this query.
  661. // The provided observer will be called every time this query is executed.
  662. func (q *Query) Observer(observer QueryObserver) *Query {
  663. q.observer = observer
  664. return q
  665. }
  666. // PageSize will tell the iterator to fetch the result in pages of size n.
  667. // This is useful for iterating over large result sets, but setting the
  668. // page size too low might decrease the performance. This feature is only
  669. // available in Cassandra 2 and onwards.
  670. func (q *Query) PageSize(n int) *Query {
  671. q.pageSize = n
  672. return q
  673. }
  674. // DefaultTimestamp will enable the with default timestamp flag on the query.
  675. // If enable, this will replace the server side assigned
  676. // timestamp as default timestamp. Note that a timestamp in the query itself
  677. // will still override this timestamp. This is entirely optional.
  678. //
  679. // Only available on protocol >= 3
  680. func (q *Query) DefaultTimestamp(enable bool) *Query {
  681. q.defaultTimestamp = enable
  682. return q
  683. }
  684. // WithTimestamp will enable the with default timestamp flag on the query
  685. // like DefaultTimestamp does. But also allows to define value for timestamp.
  686. // It works the same way as USING TIMESTAMP in the query itself, but
  687. // should not break prepared query optimization
  688. //
  689. // Only available on protocol >= 3
  690. func (q *Query) WithTimestamp(timestamp int64) *Query {
  691. q.DefaultTimestamp(true)
  692. q.defaultTimestampValue = timestamp
  693. return q
  694. }
  695. // RoutingKey sets the routing key to use when a token aware connection
  696. // pool is used to optimize the routing of this query.
  697. func (q *Query) RoutingKey(routingKey []byte) *Query {
  698. q.routingKey = routingKey
  699. return q
  700. }
  701. // WithContext will set the context to use during a query, it will be used to
  702. // timeout when waiting for responses from Cassandra. Additionally it adds
  703. // the cancel function so that it can be called whenever necessary.
  704. func (q *Query) WithContext(ctx context.Context) *Query {
  705. q.context, q.cancelQuery = context.WithCancel(ctx)
  706. return q
  707. }
  708. func (q *Query) Cancel() {
  709. q.cancelQuery()
  710. }
  711. func (q *Query) execute(conn *Conn) *Iter {
  712. return conn.executeQuery(q)
  713. }
  714. func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
  715. hostMetrics := q.getHostMetrics(host)
  716. hostMetrics.Attempts++
  717. hostMetrics.TotalLatency += end.Sub(start).Nanoseconds()
  718. if q.observer != nil {
  719. q.observer.ObserveQuery(q.context, ObservedQuery{
  720. Keyspace: keyspace,
  721. Statement: q.stmt,
  722. Start: start,
  723. End: end,
  724. Rows: iter.numRows,
  725. Host: host,
  726. Metrics: hostMetrics,
  727. Err: iter.err,
  728. })
  729. }
  730. }
  731. func (q *Query) retryPolicy() RetryPolicy {
  732. return q.rt
  733. }
  734. // Keyspace returns the keyspace the query will be executed against.
  735. func (q *Query) Keyspace() string {
  736. if q.session == nil {
  737. return ""
  738. }
  739. // TODO(chbannis): this should be parsed from the query or we should let
  740. // this be set by users.
  741. return q.session.cfg.Keyspace
  742. }
  743. // GetRoutingKey gets the routing key to use for routing this query. If
  744. // a routing key has not been explicitly set, then the routing key will
  745. // be constructed if possible using the keyspace's schema and the query
  746. // info for this query statement. If the routing key cannot be determined
  747. // then nil will be returned with no error. On any error condition,
  748. // an error description will be returned.
  749. func (q *Query) GetRoutingKey() ([]byte, error) {
  750. if q.routingKey != nil {
  751. return q.routingKey, nil
  752. } else if q.binding != nil && len(q.values) == 0 {
  753. // If this query was created using session.Bind we wont have the query
  754. // values yet, so we have to pass down to the next policy.
  755. // TODO: Remove this and handle this case
  756. return nil, nil
  757. }
  758. // try to determine the routing key
  759. routingKeyInfo, err := q.session.routingKeyInfo(q.context, q.stmt)
  760. if err != nil {
  761. return nil, err
  762. }
  763. if routingKeyInfo == nil {
  764. return nil, nil
  765. }
  766. if len(routingKeyInfo.indexes) == 1 {
  767. // single column routing key
  768. routingKey, err := Marshal(
  769. routingKeyInfo.types[0],
  770. q.values[routingKeyInfo.indexes[0]],
  771. )
  772. if err != nil {
  773. return nil, err
  774. }
  775. return routingKey, nil
  776. }
  777. // We allocate that buffer only once, so that further re-bind/exec of the
  778. // same query don't allocate more memory.
  779. if q.routingKeyBuffer == nil {
  780. q.routingKeyBuffer = make([]byte, 0, 256)
  781. }
  782. // composite routing key
  783. buf := bytes.NewBuffer(q.routingKeyBuffer)
  784. for i := range routingKeyInfo.indexes {
  785. encoded, err := Marshal(
  786. routingKeyInfo.types[i],
  787. q.values[routingKeyInfo.indexes[i]],
  788. )
  789. if err != nil {
  790. return nil, err
  791. }
  792. lenBuf := []byte{0x00, 0x00}
  793. binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
  794. buf.Write(lenBuf)
  795. buf.Write(encoded)
  796. buf.WriteByte(0x00)
  797. }
  798. routingKey := buf.Bytes()
  799. return routingKey, nil
  800. }
  801. func (q *Query) shouldPrepare() bool {
  802. stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool {
  803. return unicode.IsSpace(r) || r == ';'
  804. }), unicode.IsSpace)
  805. var stmtType string
  806. if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 {
  807. stmtType = strings.ToLower(stmt[:n])
  808. }
  809. if stmtType == "begin" {
  810. if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 {
  811. stmtType = strings.ToLower(stmt[n+1:])
  812. }
  813. }
  814. switch stmtType {
  815. case "select", "insert", "update", "delete", "batch":
  816. return true
  817. }
  818. return false
  819. }
  820. // SetPrefetch sets the default threshold for pre-fetching new pages. If
  821. // there are only p*pageSize rows remaining, the next page will be requested
  822. // automatically.
  823. func (q *Query) Prefetch(p float64) *Query {
  824. q.prefetch = p
  825. return q
  826. }
  827. // RetryPolicy sets the policy to use when retrying the query.
  828. func (q *Query) RetryPolicy(r RetryPolicy) *Query {
  829. q.rt = r
  830. return q
  831. }
  832. func (q *Query) IsIdempotent() bool {
  833. return q.idempotent
  834. }
  835. // Idempotent marks the query as being idempotent or not depending on
  836. // the value.
  837. func (q *Query) Idempotent(value bool) *Query {
  838. q.idempotent = value
  839. return q
  840. }
  841. // Bind sets query arguments of query. This can also be used to rebind new query arguments
  842. // to an existing query instance.
  843. func (q *Query) Bind(v ...interface{}) *Query {
  844. q.values = v
  845. return q
  846. }
  847. // SerialConsistency sets the consistency level for the
  848. // serial phase of conditional updates. That consistency can only be
  849. // either SERIAL or LOCAL_SERIAL and if not present, it defaults to
  850. // SERIAL. This option will be ignored for anything else that a
  851. // conditional update/insert.
  852. func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
  853. q.serialCons = cons
  854. return q
  855. }
  856. // PageState sets the paging state for the query to resume paging from a specific
  857. // point in time. Setting this will disable to query paging for this query, and
  858. // must be used for all subsequent pages.
  859. func (q *Query) PageState(state []byte) *Query {
  860. q.pageState = state
  861. q.disableAutoPage = true
  862. return q
  863. }
  864. // NoSkipMetadata will override the internal result metadata cache so that the driver does not
  865. // send skip_metadata for queries, this means that the result will always contain
  866. // the metadata to parse the rows and will not reuse the metadata from the prepared
  867. // staement. This should only be used to work around cassandra bugs, such as when using
  868. // CAS operations which do not end in Cas.
  869. //
  870. // See https://issues.apache.org/jira/browse/CASSANDRA-11099
  871. // https://github.com/gocql/gocql/issues/612
  872. func (q *Query) NoSkipMetadata() *Query {
  873. q.disableSkipMetadata = true
  874. return q
  875. }
  876. // Exec executes the query without returning any rows.
  877. func (q *Query) Exec() error {
  878. return q.Iter().Close()
  879. }
  880. func isUseStatement(stmt string) bool {
  881. if len(stmt) < 3 {
  882. return false
  883. }
  884. return strings.EqualFold(stmt[0:3], "use")
  885. }
  886. // Iter executes the query and returns an iterator capable of iterating
  887. // over all results.
  888. func (q *Query) Iter() *Iter {
  889. if isUseStatement(q.stmt) {
  890. return &Iter{err: ErrUseStmt}
  891. }
  892. return q.session.executeQuery(q)
  893. }
  894. // MapScan executes the query, copies the columns of the first selected
  895. // row into the map pointed at by m and discards the rest. If no rows
  896. // were selected, ErrNotFound is returned.
  897. func (q *Query) MapScan(m map[string]interface{}) error {
  898. iter := q.Iter()
  899. if err := iter.checkErrAndNotFound(); err != nil {
  900. return err
  901. }
  902. iter.MapScan(m)
  903. return iter.Close()
  904. }
  905. // Scan executes the query, copies the columns of the first selected
  906. // row into the values pointed at by dest and discards the rest. If no rows
  907. // were selected, ErrNotFound is returned.
  908. func (q *Query) Scan(dest ...interface{}) error {
  909. iter := q.Iter()
  910. if err := iter.checkErrAndNotFound(); err != nil {
  911. return err
  912. }
  913. iter.Scan(dest...)
  914. return iter.Close()
  915. }
  916. // ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
  917. // statement containing an IF clause). If the transaction fails because
  918. // the existing values did not match, the previous values will be stored
  919. // in dest.
  920. func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) {
  921. q.disableSkipMetadata = true
  922. iter := q.Iter()
  923. if err := iter.checkErrAndNotFound(); err != nil {
  924. return false, err
  925. }
  926. if len(iter.Columns()) > 1 {
  927. dest = append([]interface{}{&applied}, dest...)
  928. iter.Scan(dest...)
  929. } else {
  930. iter.Scan(&applied)
  931. }
  932. return applied, iter.Close()
  933. }
  934. // MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
  935. // statement containing an IF clause). If the transaction fails because
  936. // the existing values did not match, the previous values will be stored
  937. // in dest map.
  938. //
  939. // As for INSERT .. IF NOT EXISTS, previous values will be returned as if
  940. // SELECT * FROM. So using ScanCAS with INSERT is inherently prone to
  941. // column mismatching. MapScanCAS is added to capture them safely.
  942. func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) {
  943. q.disableSkipMetadata = true
  944. iter := q.Iter()
  945. if err := iter.checkErrAndNotFound(); err != nil {
  946. return false, err
  947. }
  948. iter.MapScan(dest)
  949. applied = dest["[applied]"].(bool)
  950. delete(dest, "[applied]")
  951. return applied, iter.Close()
  952. }
  953. // Release releases a query back into a pool of queries. Released Queries
  954. // cannot be reused.
  955. //
  956. // Example:
  957. // qry := session.Query("SELECT * FROM my_table")
  958. // qry.Exec()
  959. // qry.Release()
  960. func (q *Query) Release() {
  961. q.reset()
  962. queryPool.Put(q)
  963. }
  964. // reset zeroes out all fields of a query so that it can be safely pooled.
  965. func (q *Query) reset() {
  966. *q = Query{}
  967. }
// Iter represents an iterator that can be used to iterate over all rows that
// were returned by a query. The iterator might send additional queries to the
// database during the iteration if paging was enabled.
type Iter struct {
	err     error          // first error encountered; returned by Close
	pos     int            // number of rows already consumed from the current page
	meta    resultMetadata // result metadata: columns, column count and paging state
	numRows int            // number of rows in the current page
	next    *nextIter      // fetcher for the following page, nil when there is none
	host    *HostInfo      // host the query was sent to (see Host)
	framer  *framer        // source the column bytes are read from; cleared by Close
	closed  int32          // set to 1 atomically by the first call to Close
}
  981. // Host returns the host which the query was sent to.
  982. func (iter *Iter) Host() *HostInfo {
  983. return iter.host
  984. }
  985. // Columns returns the name and type of the selected columns.
  986. func (iter *Iter) Columns() []ColumnInfo {
  987. return iter.meta.columns
  988. }
// Scanner is a row-at-a-time view over a result set: call Next to advance,
// Scan to read the current row, and Err when done to retrieve any error.
type Scanner interface {
	// Next advances the row pointer to point at the next row, the row is valid until
	// the next call of Next. It returns true if there is a row which is available to be
	// scanned into with Scan.
	// Next must be called before every call to Scan.
	Next() bool
	// Scan copies the current row's columns into dest. If the length of dest does not equal
	// the number of columns returned in the row an error is returned. If an error is encountered
	// when unmarshalling a column into the value in dest an error is returned and the row is invalidated
	// until the next call to Next.
	// Next must be called before calling Scan; if it is not, an error is returned.
	Scan(...interface{}) error
	// Err returns the error, if there was one, that occurred during iteration and prevented it
	// from completing.
	// Err will also release resources held by the iterator; the Scanner should not be used after
	// Err has been called.
	Err() error
}
// iterScanner is the Scanner implementation returned by Iter.Scanner.
type iterScanner struct {
	iter  *Iter    // iterator for the current page; replaced by Next when a new page is fetched
	cols  [][]byte // raw bytes of the current row, one entry per result column
	valid bool     // true after a successful Next until the row is consumed by Scan
}
  1010. func (is *iterScanner) Next() bool {
  1011. iter := is.iter
  1012. if iter.err != nil {
  1013. return false
  1014. }
  1015. if iter.pos >= iter.numRows {
  1016. if iter.next != nil {
  1017. is.iter = iter.next.fetch()
  1018. return is.Next()
  1019. }
  1020. return false
  1021. }
  1022. for i := 0; i < len(is.cols); i++ {
  1023. col, err := iter.readColumn()
  1024. if err != nil {
  1025. iter.err = err
  1026. return false
  1027. }
  1028. is.cols[i] = col
  1029. }
  1030. iter.pos++
  1031. is.valid = true
  1032. return true
  1033. }
  1034. func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) {
  1035. if dest[0] == nil {
  1036. return 1, nil
  1037. }
  1038. if col.TypeInfo.Type() == TypeTuple {
  1039. // this will panic, actually a bug, please report
  1040. tuple := col.TypeInfo.(TupleTypeInfo)
  1041. count := len(tuple.Elems)
  1042. // here we pass in a slice of the struct which has the number number of
  1043. // values as elements in the tuple
  1044. if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil {
  1045. return 0, err
  1046. }
  1047. return count, nil
  1048. } else {
  1049. if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil {
  1050. return 0, err
  1051. }
  1052. return 1, nil
  1053. }
  1054. }
  1055. func (is *iterScanner) Scan(dest ...interface{}) error {
  1056. if !is.valid {
  1057. return errors.New("gocql: Scan called without calling Next")
  1058. }
  1059. iter := is.iter
  1060. // currently only support scanning into an expand tuple, such that its the same
  1061. // as scanning in more values from a single column
  1062. if len(dest) != iter.meta.actualColCount {
  1063. return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
  1064. }
  1065. // i is the current position in dest, could posible replace it and just use
  1066. // slices of dest
  1067. i := 0
  1068. var err error
  1069. for _, col := range iter.meta.columns {
  1070. var n int
  1071. n, err = scanColumn(is.cols[i], col, dest[i:])
  1072. if err != nil {
  1073. break
  1074. }
  1075. i += n
  1076. }
  1077. is.valid = false
  1078. return err
  1079. }
  1080. func (is *iterScanner) Err() error {
  1081. iter := is.iter
  1082. is.iter = nil
  1083. is.cols = nil
  1084. is.valid = false
  1085. return iter.Close()
  1086. }
  1087. // Scanner returns a row Scanner which provides an interface to scan rows in a manner which is
  1088. // similar to database/sql. The iter should NOT be used again after calling this method.
  1089. func (iter *Iter) Scanner() Scanner {
  1090. if iter == nil {
  1091. return nil
  1092. }
  1093. return &iterScanner{iter: iter, cols: make([][]byte, len(iter.meta.columns))}
  1094. }
  1095. func (iter *Iter) readColumn() ([]byte, error) {
  1096. return iter.framer.readBytesInternal()
  1097. }
  1098. // Scan consumes the next row of the iterator and copies the columns of the
  1099. // current row into the values pointed at by dest. Use nil as a dest value
  1100. // to skip the corresponding column. Scan might send additional queries
  1101. // to the database to retrieve the next set of rows if paging was enabled.
  1102. //
  1103. // Scan returns true if the row was successfully unmarshaled or false if the
  1104. // end of the result set was reached or if an error occurred. Close should
  1105. // be called afterwards to retrieve any potential errors.
  1106. func (iter *Iter) Scan(dest ...interface{}) bool {
  1107. if iter.err != nil {
  1108. return false
  1109. }
  1110. if iter.pos >= iter.numRows {
  1111. if iter.next != nil {
  1112. *iter = *iter.next.fetch()
  1113. return iter.Scan(dest...)
  1114. }
  1115. return false
  1116. }
  1117. if iter.next != nil && iter.pos == iter.next.pos {
  1118. go iter.next.fetch()
  1119. }
  1120. // currently only support scanning into an expand tuple, such that its the same
  1121. // as scanning in more values from a single column
  1122. if len(dest) != iter.meta.actualColCount {
  1123. iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
  1124. return false
  1125. }
  1126. // i is the current position in dest, could posible replace it and just use
  1127. // slices of dest
  1128. i := 0
  1129. for _, col := range iter.meta.columns {
  1130. colBytes, err := iter.readColumn()
  1131. if err != nil {
  1132. iter.err = err
  1133. return false
  1134. }
  1135. n, err := scanColumn(colBytes, col, dest[i:])
  1136. if err != nil {
  1137. iter.err = err
  1138. return false
  1139. }
  1140. i += n
  1141. }
  1142. iter.pos++
  1143. return true
  1144. }
  1145. // GetCustomPayload returns any parsed custom payload results if given in the
  1146. // response from Cassandra. Note that the result is not a copy.
  1147. //
  1148. // This additional feature of CQL Protocol v4
  1149. // allows additional results and query information to be returned by
  1150. // custom QueryHandlers running in your C* cluster.
  1151. // See https://datastax.github.io/java-driver/manual/custom_payloads/
  1152. func (iter *Iter) GetCustomPayload() map[string][]byte {
  1153. return iter.framer.header.customPayload
  1154. }
  1155. // Warnings returns any warnings generated if given in the response from Cassandra.
  1156. //
  1157. // This is only available starting with CQL Protocol v4.
  1158. func (iter *Iter) Warnings() []string {
  1159. if iter.framer != nil {
  1160. return iter.framer.header.warnings
  1161. }
  1162. return nil
  1163. }
  1164. // Close closes the iterator and returns any errors that happened during
  1165. // the query or the iteration.
  1166. func (iter *Iter) Close() error {
  1167. if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) {
  1168. if iter.framer != nil {
  1169. iter.framer = nil
  1170. }
  1171. }
  1172. return iter.err
  1173. }
  1174. // WillSwitchPage detects if iterator reached end of current page
  1175. // and the next page is available.
  1176. func (iter *Iter) WillSwitchPage() bool {
  1177. return iter.pos >= iter.numRows && iter.next != nil
  1178. }
  1179. // checkErrAndNotFound handle error and NotFound in one method.
  1180. func (iter *Iter) checkErrAndNotFound() error {
  1181. if iter.err != nil {
  1182. return iter.err
  1183. } else if iter.numRows == 0 {
  1184. return ErrNotFound
  1185. }
  1186. return nil
  1187. }
  1188. // PageState return the current paging state for a query which can be used for
  1189. // subsequent quries to resume paging this point.
  1190. func (iter *Iter) PageState() []byte {
  1191. return iter.meta.pagingState
  1192. }
  1193. // NumRows returns the number of rows in this pagination, it will update when new
  1194. // pages are fetched, it is not the value of the total number of rows this iter
  1195. // will return unless there is only a single page returned.
  1196. func (iter *Iter) NumRows() int {
  1197. return iter.numRows
  1198. }
// nextIter holds what is needed to fetch the following page of a paged
// query, and caches the result so the fetch happens at most once.
type nextIter struct {
	qry  Query     // query that fetch executes
	pos  int       // Iter.Scan starts a background fetch when the iterator's pos equals this value
	once sync.Once // guards the single execution inside fetch
	next *Iter     // result of fetch, populated by the first call
	conn *Conn     // connection the first fetch attempt is made on
}
  1206. func (n *nextIter) fetch() *Iter {
  1207. n.once.Do(func() {
  1208. iter := n.qry.session.executor.attemptQuery(&n.qry, n.conn)
  1209. if iter != nil && iter.err == nil {
  1210. n.next = iter
  1211. } else {
  1212. n.next = n.qry.session.executeQuery(&n.qry)
  1213. }
  1214. })
  1215. return n.next
  1216. }
// Batch is a group of statements that are executed together as a single
// batch operation.
type Batch struct {
	Type                  BatchType                // kind of batch: logged, unlogged or counter
	Entries               []BatchEntry             // statements added through Query and Bind
	Cons                  Consistency              // consistency level (see GetConsistency / SetConsistency)
	rt                    RetryPolicy              // retry policy returned by retryPolicy
	observer              BatchObserver            // notified by attempt when non-nil
	serialCons            SerialConsistency        // set through SerialConsistency
	defaultTimestamp      bool                     // default-timestamp flag (see DefaultTimestamp)
	defaultTimestampValue int64                    // timestamp value set through WithTimestamp
	context               context.Context          // cancellable context created by WithContext
	cancelBatch           func()                   // cancel function for context; invoked by Cancel
	keyspace              string                   // keyspace returned by Keyspace
	metrics               map[string]*queryMetrics // per-host metrics keyed by the host's connect address string
}
  1231. // NewBatch creates a new batch operation without defaults from the cluster
  1232. //
  1233. // Depreicated: use session.NewBatch instead
  1234. func NewBatch(typ BatchType) *Batch {
  1235. return &Batch{Type: typ}
  1236. }
  1237. // NewBatch creates a new batch operation using defaults defined in the cluster
  1238. func (s *Session) NewBatch(typ BatchType) *Batch {
  1239. s.mu.RLock()
  1240. batch := &Batch{
  1241. Type: typ,
  1242. rt: s.cfg.RetryPolicy,
  1243. serialCons: s.cfg.SerialConsistency,
  1244. observer: s.batchObserver,
  1245. Cons: s.cons,
  1246. defaultTimestamp: s.cfg.DefaultTimestamp,
  1247. keyspace: s.cfg.Keyspace,
  1248. metrics: make(map[string]*queryMetrics),
  1249. }
  1250. // Initiate an empty context with a cancel call
  1251. batch.WithContext(context.Background())
  1252. s.mu.RUnlock()
  1253. return batch
  1254. }
  1255. func (b *Batch) getHostMetrics(host *HostInfo) *queryMetrics {
  1256. hostMetrics, exists := b.metrics[host.ConnectAddress().String()]
  1257. if !exists {
  1258. // if the host is not in the map, it means it's been accessed for the first time
  1259. hostMetrics = &queryMetrics{Attempts: 0, TotalLatency: 0}
  1260. b.metrics[host.ConnectAddress().String()] = hostMetrics
  1261. }
  1262. return hostMetrics
  1263. }
  1264. // Observer enables batch-level observer on this batch.
  1265. // The provided observer will be called every time this batched query is executed.
  1266. func (b *Batch) Observer(observer BatchObserver) *Batch {
  1267. b.observer = observer
  1268. return b
  1269. }
  1270. func (b *Batch) Keyspace() string {
  1271. return b.keyspace
  1272. }
  1273. // Attempts returns the number of attempts made to execute the batch.
  1274. func (b *Batch) Attempts() int {
  1275. attempts := 0
  1276. for _, metric := range b.metrics {
  1277. attempts += metric.Attempts
  1278. }
  1279. return attempts
  1280. }
  1281. //Latency returns the average number of nanoseconds to execute a single attempt of the batch.
  1282. func (b *Batch) Latency() int64 {
  1283. attempts := 0
  1284. var latency int64 = 0
  1285. for _, metric := range b.metrics {
  1286. attempts += metric.Attempts
  1287. latency += metric.TotalLatency
  1288. }
  1289. if attempts > 0 {
  1290. return latency / int64(attempts)
  1291. }
  1292. return 0
  1293. }
  1294. // GetConsistency returns the currently configured consistency level for the batch
  1295. // operation.
  1296. func (b *Batch) GetConsistency() Consistency {
  1297. return b.Cons
  1298. }
  1299. // SetConsistency sets the currently configured consistency level for the batch
  1300. // operation.
  1301. func (b *Batch) SetConsistency(c Consistency) {
  1302. b.Cons = c
  1303. }
  1304. // Query adds the query to the batch operation
  1305. func (b *Batch) Query(stmt string, args ...interface{}) {
  1306. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  1307. }
  1308. // Bind adds the query to the batch operation and correlates it with a binding callback
  1309. // that will be invoked when the batch is executed. The binding callback allows the application
  1310. // to define which query argument values will be marshalled as part of the batch execution.
  1311. func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) {
  1312. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind})
  1313. }
  1314. func (b *Batch) retryPolicy() RetryPolicy {
  1315. return b.rt
  1316. }
  1317. // RetryPolicy sets the retry policy to use when executing the batch operation
  1318. func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
  1319. b.rt = r
  1320. return b
  1321. }
  1322. // WithContext will set the context to use during a query, it will be used to
  1323. // timeout when waiting for responses from Cassandra. Additionally it adds
  1324. // the cancel function so that it can be called whenever necessary.
  1325. func (b *Batch) WithContext(ctx context.Context) *Batch {
  1326. b.context, b.cancelBatch = context.WithCancel(ctx)
  1327. return b
  1328. }
  1329. func (b *Batch) Cancel() {
  1330. b.cancelBatch()
  1331. }
  1332. // Size returns the number of batch statements to be executed by the batch operation.
  1333. func (b *Batch) Size() int {
  1334. return len(b.Entries)
  1335. }
  1336. // SerialConsistency sets the consistency level for the
  1337. // serial phase of conditional updates. That consistency can only be
  1338. // either SERIAL or LOCAL_SERIAL and if not present, it defaults to
  1339. // SERIAL. This option will be ignored for anything else that a
  1340. // conditional update/insert.
  1341. //
  1342. // Only available for protocol 3 and above
  1343. func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
  1344. b.serialCons = cons
  1345. return b
  1346. }
  1347. // DefaultTimestamp will enable the with default timestamp flag on the query.
  1348. // If enable, this will replace the server side assigned
  1349. // timestamp as default timestamp. Note that a timestamp in the query itself
  1350. // will still override this timestamp. This is entirely optional.
  1351. //
  1352. // Only available on protocol >= 3
  1353. func (b *Batch) DefaultTimestamp(enable bool) *Batch {
  1354. b.defaultTimestamp = enable
  1355. return b
  1356. }
  1357. // WithTimestamp will enable the with default timestamp flag on the query
  1358. // like DefaultTimestamp does. But also allows to define value for timestamp.
  1359. // It works the same way as USING TIMESTAMP in the query itself, but
  1360. // should not break prepared query optimization
  1361. //
  1362. // Only available on protocol >= 3
  1363. func (b *Batch) WithTimestamp(timestamp int64) *Batch {
  1364. b.DefaultTimestamp(true)
  1365. b.defaultTimestampValue = timestamp
  1366. return b
  1367. }
  1368. func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
  1369. hostMetrics := b.getHostMetrics(host)
  1370. hostMetrics.Attempts++
  1371. hostMetrics.TotalLatency += end.Sub(start).Nanoseconds()
  1372. if b.observer == nil {
  1373. return
  1374. }
  1375. statements := make([]string, len(b.Entries))
  1376. for i, entry := range b.Entries {
  1377. statements[i] = entry.Stmt
  1378. }
  1379. b.observer.ObserveBatch(b.context, ObservedBatch{
  1380. Keyspace: keyspace,
  1381. Statements: statements,
  1382. Start: start,
  1383. End: end,
  1384. // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
  1385. Host: host,
  1386. Metrics: hostMetrics,
  1387. Err: iter.err,
  1388. })
  1389. }
  1390. func (b *Batch) GetRoutingKey() ([]byte, error) {
  1391. // TODO: use the first statement in the batch as the routing key?
  1392. return nil, nil
  1393. }
// BatchType identifies the kind of a batch operation.
type BatchType byte

// The batch kinds a Batch can be created with.
const (
	LoggedBatch   BatchType = 0
	UnloggedBatch BatchType = 1
	CounterBatch  BatchType = 2
)
// BatchEntry is a single statement inside a Batch. Batch.Query creates
// entries with Args set; Batch.Bind creates entries with binding set.
type BatchEntry struct {
	Stmt    string                                    // statement text
	Args    []interface{}                             // argument values supplied to Batch.Query
	binding func(q *QueryInfo) ([]interface{}, error) // callback supplied to Batch.Bind
}
// ColumnInfo describes one column of a result set; Iter.Columns returns a
// slice of these.
type ColumnInfo struct {
	Keyspace string   // keyspace of the column
	Table    string   // table of the column
	Name     string   // column name
	TypeInfo TypeInfo // column type, used when unmarshalling values
}
  1411. func (c ColumnInfo) String() string {
  1412. return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo)
  1413. }
// routingKeyInfoLRU is the routing key indexes LRU cache.
type routingKeyInfoLRU struct {
	lru *lru.Cache // underlying LRU cache
	mu  sync.Mutex // held by Remove and Max while they touch lru
}
// routingKeyInfo describes how to build a routing key from a query's bound
// values. indexes and types are parallel: GetRoutingKey marshals the value
// at indexes[i] using types[i].
type routingKeyInfo struct {
	indexes []int      // positions of the routing key columns within the bound values
	types   []TypeInfo // type of each routing key column
}
  1423. func (r *routingKeyInfo) String() string {
  1424. return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types)
  1425. }
  1426. func (r *routingKeyInfoLRU) Remove(key string) {
  1427. r.mu.Lock()
  1428. r.lru.Remove(key)
  1429. r.mu.Unlock()
  1430. }
  1431. //Max adjusts the maximum size of the cache and cleans up the oldest records if
  1432. //the new max is lower than the previous value. Not concurrency safe.
  1433. func (r *routingKeyInfoLRU) Max(max int) {
  1434. r.mu.Lock()
  1435. for r.lru.Len() > max {
  1436. r.lru.RemoveOldest()
  1437. }
  1438. r.lru.MaxEntries = max
  1439. r.mu.Unlock()
  1440. }
// inflightCachedEntry bundles a wait group with an error and a value.
// NOTE(review): presumably it represents a cache entry that is still being
// computed, with wg letting other callers wait for err/value — confirm
// against its users, which are outside this section.
type inflightCachedEntry struct {
	wg    sync.WaitGroup
	err   error
	value interface{}
}
// Tracer is the interface implemented by query tracers. Tracers have the
// ability to obtain a detailed event log of all events that happened during
// the execution of a query from Cassandra. Gathering this information might
// be essential for debugging and optimizing queries, but this feature should
// not be used on production systems with very high load.
type Tracer interface {
	// Trace is handed the id of a tracing session to report on.
	Trace(traceId []byte)
}
// traceWriter is the Tracer implementation returned by NewTraceWriter; it
// writes trace information as text to w.
type traceWriter struct {
	session *Session   // session whose control connection is used to query system_traces
	w       io.Writer  // destination of the textual trace output
	mu      sync.Mutex // serialises writes to w
}
  1459. // NewTraceWriter returns a simple Tracer implementation that outputs
  1460. // the event log in a textual format.
  1461. func NewTraceWriter(session *Session, w io.Writer) Tracer {
  1462. return &traceWriter{session: session, w: w}
  1463. }
// Trace writes a textual report for the tracing session traceId to t.w.
// It first reads the coordinator and duration from system_traces.sessions,
// then prints one line per row of system_traces.events. A failure of either
// query is written to t.w as an "Error:" line; a failure of the first query
// ends the report early.
func (t *traceWriter) Trace(traceId []byte) {
	var (
		coordinator string
		duration    int
	)
	iter := t.session.control.query(`SELECT coordinator, duration
FROM system_traces.sessions
WHERE session_id = ?`, traceId)

	// The Scan result is ignored on purpose; Close below reports any error.
	iter.Scan(&coordinator, &duration)
	if err := iter.Close(); err != nil {
		t.mu.Lock()
		fmt.Fprintln(t.w, "Error:", err)
		t.mu.Unlock()
		return
	}

	var (
		timestamp time.Time
		activity  string
		source    string
		elapsed   int
	)

	// Hold the lock for the header and all event lines so that the output of
	// one Trace call is not interleaved with another's.
	t.mu.Lock()
	defer t.mu.Unlock()

	// duration is scaled by time.Microsecond before printing.
	fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
		traceId, coordinator, time.Duration(duration)*time.Microsecond)

	iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed
FROM system_traces.events
WHERE session_id = ?`, traceId)

	// event_id is scanned into a time.Time and printed as the event time.
	for iter.Scan(&timestamp, &activity, &source, &elapsed) {
		fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
			timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
	}

	if err := iter.Close(); err != nil {
		fmt.Fprintln(t.w, "Error:", err)
	}
}
// ObservedQuery is the description of a single query attempt that is passed
// to QueryObserver.ObserveQuery.
type ObservedQuery struct {
	// Keyspace is the keyspace passed along with the attempt.
	Keyspace string
	// Statement is the text of the query statement.
	Statement string

	Start time.Time // time immediately before the query was called
	End   time.Time // time immediately after the query returned

	// Rows is the number of rows in the current iter.
	// In paginated queries, rows from previous scans are not counted.
	// Rows is not used in batch queries and remains at the default value
	Rows int

	// Host is the information about the host that performed the query
	Host *HostInfo

	// The metrics per this host
	Metrics *queryMetrics

	// Err is the error in the query.
	// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
	Err error
}
// QueryObserver is the interface implemented by query observers / stat collectors.
//
// Experimental, this interface and use may change
type QueryObserver interface {
	// ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled.
	// It doesn't get called if there is no query because the session is closed or there are no connections available.
	// The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
	// The context passed in is the one stored on the query.
	ObserveQuery(context.Context, ObservedQuery)
}
// ObservedBatch is the description of a single batch attempt that is passed
// to BatchObserver.ObserveBatch.
type ObservedBatch struct {
	// Keyspace is the keyspace passed along with the attempt.
	Keyspace string
	// Statements holds the statement text of every entry in the batch.
	Statements []string

	Start time.Time // time immediately before the batch query was called
	End   time.Time // time immediately after the batch query returned

	// Host is the information about the host that performed the batch
	Host *HostInfo

	// Err is the error in the batch query.
	// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
	Err error

	// The metrics per this host
	Metrics *queryMetrics
}
  1539. // BatchObserver is the interface implemented by batch observers / stat collectors.
  1540. type BatchObserver interface {
  1541. // ObserveBatch gets called on every batch query to cassandra.
  1542. // It also gets called once for each query in a batch.
  1543. // It doesn't get called if there is no query because the session is closed or there are no connections available.
  1544. // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
  1545. // Unlike QueryObserver.ObserveQuery it does no reporting on rows read.
  1546. ObserveBatch(context.Context, ObservedBatch)
  1547. }
  1548. type ObservedConnect struct {
  1549. // Host is the information about the host about to connect
  1550. Host *HostInfo
  1551. Start time.Time // time immediately before the dial is called
  1552. End time.Time // time immediately after the dial returned
  1553. // Err is the connection error (if any)
  1554. Err error
  1555. }
  1556. // ConnectObserver is the interface implemented by connect observers / stat collectors.
  1557. type ConnectObserver interface {
  1558. // ObserveConnect gets called when a new connection to cassandra is made.
  1559. ObserveConnect(ObservedConnect)
  1560. }
  // Error is an error value that pairs a numeric code with a message.
  type Error struct {
  	Code    int
  	Message string
  }

  // Error implements the error interface. It returns Message unchanged; Code
  // is not included in the returned text.
  func (e Error) Error() string {
  	return e.Message
  }
  // Sentinel errors defined by this package. Each one is a distinct value
  // created with errors.New, so a returned error can be compared against
  // them. The message texts are observable by callers and are left unchanged.
  var (
  	ErrNotFound             = errors.New("not found")
  	ErrUnavailable          = errors.New("unavailable")
  	ErrUnsupported          = errors.New("feature not supported")
  	ErrTooManyStmts         = errors.New("too many statements")
  	ErrUseStmt              = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.")
  	ErrSessionClosed        = errors.New("session has been closed")
  	ErrNoConnections        = errors.New("gocql: no hosts available in the pool")
  	ErrNoKeyspace           = errors.New("no keyspace provided")
  	ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist")
  	ErrNoMetadata           = errors.New("no metadata available")
  )
  // ErrProtocol wraps another error by embedding it, so the embedded error's
  // Error method is promoted to ErrProtocol.
  type ErrProtocol struct{ error }

  // NewErrProtocol builds an error from format and args using fmt.Errorf and
  // returns it wrapped in an ErrProtocol.
  func NewErrProtocol(format string, args ...interface{}) error {
  	formatted := fmt.Errorf(format, args...)
  	return ErrProtocol{error: formatted}
  }
  // BatchSizeMaximum is the largest number of statements a single batch
  // operation may contain.
  // The limit is imposed by cassandra and could change in the future.
  const BatchSizeMaximum = 65535