session.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. "unicode"
  18. "github.com/gocql/gocql/internal/lru"
  19. )
// Session is the interface used by users to interact with the database.
//
// It's safe for concurrent use by multiple goroutines and a typical usage
// scenario is to have one global session object to interact with the
// whole Cassandra cluster.
//
// This type extends the Node interface by adding a convenient query builder
// and automatically sets a default consistency level on all operations
// that do not have a consistency level set.
type Session struct {
	// Defaults copied into every Query created by Query and Bind. They are
	// changed through SetConsistency, SetPageSize and SetPrefetch while
	// holding mu.
	cons     Consistency
	pageSize int
	prefetch float64

	// routingKeyInfoCache memoises the result of routingKeyInfo per
	// statement text.
	routingKeyInfoCache routingKeyInfoLRU

	schemaDescriber *schemaDescriber
	// trace is the default tracer handed to new queries (see SetTrace).
	trace Tracer
	// queryObserver and batchObserver are taken from the ClusterConfig in
	// NewSession.
	queryObserver QueryObserver
	batchObserver BatchObserver

	hostSource *ringDescriber
	// stmtsLRU is sized from ClusterConfig.MaxPreparedStmts in NewSession.
	stmtsLRU *preparedLRU

	// connCfg is built by connConfig in NewSession and passed to dial by
	// connect.
	connCfg *ConnConfig

	executor *queryExecutor
	pool     *policyConnPool
	policy   HostSelectionPolicy

	ring     ring
	metadata clusterMetadata

	// mu guards cons, pageSize, prefetch and trace.
	mu sync.RWMutex

	// control is only created by init when cfg.disableControlConn is false;
	// it stays nil otherwise.
	control *controlConn

	// event handlers
	nodeEvents   *eventDebouncer
	schemaEvents *eventDebouncer

	// ring metadata
	hosts           []HostInfo
	useSystemSchema bool

	cfg ClusterConfig

	// quit is closed by Close; reconnectDownedHosts returns once it is.
	quit chan struct{}

	// closeMu guards isClosed.
	closeMu  sync.RWMutex
	isClosed bool
}
  59. var queryPool = &sync.Pool{
  60. New: func() interface{} {
  61. return new(Query)
  62. },
  63. }
  64. func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
  65. var hosts []*HostInfo
  66. for _, hostport := range addrs {
  67. resolvedHosts, err := hostInfo(hostport, defaultPort)
  68. if err != nil {
  69. // Try other hosts if unable to resolve DNS name
  70. if _, ok := err.(*net.DNSError); ok {
  71. Logger.Printf("gocql: dns error: %v\n", err)
  72. continue
  73. }
  74. return nil, err
  75. }
  76. hosts = append(hosts, resolvedHosts...)
  77. }
  78. if len(hosts) == 0 {
  79. return nil, errors.New("failed to resolve any of the provided hostnames")
  80. }
  81. return hosts, nil
  82. }
  83. // NewSession wraps an existing Node.
  84. func NewSession(cfg ClusterConfig) (*Session, error) {
  85. // Check that hosts in the ClusterConfig is not empty
  86. if len(cfg.Hosts) < 1 {
  87. return nil, ErrNoHosts
  88. }
  89. s := &Session{
  90. cons: cfg.Consistency,
  91. prefetch: 0.25,
  92. cfg: cfg,
  93. pageSize: cfg.PageSize,
  94. stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
  95. quit: make(chan struct{}),
  96. }
  97. s.schemaDescriber = newSchemaDescriber(s)
  98. s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent)
  99. s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent)
  100. s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
  101. s.hostSource = &ringDescriber{session: s}
  102. if cfg.PoolConfig.HostSelectionPolicy == nil {
  103. cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
  104. }
  105. s.pool = cfg.PoolConfig.buildPool(s)
  106. s.policy = cfg.PoolConfig.HostSelectionPolicy
  107. s.policy.Init(s)
  108. s.executor = &queryExecutor{
  109. pool: s.pool,
  110. policy: cfg.PoolConfig.HostSelectionPolicy,
  111. }
  112. s.queryObserver = cfg.QueryObserver
  113. s.batchObserver = cfg.BatchObserver
  114. //Check the TLS Config before trying to connect to anything external
  115. connCfg, err := connConfig(&s.cfg)
  116. if err != nil {
  117. //TODO: Return a typed error
  118. return nil, fmt.Errorf("gocql: unable to create session: %v", err)
  119. }
  120. s.connCfg = connCfg
  121. if err := s.init(); err != nil {
  122. s.Close()
  123. if err == ErrNoConnectionsStarted {
  124. //This error used to be generated inside NewSession & returned directly
  125. //Forward it on up to be backwards compatible
  126. return nil, ErrNoConnectionsStarted
  127. } else {
  128. // TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error
  129. return nil, fmt.Errorf("gocql: unable to create session: %v", err)
  130. }
  131. }
  132. return s, nil
  133. }
  134. func (s *Session) init() error {
  135. hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port)
  136. if err != nil {
  137. return err
  138. }
  139. s.ring.endpoints = hosts
  140. if !s.cfg.disableControlConn {
  141. s.control = createControlConn(s)
  142. if s.cfg.ProtoVersion == 0 {
  143. proto, err := s.control.discoverProtocol(hosts)
  144. if err != nil {
  145. return fmt.Errorf("unable to discover protocol version: %v", err)
  146. } else if proto == 0 {
  147. return errors.New("unable to discovery protocol version")
  148. }
  149. // TODO(zariel): we really only need this in 1 place
  150. s.cfg.ProtoVersion = proto
  151. s.connCfg.ProtoVersion = proto
  152. }
  153. if err := s.control.connect(hosts); err != nil {
  154. return err
  155. }
  156. if !s.cfg.DisableInitialHostLookup {
  157. var partitioner string
  158. newHosts, partitioner, err := s.hostSource.GetHosts()
  159. if err != nil {
  160. return err
  161. }
  162. s.policy.SetPartitioner(partitioner)
  163. filteredHosts := make([]*HostInfo, 0, len(newHosts))
  164. for _, host := range newHosts {
  165. if !s.cfg.filterHost(host) {
  166. filteredHosts = append(filteredHosts, host)
  167. }
  168. }
  169. hosts = append(hosts, filteredHosts...)
  170. }
  171. }
  172. hostMap := make(map[string]*HostInfo, len(hosts))
  173. for _, host := range hosts {
  174. hostMap[host.ConnectAddress().String()] = host
  175. }
  176. for _, host := range hostMap {
  177. host = s.ring.addOrUpdate(host)
  178. s.addNewNode(host)
  179. }
  180. // TODO(zariel): we probably dont need this any more as we verify that we
  181. // can connect to one of the endpoints supplied by using the control conn.
  182. // See if there are any connections in the pool
  183. if s.cfg.ReconnectInterval > 0 {
  184. go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
  185. }
  186. // If we disable the initial host lookup, we need to still check if the
  187. // cluster is using the newer system schema or not... however, if control
  188. // connection is disable, we really have no choice, so we just make our
  189. // best guess...
  190. if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
  191. newer, _ := checkSystemSchema(s.control)
  192. s.useSystemSchema = newer
  193. } else {
  194. host := s.ring.rrHost()
  195. s.useSystemSchema = host.Version().Major >= 3
  196. }
  197. if s.pool.Size() == 0 {
  198. return ErrNoConnectionsStarted
  199. }
  200. return nil
  201. }
  202. func (s *Session) reconnectDownedHosts(intv time.Duration) {
  203. reconnectTicker := time.NewTicker(intv)
  204. defer reconnectTicker.Stop()
  205. for {
  206. select {
  207. case <-reconnectTicker.C:
  208. hosts := s.ring.allHosts()
  209. // Print session.ring for debug.
  210. if gocqlDebug {
  211. buf := bytes.NewBufferString("Session.ring:")
  212. for _, h := range hosts {
  213. buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
  214. }
  215. Logger.Println(buf.String())
  216. }
  217. for _, h := range hosts {
  218. if h.IsUp() {
  219. continue
  220. }
  221. s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
  222. }
  223. case <-s.quit:
  224. return
  225. }
  226. }
  227. }
  228. // SetConsistency sets the default consistency level for this session. This
  229. // setting can also be changed on a per-query basis and the default value
  230. // is Quorum.
  231. func (s *Session) SetConsistency(cons Consistency) {
  232. s.mu.Lock()
  233. s.cons = cons
  234. s.mu.Unlock()
  235. }
  236. // SetPageSize sets the default page size for this session. A value <= 0 will
  237. // disable paging. This setting can also be changed on a per-query basis.
  238. func (s *Session) SetPageSize(n int) {
  239. s.mu.Lock()
  240. s.pageSize = n
  241. s.mu.Unlock()
  242. }
  243. // SetPrefetch sets the default threshold for pre-fetching new pages. If
  244. // there are only p*pageSize rows remaining, the next page will be requested
  245. // automatically. This value can also be changed on a per-query basis and
  246. // the default value is 0.25.
  247. func (s *Session) SetPrefetch(p float64) {
  248. s.mu.Lock()
  249. s.prefetch = p
  250. s.mu.Unlock()
  251. }
  252. // SetTrace sets the default tracer for this session. This setting can also
  253. // be changed on a per-query basis.
  254. func (s *Session) SetTrace(trace Tracer) {
  255. s.mu.Lock()
  256. s.trace = trace
  257. s.mu.Unlock()
  258. }
  259. // Query generates a new query object for interacting with the database.
  260. // Further details of the query may be tweaked using the resulting query
  261. // value before the query is executed. Query is automatically prepared
  262. // if it has not previously been executed.
  263. func (s *Session) Query(stmt string, values ...interface{}) *Query {
  264. s.mu.RLock()
  265. qry := queryPool.Get().(*Query)
  266. qry.stmt = stmt
  267. qry.values = values
  268. qry.cons = s.cons
  269. qry.session = s
  270. qry.pageSize = s.pageSize
  271. qry.trace = s.trace
  272. qry.observer = s.queryObserver
  273. qry.prefetch = s.prefetch
  274. qry.rt = s.cfg.RetryPolicy
  275. qry.serialCons = s.cfg.SerialConsistency
  276. qry.defaultTimestamp = s.cfg.DefaultTimestamp
  277. qry.idempotent = s.cfg.DefaultIdempotence
  278. s.mu.RUnlock()
  279. return qry
  280. }
// QueryInfo is the argument type of the binding callback accepted by
// Session.Bind; per that method's documentation it carries the metadata of
// the prepared query.
type QueryInfo struct {
	Id          []byte       // NOTE(review): presumably the prepared statement id — confirm where QueryInfo is populated
	Args        []ColumnInfo // NOTE(review): presumably the columns of the bound arguments — confirm
	Rval        []ColumnInfo // NOTE(review): presumably the columns of the result rows — confirm
	PKeyColumns []int        // NOTE(review): presumably indexes of the partition key columns within Args — confirm
}
  287. // Bind generates a new query object based on the query statement passed in.
  288. // The query is automatically prepared if it has not previously been executed.
  289. // The binding callback allows the application to define which query argument
  290. // values will be marshalled as part of the query execution.
  291. // During execution, the meta data of the prepared query will be routed to the
  292. // binding callback, which is responsible for producing the query argument values.
  293. func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query {
  294. s.mu.RLock()
  295. qry := &Query{stmt: stmt, binding: b, cons: s.cons,
  296. session: s, pageSize: s.pageSize, trace: s.trace, observer: s.queryObserver,
  297. prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
  298. s.mu.RUnlock()
  299. return qry
  300. }
  301. // Close closes all connections. The session is unusable after this
  302. // operation.
  303. func (s *Session) Close() {
  304. s.closeMu.Lock()
  305. defer s.closeMu.Unlock()
  306. if s.isClosed {
  307. return
  308. }
  309. s.isClosed = true
  310. if s.pool != nil {
  311. s.pool.Close()
  312. }
  313. if s.control != nil {
  314. s.control.close()
  315. }
  316. if s.nodeEvents != nil {
  317. s.nodeEvents.stop()
  318. }
  319. if s.schemaEvents != nil {
  320. s.schemaEvents.stop()
  321. }
  322. if s.quit != nil {
  323. close(s.quit)
  324. }
  325. }
  326. func (s *Session) Closed() bool {
  327. s.closeMu.RLock()
  328. closed := s.isClosed
  329. s.closeMu.RUnlock()
  330. return closed
  331. }
  332. func (s *Session) executeQuery(qry *Query) (it *Iter) {
  333. // fail fast
  334. if s.Closed() {
  335. return &Iter{err: ErrSessionClosed}
  336. }
  337. iter, err := s.executor.executeQuery(qry)
  338. if err != nil {
  339. return &Iter{err: err}
  340. }
  341. if iter == nil {
  342. panic("nil iter")
  343. }
  344. return iter
  345. }
  346. func (s *Session) removeHost(h *HostInfo) {
  347. s.policy.RemoveHost(h)
  348. s.pool.removeHost(h.ConnectAddress())
  349. s.ring.removeHost(h.ConnectAddress())
  350. }
  351. // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
  352. func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
  353. // fail fast
  354. if s.Closed() {
  355. return nil, ErrSessionClosed
  356. } else if keyspace == "" {
  357. return nil, ErrNoKeyspace
  358. }
  359. return s.schemaDescriber.getSchema(keyspace)
  360. }
  361. func (s *Session) getConn() *Conn {
  362. hosts := s.ring.allHosts()
  363. for _, host := range hosts {
  364. if !host.IsUp() {
  365. continue
  366. }
  367. pool, ok := s.pool.getPool(host)
  368. if !ok {
  369. continue
  370. } else if conn := pool.Pick(); conn != nil {
  371. return conn
  372. }
  373. }
  374. return nil
  375. }
  376. // returns routing key indexes and type info
  377. func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) {
  378. s.routingKeyInfoCache.mu.Lock()
  379. entry, cached := s.routingKeyInfoCache.lru.Get(stmt)
  380. if cached {
  381. // done accessing the cache
  382. s.routingKeyInfoCache.mu.Unlock()
  383. // the entry is an inflight struct similar to that used by
  384. // Conn to prepare statements
  385. inflight := entry.(*inflightCachedEntry)
  386. // wait for any inflight work
  387. inflight.wg.Wait()
  388. if inflight.err != nil {
  389. return nil, inflight.err
  390. }
  391. key, _ := inflight.value.(*routingKeyInfo)
  392. return key, nil
  393. }
  394. // create a new inflight entry while the data is created
  395. inflight := new(inflightCachedEntry)
  396. inflight.wg.Add(1)
  397. defer inflight.wg.Done()
  398. s.routingKeyInfoCache.lru.Add(stmt, inflight)
  399. s.routingKeyInfoCache.mu.Unlock()
  400. var (
  401. info *preparedStatment
  402. partitionKey []*ColumnMetadata
  403. )
  404. conn := s.getConn()
  405. if conn == nil {
  406. // TODO: better error?
  407. inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
  408. return nil, inflight.err
  409. }
  410. // get the query info for the statement
  411. info, inflight.err = conn.prepareStatement(ctx, stmt, nil)
  412. if inflight.err != nil {
  413. // don't cache this error
  414. s.routingKeyInfoCache.Remove(stmt)
  415. return nil, inflight.err
  416. }
  417. // TODO: it would be nice to mark hosts here but as we are not using the policies
  418. // to fetch hosts we cant
  419. if info.request.colCount == 0 {
  420. // no arguments, no routing key, and no error
  421. return nil, nil
  422. }
  423. if len(info.request.pkeyColumns) > 0 {
  424. // proto v4 dont need to calculate primary key columns
  425. types := make([]TypeInfo, len(info.request.pkeyColumns))
  426. for i, col := range info.request.pkeyColumns {
  427. types[i] = info.request.columns[col].TypeInfo
  428. }
  429. routingKeyInfo := &routingKeyInfo{
  430. indexes: info.request.pkeyColumns,
  431. types: types,
  432. }
  433. inflight.value = routingKeyInfo
  434. return routingKeyInfo, nil
  435. }
  436. // get the table metadata
  437. table := info.request.columns[0].Table
  438. var keyspaceMetadata *KeyspaceMetadata
  439. keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
  440. if inflight.err != nil {
  441. // don't cache this error
  442. s.routingKeyInfoCache.Remove(stmt)
  443. return nil, inflight.err
  444. }
  445. tableMetadata, found := keyspaceMetadata.Tables[table]
  446. if !found {
  447. // unlikely that the statement could be prepared and the metadata for
  448. // the table couldn't be found, but this may indicate either a bug
  449. // in the metadata code, or that the table was just dropped.
  450. inflight.err = ErrNoMetadata
  451. // don't cache this error
  452. s.routingKeyInfoCache.Remove(stmt)
  453. return nil, inflight.err
  454. }
  455. partitionKey = tableMetadata.PartitionKey
  456. size := len(partitionKey)
  457. routingKeyInfo := &routingKeyInfo{
  458. indexes: make([]int, size),
  459. types: make([]TypeInfo, size),
  460. }
  461. for keyIndex, keyColumn := range partitionKey {
  462. // set an indicator for checking if the mapping is missing
  463. routingKeyInfo.indexes[keyIndex] = -1
  464. // find the column in the query info
  465. for argIndex, boundColumn := range info.request.columns {
  466. if keyColumn.Name == boundColumn.Name {
  467. // there may be many such bound columns, pick the first
  468. routingKeyInfo.indexes[keyIndex] = argIndex
  469. routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
  470. break
  471. }
  472. }
  473. if routingKeyInfo.indexes[keyIndex] == -1 {
  474. // missing a routing key column mapping
  475. // no routing key, and no error
  476. return nil, nil
  477. }
  478. }
  479. // cache this result
  480. inflight.value = routingKeyInfo
  481. return routingKeyInfo, nil
  482. }
// execute runs the batch on conn by delegating to conn.executeBatch and
// returns the resulting iterator.
func (b *Batch) execute(conn *Conn) *Iter {
	return conn.executeBatch(b)
}
  486. func (s *Session) executeBatch(batch *Batch) *Iter {
  487. // fail fast
  488. if s.Closed() {
  489. return &Iter{err: ErrSessionClosed}
  490. }
  491. // Prevent the execution of the batch if greater than the limit
  492. // Currently batches have a limit of 65536 queries.
  493. // https://datastax-oss.atlassian.net/browse/JAVA-229
  494. if batch.Size() > BatchSizeMaximum {
  495. return &Iter{err: ErrTooManyStmts}
  496. }
  497. iter, err := s.executor.executeQuery(batch)
  498. if err != nil {
  499. return &Iter{err: err}
  500. }
  501. return iter
  502. }
  503. // ExecuteBatch executes a batch operation and returns nil if successful
  504. // otherwise an error is returned describing the failure.
  505. func (s *Session) ExecuteBatch(batch *Batch) error {
  506. iter := s.executeBatch(batch)
  507. return iter.Close()
  508. }
  509. // ExecuteBatchCAS executes a batch operation and returns true if successful and
  510. // an iterator (to scan aditional rows if more than one conditional statement)
  511. // was sent.
  512. // Further scans on the interator must also remember to include
  513. // the applied boolean as the first argument to *Iter.Scan
  514. func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) {
  515. iter = s.executeBatch(batch)
  516. if err := iter.checkErrAndNotFound(); err != nil {
  517. iter.Close()
  518. return false, nil, err
  519. }
  520. if len(iter.Columns()) > 1 {
  521. dest = append([]interface{}{&applied}, dest...)
  522. iter.Scan(dest...)
  523. } else {
  524. iter.Scan(&applied)
  525. }
  526. return applied, iter, nil
  527. }
  528. // MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS,
  529. // however it accepts a map rather than a list of arguments for the initial
  530. // scan.
  531. func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) {
  532. iter = s.executeBatch(batch)
  533. if err := iter.checkErrAndNotFound(); err != nil {
  534. iter.Close()
  535. return false, nil, err
  536. }
  537. iter.MapScan(dest)
  538. applied = dest["[applied]"].(bool)
  539. delete(dest, "[applied]")
  540. // we usually close here, but instead of closing, just returin an error
  541. // if MapScan failed. Although Close just returns err, using Close
  542. // here might be confusing as we are not actually closing the iter
  543. return applied, iter, iter.err
  544. }
// connect dials host at its connect address and port using the session's
// connection configuration; errorHandler is passed through to dial.
func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
	return s.dial(host.ConnectAddress(), host.Port(), s.connCfg, errorHandler)
}
// Query represents a CQL statement that can be executed.
type Query struct {
	stmt     string        // statement text
	values   []interface{} // bound values supplied to Session.Query
	cons     Consistency   // consistency level, see Consistency / SetConsistency
	pageSize int           // see PageSize
	// routingKey is the key set explicitly through RoutingKey; when nil,
	// GetRoutingKey tries to derive one.
	routingKey []byte
	// routingKeyBuffer is the scratch buffer GetRoutingKey allocates once for
	// composite routing keys.
	routingKeyBuffer []byte
	pageState        []byte
	prefetch         float64
	trace            Tracer        // see Trace
	observer         QueryObserver // notified by attempt, see Observer
	session          *Session      // session that created the query
	rt               RetryPolicy   // returned by retryPolicy
	// binding is the callback passed to Session.Bind; nil for queries created
	// with Session.Query.
	binding func(q *QueryInfo) ([]interface{}, error)
	// attempts and totalLatency (nanoseconds) are accumulated by attempt and
	// reported through Attempts and Latency.
	attempts              int
	totalLatency          int64
	serialCons            SerialConsistency
	defaultTimestamp      bool  // see DefaultTimestamp
	defaultTimestampValue int64 // set by WithTimestamp
	disableSkipMetadata   bool
	context               context.Context // set by WithContext
	idempotent            bool
	disableAutoPage       bool
}
// String implements the fmt.Stringer interface. The result contains the
// quoted statement, the bound values and the consistency level.
func (q Query) String() string {
	return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons)
}
// Attempts returns the number of times the query was executed.
func (q *Query) Attempts() int {
	return q.attempts
}
  581. //Latency returns the average amount of nanoseconds per attempt of the query.
  582. func (q *Query) Latency() int64 {
  583. if q.attempts > 0 {
  584. return q.totalLatency / int64(q.attempts)
  585. }
  586. return 0
  587. }
// Consistency sets the consistency level for this query. If no consistency
// level has been set, the default consistency level of the cluster
// is used. It returns q so that calls can be chained.
func (q *Query) Consistency(c Consistency) *Query {
	q.cons = c
	return q
}
// GetConsistency returns the currently configured consistency level for
// the query.
func (q *Query) GetConsistency() Consistency {
	return q.cons
}
// SetConsistency sets the consistency level for this query. It is the same
// as Consistency but without a return value.
func (q *Query) SetConsistency(c Consistency) {
	q.cons = c
}
// Trace enables tracing of this query. Look at the documentation of the
// Tracer interface to learn more about tracing. It returns q so that calls
// can be chained.
func (q *Query) Trace(trace Tracer) *Query {
	q.trace = trace
	return q
}
// Observer enables query-level observer on this query.
// The provided observer will be called every time this query is executed.
// It returns q so that calls can be chained.
func (q *Query) Observer(observer QueryObserver) *Query {
	q.observer = observer
	return q
}
// PageSize will tell the iterator to fetch the result in pages of size n.
// This is useful for iterating over large result sets, but setting the
// page size too low might decrease the performance. This feature is only
// available in Cassandra 2 and onwards. It returns q so that calls can be
// chained.
func (q *Query) PageSize(n int) *Query {
	q.pageSize = n
	return q
}
// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enabled, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
//
// Only available on protocol >= 3
func (q *Query) DefaultTimestamp(enable bool) *Query {
	q.defaultTimestamp = enable
	return q
}
// WithTimestamp will enable the with default timestamp flag on the query
// like DefaultTimestamp does, but also allows defining the value of the
// timestamp. It works the same way as USING TIMESTAMP in the query itself, but
// should not break prepared query optimization.
//
// Only available on protocol >= 3
func (q *Query) WithTimestamp(timestamp int64) *Query {
	// Turn the flag on first, then record the explicit value to use.
	q.DefaultTimestamp(true)
	q.defaultTimestampValue = timestamp
	return q
}
// RoutingKey sets the routing key to use when a token aware connection
// pool is used to optimize the routing of this query. An explicitly set key
// is returned as is by GetRoutingKey.
func (q *Query) RoutingKey(routingKey []byte) *Query {
	q.routingKey = routingKey
	return q
}
// WithContext will set the context to use during a query, it will be used to
// timeout when waiting for responses from Cassandra. It returns q so that
// calls can be chained.
func (q *Query) WithContext(ctx context.Context) *Query {
	q.context = ctx
	return q
}
// execute runs the query on conn by delegating to conn.executeQuery and
// returns the resulting iterator.
func (q *Query) execute(conn *Conn) *Iter {
	return conn.executeQuery(q)
}
  660. func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter) {
  661. if gocqlDebug {
  662. Logger.Printf("Attempting query: %d", q.attempts)
  663. }
  664. q.attempts++
  665. q.totalLatency += end.Sub(start).Nanoseconds()
  666. // TODO: track latencies per host and things as well instead of just total
  667. if q.observer != nil {
  668. q.observer.ObserveQuery(q.context, ObservedQuery{
  669. Keyspace: keyspace,
  670. Statement: q.stmt,
  671. Start: start,
  672. End: end,
  673. Rows: iter.numRows,
  674. Err: iter.err,
  675. })
  676. }
  677. }
// retryPolicy returns the retry policy assigned to the query; Session.Query
// and Session.Bind take it from the cluster configuration.
func (q *Query) retryPolicy() RetryPolicy {
	return q.rt
}
  681. // Keyspace returns the keyspace the query will be executed against.
  682. func (q *Query) Keyspace() string {
  683. if q.session == nil {
  684. return ""
  685. }
  686. // TODO(chbannis): this should be parsed from the query or we should let
  687. // this be set by users.
  688. return q.session.cfg.Keyspace
  689. }
  690. // GetRoutingKey gets the routing key to use for routing this query. If
  691. // a routing key has not been explicitly set, then the routing key will
  692. // be constructed if possible using the keyspace's schema and the query
  693. // info for this query statement. If the routing key cannot be determined
  694. // then nil will be returned with no error. On any error condition,
  695. // an error description will be returned.
  696. func (q *Query) GetRoutingKey() ([]byte, error) {
  697. if q.routingKey != nil {
  698. return q.routingKey, nil
  699. } else if q.binding != nil && len(q.values) == 0 {
  700. // If this query was created using session.Bind we wont have the query
  701. // values yet, so we have to pass down to the next policy.
  702. // TODO: Remove this and handle this case
  703. return nil, nil
  704. }
  705. // try to determine the routing key
  706. routingKeyInfo, err := q.session.routingKeyInfo(q.context, q.stmt)
  707. if err != nil {
  708. return nil, err
  709. }
  710. if routingKeyInfo == nil {
  711. return nil, nil
  712. }
  713. if len(routingKeyInfo.indexes) == 1 {
  714. // single column routing key
  715. routingKey, err := Marshal(
  716. routingKeyInfo.types[0],
  717. q.values[routingKeyInfo.indexes[0]],
  718. )
  719. if err != nil {
  720. return nil, err
  721. }
  722. return routingKey, nil
  723. }
  724. // We allocate that buffer only once, so that further re-bind/exec of the
  725. // same query don't allocate more memory.
  726. if q.routingKeyBuffer == nil {
  727. q.routingKeyBuffer = make([]byte, 0, 256)
  728. }
  729. // composite routing key
  730. buf := bytes.NewBuffer(q.routingKeyBuffer)
  731. for i := range routingKeyInfo.indexes {
  732. encoded, err := Marshal(
  733. routingKeyInfo.types[i],
  734. q.values[routingKeyInfo.indexes[i]],
  735. )
  736. if err != nil {
  737. return nil, err
  738. }
  739. lenBuf := []byte{0x00, 0x00}
  740. binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
  741. buf.Write(lenBuf)
  742. buf.Write(encoded)
  743. buf.WriteByte(0x00)
  744. }
  745. routingKey := buf.Bytes()
  746. return routingKey, nil
  747. }
  748. func (q *Query) shouldPrepare() bool {
  749. stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool {
  750. return unicode.IsSpace(r) || r == ';'
  751. }), unicode.IsSpace)
  752. var stmtType string
  753. if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 {
  754. stmtType = strings.ToLower(stmt[:n])
  755. }
  756. if stmtType == "begin" {
  757. if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 {
  758. stmtType = strings.ToLower(stmt[n+1:])
  759. }
  760. }
  761. switch stmtType {
  762. case "select", "insert", "update", "delete", "batch":
  763. return true
  764. }
  765. return false
  766. }
  767. // SetPrefetch sets the default threshold for pre-fetching new pages. If
  768. // there are only p*pageSize rows remaining, the next page will be requested
  769. // automatically.
  770. func (q *Query) Prefetch(p float64) *Query {
  771. q.prefetch = p
  772. return q
  773. }
  774. // RetryPolicy sets the policy to use when retrying the query.
  775. func (q *Query) RetryPolicy(r RetryPolicy) *Query {
  776. q.rt = r
  777. return q
  778. }
  779. func (q *Query) IsIdempotent() bool {
  780. return q.idempotent
  781. }
  782. // Idempontent marks the query as being idempontent or not depending on
  783. // the value.
  784. func (q *Query) Idempontent(value bool) *Query {
  785. q.idempotent = value
  786. return q
  787. }
  788. // Bind sets query arguments of query. This can also be used to rebind new query arguments
  789. // to an existing query instance.
  790. func (q *Query) Bind(v ...interface{}) *Query {
  791. q.values = v
  792. return q
  793. }
  794. // SerialConsistency sets the consistency level for the
  795. // serial phase of conditional updates. That consistency can only be
  796. // either SERIAL or LOCAL_SERIAL and if not present, it defaults to
  797. // SERIAL. This option will be ignored for anything else that a
  798. // conditional update/insert.
  799. func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
  800. q.serialCons = cons
  801. return q
  802. }
  803. // PageState sets the paging state for the query to resume paging from a specific
  804. // point in time. Setting this will disable to query paging for this query, and
  805. // must be used for all subsequent pages.
  806. func (q *Query) PageState(state []byte) *Query {
  807. q.pageState = state
  808. q.disableAutoPage = true
  809. return q
  810. }
  811. // NoSkipMetadata will override the internal result metadata cache so that the driver does not
  812. // send skip_metadata for queries, this means that the result will always contain
  813. // the metadata to parse the rows and will not reuse the metadata from the prepared
  814. // staement. This should only be used to work around cassandra bugs, such as when using
  815. // CAS operations which do not end in Cas.
  816. //
  817. // See https://issues.apache.org/jira/browse/CASSANDRA-11099
  818. // https://github.com/gocql/gocql/issues/612
  819. func (q *Query) NoSkipMetadata() *Query {
  820. q.disableSkipMetadata = true
  821. return q
  822. }
  823. // Exec executes the query without returning any rows.
  824. func (q *Query) Exec() error {
  825. return q.Iter().Close()
  826. }
  827. func isUseStatement(stmt string) bool {
  828. if len(stmt) < 3 {
  829. return false
  830. }
  831. return strings.EqualFold(stmt[0:3], "use")
  832. }
  833. // Iter executes the query and returns an iterator capable of iterating
  834. // over all results.
  835. func (q *Query) Iter() *Iter {
  836. if isUseStatement(q.stmt) {
  837. return &Iter{err: ErrUseStmt}
  838. }
  839. return q.session.executeQuery(q)
  840. }
  841. // MapScan executes the query, copies the columns of the first selected
  842. // row into the map pointed at by m and discards the rest. If no rows
  843. // were selected, ErrNotFound is returned.
  844. func (q *Query) MapScan(m map[string]interface{}) error {
  845. iter := q.Iter()
  846. if err := iter.checkErrAndNotFound(); err != nil {
  847. return err
  848. }
  849. iter.MapScan(m)
  850. return iter.Close()
  851. }
  852. // Scan executes the query, copies the columns of the first selected
  853. // row into the values pointed at by dest and discards the rest. If no rows
  854. // were selected, ErrNotFound is returned.
  855. func (q *Query) Scan(dest ...interface{}) error {
  856. iter := q.Iter()
  857. if err := iter.checkErrAndNotFound(); err != nil {
  858. return err
  859. }
  860. iter.Scan(dest...)
  861. return iter.Close()
  862. }
  863. // ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
  864. // statement containing an IF clause). If the transaction fails because
  865. // the existing values did not match, the previous values will be stored
  866. // in dest.
  867. func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) {
  868. q.disableSkipMetadata = true
  869. iter := q.Iter()
  870. if err := iter.checkErrAndNotFound(); err != nil {
  871. return false, err
  872. }
  873. if len(iter.Columns()) > 1 {
  874. dest = append([]interface{}{&applied}, dest...)
  875. iter.Scan(dest...)
  876. } else {
  877. iter.Scan(&applied)
  878. }
  879. return applied, iter.Close()
  880. }
  881. // MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
  882. // statement containing an IF clause). If the transaction fails because
  883. // the existing values did not match, the previous values will be stored
  884. // in dest map.
  885. //
  886. // As for INSERT .. IF NOT EXISTS, previous values will be returned as if
  887. // SELECT * FROM. So using ScanCAS with INSERT is inherently prone to
  888. // column mismatching. MapScanCAS is added to capture them safely.
  889. func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) {
  890. q.disableSkipMetadata = true
  891. iter := q.Iter()
  892. if err := iter.checkErrAndNotFound(); err != nil {
  893. return false, err
  894. }
  895. iter.MapScan(dest)
  896. applied = dest["[applied]"].(bool)
  897. delete(dest, "[applied]")
  898. return applied, iter.Close()
  899. }
  900. // Release releases a query back into a pool of queries. Released Queries
  901. // cannot be reused.
  902. //
  903. // Example:
  904. // qry := session.Query("SELECT * FROM my_table")
  905. // qry.Exec()
  906. // qry.Release()
  907. func (q *Query) Release() {
  908. q.reset()
  909. queryPool.Put(q)
  910. }
  911. // reset zeroes out all fields of a query so that it can be safely pooled.
  912. func (q *Query) reset() {
  913. q.stmt = ""
  914. q.values = nil
  915. q.cons = 0
  916. q.pageSize = 0
  917. q.routingKey = nil
  918. q.routingKeyBuffer = nil
  919. q.pageState = nil
  920. q.prefetch = 0
  921. q.trace = nil
  922. q.session = nil
  923. q.rt = nil
  924. q.binding = nil
  925. q.attempts = 0
  926. q.totalLatency = 0
  927. q.serialCons = 0
  928. q.defaultTimestamp = false
  929. q.disableSkipMetadata = false
  930. q.disableAutoPage = false
  931. q.context = nil
  932. }
  933. // Iter represents an iterator that can be used to iterate over all rows that
  934. // were returned by a query. The iterator might send additional queries to the
  935. // database during the iteration if paging was enabled.
  936. type Iter struct {
  937. err error
  938. pos int
  939. meta resultMetadata
  940. numRows int
  941. next *nextIter
  942. host *HostInfo
  943. framer *framer
  944. closed int32
  945. }
  946. // Host returns the host which the query was sent to.
  947. func (iter *Iter) Host() *HostInfo {
  948. return iter.host
  949. }
  950. // Columns returns the name and type of the selected columns.
  951. func (iter *Iter) Columns() []ColumnInfo {
  952. return iter.meta.columns
  953. }
  954. type Scanner interface {
  955. // Next advances the row pointer to point at the next row, the row is valid until
  956. // the next call of Next. It returns true if there is a row which is available to be
  957. // scanned into with Scan.
  958. // Next must be called before every call to Scan.
  959. Next() bool
  960. // Scan copies the current row's columns into dest. If the length of dest does not equal
  961. // the number of columns returned in the row an error is returned. If an error is encountered
  962. // when unmarshalling a column into the value in dest an error is returned and the row is invalidated
  963. // until the next call to Next.
  964. // Next must be called before calling Scan, if it is not an error is returned.
  965. Scan(...interface{}) error
  966. // Err returns the if there was one during iteration that resulted in iteration being unable to complete.
  967. // Err will also release resources held by the iterator, the Scanner should not used after being called.
  968. Err() error
  969. }
  970. type iterScanner struct {
  971. iter *Iter
  972. cols [][]byte
  973. }
  974. func (is *iterScanner) Next() bool {
  975. iter := is.iter
  976. if iter.err != nil {
  977. return false
  978. }
  979. if iter.pos >= iter.numRows {
  980. if iter.next != nil {
  981. is.iter = iter.next.fetch()
  982. return is.Next()
  983. }
  984. return false
  985. }
  986. cols := make([][]byte, len(iter.meta.columns))
  987. for i := 0; i < len(cols); i++ {
  988. col, err := iter.readColumn()
  989. if err != nil {
  990. iter.err = err
  991. return false
  992. }
  993. cols[i] = col
  994. }
  995. is.cols = cols
  996. iter.pos++
  997. return true
  998. }
  999. func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) {
  1000. if dest[0] == nil {
  1001. return 1, nil
  1002. }
  1003. if col.TypeInfo.Type() == TypeTuple {
  1004. // this will panic, actually a bug, please report
  1005. tuple := col.TypeInfo.(TupleTypeInfo)
  1006. count := len(tuple.Elems)
  1007. // here we pass in a slice of the struct which has the number number of
  1008. // values as elements in the tuple
  1009. if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil {
  1010. return 0, err
  1011. }
  1012. return count, nil
  1013. } else {
  1014. if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil {
  1015. return 0, err
  1016. }
  1017. return 1, nil
  1018. }
  1019. }
  1020. func (is *iterScanner) Scan(dest ...interface{}) error {
  1021. if is.cols == nil {
  1022. return errors.New("gocql: Scan called without calling Next")
  1023. }
  1024. iter := is.iter
  1025. // currently only support scanning into an expand tuple, such that its the same
  1026. // as scanning in more values from a single column
  1027. if len(dest) != iter.meta.actualColCount {
  1028. return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
  1029. }
  1030. // i is the current position in dest, could posible replace it and just use
  1031. // slices of dest
  1032. i := 0
  1033. var err error
  1034. for _, col := range iter.meta.columns {
  1035. var n int
  1036. n, err = scanColumn(is.cols[i], col, dest[i:])
  1037. if err != nil {
  1038. break
  1039. }
  1040. i += n
  1041. }
  1042. is.cols = nil
  1043. return err
  1044. }
  1045. func (is *iterScanner) Err() error {
  1046. iter := is.iter
  1047. is.iter = nil
  1048. is.cols = nil
  1049. return iter.Close()
  1050. }
  1051. // Scanner returns a row Scanner which provides an interface to scan rows in a manner which is
  1052. // similar to database/sql. The iter should NOT be used again after calling this method.
  1053. func (iter *Iter) Scanner() Scanner {
  1054. if iter == nil {
  1055. return nil
  1056. }
  1057. return &iterScanner{iter: iter}
  1058. }
  1059. func (iter *Iter) readColumn() ([]byte, error) {
  1060. return iter.framer.readBytesInternal()
  1061. }
  1062. // Scan consumes the next row of the iterator and copies the columns of the
  1063. // current row into the values pointed at by dest. Use nil as a dest value
  1064. // to skip the corresponding column. Scan might send additional queries
  1065. // to the database to retrieve the next set of rows if paging was enabled.
  1066. //
  1067. // Scan returns true if the row was successfully unmarshaled or false if the
  1068. // end of the result set was reached or if an error occurred. Close should
  1069. // be called afterwards to retrieve any potential errors.
  1070. func (iter *Iter) Scan(dest ...interface{}) bool {
  1071. if iter.err != nil {
  1072. return false
  1073. }
  1074. if iter.pos >= iter.numRows {
  1075. if iter.next != nil {
  1076. *iter = *iter.next.fetch()
  1077. return iter.Scan(dest...)
  1078. }
  1079. return false
  1080. }
  1081. if iter.next != nil && iter.pos == iter.next.pos {
  1082. go iter.next.fetch()
  1083. }
  1084. // currently only support scanning into an expand tuple, such that its the same
  1085. // as scanning in more values from a single column
  1086. if len(dest) != iter.meta.actualColCount {
  1087. iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
  1088. return false
  1089. }
  1090. // i is the current position in dest, could posible replace it and just use
  1091. // slices of dest
  1092. i := 0
  1093. for _, col := range iter.meta.columns {
  1094. colBytes, err := iter.readColumn()
  1095. if err != nil {
  1096. iter.err = err
  1097. return false
  1098. }
  1099. n, err := scanColumn(colBytes, col, dest[i:])
  1100. if err != nil {
  1101. iter.err = err
  1102. return false
  1103. }
  1104. i += n
  1105. }
  1106. iter.pos++
  1107. return true
  1108. }
  1109. // GetCustomPayload returns any parsed custom payload results if given in the
  1110. // response from Cassandra. Note that the result is not a copy.
  1111. //
  1112. // This additional feature of CQL Protocol v4
  1113. // allows additional results and query information to be returned by
  1114. // custom QueryHandlers running in your C* cluster.
  1115. // See https://datastax.github.io/java-driver/manual/custom_payloads/
  1116. func (iter *Iter) GetCustomPayload() map[string][]byte {
  1117. return iter.framer.header.customPayload
  1118. }
  1119. // Warnings returns any warnings generated if given in the response from Cassandra.
  1120. //
  1121. // This is only available starting with CQL Protocol v4.
  1122. func (iter *Iter) Warnings() []string {
  1123. if iter.framer != nil {
  1124. return iter.framer.header.warnings
  1125. }
  1126. return nil
  1127. }
  1128. // Close closes the iterator and returns any errors that happened during
  1129. // the query or the iteration.
  1130. func (iter *Iter) Close() error {
  1131. if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) {
  1132. if iter.framer != nil {
  1133. framerPool.Put(iter.framer)
  1134. iter.framer = nil
  1135. }
  1136. }
  1137. return iter.err
  1138. }
  1139. // WillSwitchPage detects if iterator reached end of current page
  1140. // and the next page is available.
  1141. func (iter *Iter) WillSwitchPage() bool {
  1142. return iter.pos >= iter.numRows && iter.next != nil
  1143. }
  1144. // checkErrAndNotFound handle error and NotFound in one method.
  1145. func (iter *Iter) checkErrAndNotFound() error {
  1146. if iter.err != nil {
  1147. return iter.err
  1148. } else if iter.numRows == 0 {
  1149. return ErrNotFound
  1150. }
  1151. return nil
  1152. }
  1153. // PageState return the current paging state for a query which can be used for
  1154. // subsequent quries to resume paging this point.
  1155. func (iter *Iter) PageState() []byte {
  1156. return iter.meta.pagingState
  1157. }
  1158. // NumRows returns the number of rows in this pagination, it will update when new
  1159. // pages are fetched, it is not the value of the total number of rows this iter
  1160. // will return unless there is only a single page returned.
  1161. func (iter *Iter) NumRows() int {
  1162. return iter.numRows
  1163. }
  1164. type nextIter struct {
  1165. qry Query
  1166. pos int
  1167. once sync.Once
  1168. next *Iter
  1169. conn *Conn
  1170. }
  1171. func (n *nextIter) fetch() *Iter {
  1172. n.once.Do(func() {
  1173. iter := n.qry.session.executor.attemptQuery(&n.qry, n.conn)
  1174. if iter != nil && iter.err == nil {
  1175. n.next = iter
  1176. } else {
  1177. n.next = n.qry.session.executeQuery(&n.qry)
  1178. }
  1179. })
  1180. return n.next
  1181. }
  1182. type Batch struct {
  1183. Type BatchType
  1184. Entries []BatchEntry
  1185. Cons Consistency
  1186. rt RetryPolicy
  1187. observer BatchObserver
  1188. attempts int
  1189. totalLatency int64
  1190. serialCons SerialConsistency
  1191. defaultTimestamp bool
  1192. defaultTimestampValue int64
  1193. context context.Context
  1194. keyspace string
  1195. }
  1196. // NewBatch creates a new batch operation without defaults from the cluster
  1197. //
  1198. // Depreicated: use session.NewBatch instead
  1199. func NewBatch(typ BatchType) *Batch {
  1200. return &Batch{Type: typ}
  1201. }
  1202. // NewBatch creates a new batch operation using defaults defined in the cluster
  1203. func (s *Session) NewBatch(typ BatchType) *Batch {
  1204. s.mu.RLock()
  1205. batch := &Batch{
  1206. Type: typ,
  1207. rt: s.cfg.RetryPolicy,
  1208. serialCons: s.cfg.SerialConsistency,
  1209. observer: s.batchObserver,
  1210. Cons: s.cons,
  1211. defaultTimestamp: s.cfg.DefaultTimestamp,
  1212. keyspace: s.cfg.Keyspace,
  1213. }
  1214. s.mu.RUnlock()
  1215. return batch
  1216. }
  1217. // Observer enables batch-level observer on this batch.
  1218. // The provided observer will be called every time this batched query is executed.
  1219. func (b *Batch) Observer(observer BatchObserver) *Batch {
  1220. b.observer = observer
  1221. return b
  1222. }
  1223. func (b *Batch) Keyspace() string {
  1224. return b.keyspace
  1225. }
  1226. // Attempts returns the number of attempts made to execute the batch.
  1227. func (b *Batch) Attempts() int {
  1228. return b.attempts
  1229. }
  1230. //Latency returns the average number of nanoseconds to execute a single attempt of the batch.
  1231. func (b *Batch) Latency() int64 {
  1232. if b.attempts > 0 {
  1233. return b.totalLatency / int64(b.attempts)
  1234. }
  1235. return 0
  1236. }
  1237. // GetConsistency returns the currently configured consistency level for the batch
  1238. // operation.
  1239. func (b *Batch) GetConsistency() Consistency {
  1240. return b.Cons
  1241. }
  1242. // SetConsistency sets the currently configured consistency level for the batch
  1243. // operation.
  1244. func (b *Batch) SetConsistency(c Consistency) {
  1245. b.Cons = c
  1246. }
  1247. // Query adds the query to the batch operation
  1248. func (b *Batch) Query(stmt string, args ...interface{}) {
  1249. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  1250. }
  1251. // Bind adds the query to the batch operation and correlates it with a binding callback
  1252. // that will be invoked when the batch is executed. The binding callback allows the application
  1253. // to define which query argument values will be marshalled as part of the batch execution.
  1254. func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) {
  1255. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind})
  1256. }
  1257. func (b *Batch) retryPolicy() RetryPolicy {
  1258. return b.rt
  1259. }
  1260. // RetryPolicy sets the retry policy to use when executing the batch operation
  1261. func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
  1262. b.rt = r
  1263. return b
  1264. }
  1265. // WithContext will set the context to use during a query, it will be used to
  1266. // timeout when waiting for responses from Cassandra.
  1267. func (b *Batch) WithContext(ctx context.Context) *Batch {
  1268. b.context = ctx
  1269. return b
  1270. }
  1271. // Size returns the number of batch statements to be executed by the batch operation.
  1272. func (b *Batch) Size() int {
  1273. return len(b.Entries)
  1274. }
  1275. // SerialConsistency sets the consistency level for the
  1276. // serial phase of conditional updates. That consistency can only be
  1277. // either SERIAL or LOCAL_SERIAL and if not present, it defaults to
  1278. // SERIAL. This option will be ignored for anything else that a
  1279. // conditional update/insert.
  1280. //
  1281. // Only available for protocol 3 and above
  1282. func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
  1283. b.serialCons = cons
  1284. return b
  1285. }
  1286. // DefaultTimestamp will enable the with default timestamp flag on the query.
  1287. // If enable, this will replace the server side assigned
  1288. // timestamp as default timestamp. Note that a timestamp in the query itself
  1289. // will still override this timestamp. This is entirely optional.
  1290. //
  1291. // Only available on protocol >= 3
  1292. func (b *Batch) DefaultTimestamp(enable bool) *Batch {
  1293. b.defaultTimestamp = enable
  1294. return b
  1295. }
  1296. // WithTimestamp will enable the with default timestamp flag on the query
  1297. // like DefaultTimestamp does. But also allows to define value for timestamp.
  1298. // It works the same way as USING TIMESTAMP in the query itself, but
  1299. // should not break prepared query optimization
  1300. //
  1301. // Only available on protocol >= 3
  1302. func (b *Batch) WithTimestamp(timestamp int64) *Batch {
  1303. b.DefaultTimestamp(true)
  1304. b.defaultTimestampValue = timestamp
  1305. return b
  1306. }
  1307. func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter) {
  1308. b.attempts++
  1309. b.totalLatency += end.Sub(start).Nanoseconds()
  1310. // TODO: track latencies per host and things as well instead of just total
  1311. if b.observer == nil {
  1312. return
  1313. }
  1314. statements := make([]string, len(b.Entries))
  1315. for i, entry := range b.Entries {
  1316. statements[i] = entry.Stmt
  1317. }
  1318. b.observer.ObserveBatch(b.context, ObservedBatch{
  1319. Keyspace: keyspace,
  1320. Statements: statements,
  1321. Start: start,
  1322. End: end,
  1323. // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
  1324. Err: iter.err,
  1325. })
  1326. }
  1327. func (b *Batch) GetRoutingKey() ([]byte, error) {
  1328. // TODO: use the first statement in the batch as the routing key?
  1329. return nil, nil
  1330. }
  1331. type BatchType byte
  1332. const (
  1333. LoggedBatch BatchType = 0
  1334. UnloggedBatch BatchType = 1
  1335. CounterBatch BatchType = 2
  1336. )
  1337. type BatchEntry struct {
  1338. Stmt string
  1339. Args []interface{}
  1340. binding func(q *QueryInfo) ([]interface{}, error)
  1341. }
  1342. type ColumnInfo struct {
  1343. Keyspace string
  1344. Table string
  1345. Name string
  1346. TypeInfo TypeInfo
  1347. }
  1348. func (c ColumnInfo) String() string {
  1349. return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo)
  1350. }
  1351. // routing key indexes LRU cache
  1352. type routingKeyInfoLRU struct {
  1353. lru *lru.Cache
  1354. mu sync.Mutex
  1355. }
  1356. type routingKeyInfo struct {
  1357. indexes []int
  1358. types []TypeInfo
  1359. }
  1360. func (r *routingKeyInfo) String() string {
  1361. return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types)
  1362. }
  1363. func (r *routingKeyInfoLRU) Remove(key string) {
  1364. r.mu.Lock()
  1365. r.lru.Remove(key)
  1366. r.mu.Unlock()
  1367. }
  1368. //Max adjusts the maximum size of the cache and cleans up the oldest records if
  1369. //the new max is lower than the previous value. Not concurrency safe.
  1370. func (r *routingKeyInfoLRU) Max(max int) {
  1371. r.mu.Lock()
  1372. for r.lru.Len() > max {
  1373. r.lru.RemoveOldest()
  1374. }
  1375. r.lru.MaxEntries = max
  1376. r.mu.Unlock()
  1377. }
  1378. type inflightCachedEntry struct {
  1379. wg sync.WaitGroup
  1380. err error
  1381. value interface{}
  1382. }
  1383. // Tracer is the interface implemented by query tracers. Tracers have the
  1384. // ability to obtain a detailed event log of all events that happened during
  1385. // the execution of a query from Cassandra. Gathering this information might
  1386. // be essential for debugging and optimizing queries, but this feature should
  1387. // not be used on production systems with very high load.
  1388. type Tracer interface {
  1389. Trace(traceId []byte)
  1390. }
  1391. type traceWriter struct {
  1392. session *Session
  1393. w io.Writer
  1394. mu sync.Mutex
  1395. }
  1396. // NewTraceWriter returns a simple Tracer implementation that outputs
  1397. // the event log in a textual format.
  1398. func NewTraceWriter(session *Session, w io.Writer) Tracer {
  1399. return &traceWriter{session: session, w: w}
  1400. }
  1401. func (t *traceWriter) Trace(traceId []byte) {
  1402. var (
  1403. coordinator string
  1404. duration int
  1405. )
  1406. iter := t.session.control.query(`SELECT coordinator, duration
  1407. FROM system_traces.sessions
  1408. WHERE session_id = ?`, traceId)
  1409. iter.Scan(&coordinator, &duration)
  1410. if err := iter.Close(); err != nil {
  1411. t.mu.Lock()
  1412. fmt.Fprintln(t.w, "Error:", err)
  1413. t.mu.Unlock()
  1414. return
  1415. }
  1416. var (
  1417. timestamp time.Time
  1418. activity string
  1419. source string
  1420. elapsed int
  1421. )
  1422. fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
  1423. traceId, coordinator, time.Duration(duration)*time.Microsecond)
  1424. t.mu.Lock()
  1425. defer t.mu.Unlock()
  1426. iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed
  1427. FROM system_traces.events
  1428. WHERE session_id = ?`, traceId)
  1429. for iter.Scan(&timestamp, &activity, &source, &elapsed) {
  1430. fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
  1431. timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
  1432. }
  1433. if err := iter.Close(); err != nil {
  1434. fmt.Fprintln(t.w, "Error:", err)
  1435. }
  1436. }
  1437. type ObservedQuery struct {
  1438. Keyspace string
  1439. Statement string
  1440. Start time.Time // time immediately before the query was called
  1441. End time.Time // time immediately after the query returned
  1442. // Rows is the number of rows in the current iter.
  1443. // In paginated queries, rows from previous scans are not counted.
  1444. // Rows is not used in batch queries and remains at the default value
  1445. Rows int
  1446. // Err is the error in the query.
  1447. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
  1448. Err error
  1449. }
  1450. // QueryObserver is the interface implemented by query observers / stat collectors.
  1451. //
  1452. // Experimental, this interface and use may change
  1453. type QueryObserver interface {
  1454. // ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled.
  1455. // It doesn't get called if there is no query because the session is closed or there are no connections available.
  1456. // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
  1457. ObserveQuery(context.Context, ObservedQuery)
  1458. }
  1459. type ObservedBatch struct {
  1460. Keyspace string
  1461. Statements []string
  1462. Start time.Time // time immediately before the batch query was called
  1463. End time.Time // time immediately after the batch query returned
  1464. // Err is the error in the batch query.
  1465. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
  1466. Err error
  1467. }
  1468. // BatchObserver is the interface implemented by batch observers / stat collectors.
  1469. type BatchObserver interface {
  1470. // ObserveBatch gets called on every batch query to cassandra.
  1471. // It also gets called once for each query in a batch.
  1472. // It doesn't get called if there is no query because the session is closed or there are no connections available.
  1473. // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
  1474. // Unlike QueryObserver.ObserveQuery it does no reporting on rows read.
  1475. ObserveBatch(context.Context, ObservedBatch)
  1476. }
  1477. type Error struct {
  1478. Code int
  1479. Message string
  1480. }
  1481. func (e Error) Error() string {
  1482. return e.Message
  1483. }
  1484. var (
  1485. ErrNotFound = errors.New("not found")
  1486. ErrUnavailable = errors.New("unavailable")
  1487. ErrUnsupported = errors.New("feature not supported")
  1488. ErrTooManyStmts = errors.New("too many statements")
  1489. ErrUseStmt = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.")
  1490. ErrSessionClosed = errors.New("session has been closed")
  1491. ErrNoConnections = errors.New("gocql: no hosts available in the pool")
  1492. ErrNoKeyspace = errors.New("no keyspace provided")
  1493. ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist")
  1494. ErrNoMetadata = errors.New("no metadata available")
  1495. )
  1496. type ErrProtocol struct{ error }
  1497. func NewErrProtocol(format string, args ...interface{}) error {
  1498. return ErrProtocol{fmt.Errorf(format, args...)}
  1499. }
  1500. // BatchSizeMaximum is the maximum number of statements a batch operation can have.
  1501. // This limit is set by cassandra and could change in the future.
  1502. const BatchSizeMaximum = 65535