policies.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. //This file will be the future home for more policies
  5. package gocql
  6. import (
  7. "log"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/hailocab/go-hostpool"
  11. )
  12. // cowHostList implements a copy on write host list, its equivilent type is []HostInfo
  13. type cowHostList struct {
  14. list atomic.Value
  15. mu sync.Mutex
  16. }
  17. func (c *cowHostList) get() []HostInfo {
  18. // TODO(zariel): should we replace this with []*HostInfo?
  19. l, ok := c.list.Load().(*[]HostInfo)
  20. if !ok {
  21. return nil
  22. }
  23. return *l
  24. }
  25. func (c *cowHostList) set(list []HostInfo) {
  26. c.mu.Lock()
  27. c.list.Store(&list)
  28. c.mu.Unlock()
  29. }
  30. func (c *cowHostList) add(host HostInfo) {
  31. c.mu.Lock()
  32. l := c.get()
  33. if n := len(l); n == 0 {
  34. l = []HostInfo{host}
  35. } else {
  36. newL := make([]HostInfo, n+1)
  37. for i := 0; i < n; i++ {
  38. if host.Peer == l[i].Peer && host.HostId == l[i].HostId {
  39. c.mu.Unlock()
  40. return
  41. }
  42. newL[i] = l[i]
  43. }
  44. newL[n] = host
  45. l = newL
  46. }
  47. c.list.Store(&l)
  48. c.mu.Unlock()
  49. }
  50. func (c *cowHostList) remove(addr string) {
  51. c.mu.Lock()
  52. l := c.get()
  53. size := len(l)
  54. if size == 0 {
  55. c.mu.Unlock()
  56. return
  57. }
  58. found := false
  59. newL := make([]HostInfo, 0, size)
  60. for i := 0; i < len(l); i++ {
  61. if l[i].Peer != addr {
  62. newL = append(newL, l[i])
  63. } else {
  64. found = true
  65. }
  66. }
  67. if !found {
  68. c.mu.Unlock()
  69. return
  70. }
  71. newL = newL[:size-1 : size-1]
  72. c.list.Store(&newL)
  73. c.mu.Unlock()
  74. }
  75. // RetryableQuery is an interface that represents a query or batch statement that
  76. // exposes the correct functions for the retry policy logic to evaluate correctly.
  77. type RetryableQuery interface {
  78. Attempts() int
  79. GetConsistency() Consistency
  80. }
  81. // RetryPolicy interface is used by gocql to determine if a query can be attempted
  82. // again after a retryable error has been received. The interface allows gocql
  83. // users to implement their own logic to determine if a query can be attempted
  84. // again.
  85. //
  86. // See SimpleRetryPolicy as an example of implementing and using a RetryPolicy
  87. // interface.
  88. type RetryPolicy interface {
  89. Attempt(RetryableQuery) bool
  90. }
  91. // SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
  92. //
  93. // See below for examples of usage:
  94. //
  95. // //Assign to the cluster
  96. // cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 3}
  97. //
  98. // //Assign to a query
  99. // query.RetryPolicy(&gocql.SimpleRetryPolicy{NumRetries: 1})
  100. //
  101. type SimpleRetryPolicy struct {
  102. NumRetries int //Number of times to retry a query
  103. }
  104. // Attempt tells gocql to attempt the query again based on query.Attempts being less
  105. // than the NumRetries defined in the policy.
  106. func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
  107. return q.Attempts() <= s.NumRetries
  108. }
  109. type HostStateNotifier interface {
  110. AddHost(host *HostInfo)
  111. RemoveHost(addr string)
  112. // TODO(zariel): add host up/down
  113. }
  114. // HostSelectionPolicy is an interface for selecting
  115. // the most appropriate host to execute a given query.
  116. type HostSelectionPolicy interface {
  117. HostStateNotifier
  118. SetHosts
  119. SetPartitioner
  120. //Pick returns an iteration function over selected hosts
  121. Pick(*Query) NextHost
  122. }
  123. // SelectedHost is an interface returned when picking a host from a host
  124. // selection policy.
  125. type SelectedHost interface {
  126. Info() *HostInfo
  127. Mark(error)
  128. }
  129. // NextHost is an iteration function over picked hosts
  130. type NextHost func() SelectedHost
  131. // RoundRobinHostPolicy is a round-robin load balancing policy, where each host
  132. // is tried sequentially for each query.
  133. func RoundRobinHostPolicy() HostSelectionPolicy {
  134. return &roundRobinHostPolicy{}
  135. }
  136. type roundRobinHostPolicy struct {
  137. hosts cowHostList
  138. pos uint32
  139. mu sync.RWMutex
  140. }
  141. func (r *roundRobinHostPolicy) SetHosts(hosts []HostInfo) {
  142. r.hosts.set(hosts)
  143. }
  144. func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {
  145. // noop
  146. }
  147. func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
  148. // i is used to limit the number of attempts to find a host
  149. // to the number of hosts known to this policy
  150. var i int
  151. return func() SelectedHost {
  152. hosts := r.hosts.get()
  153. if len(hosts) == 0 {
  154. return nil
  155. }
  156. // always increment pos to evenly distribute traffic in case of
  157. // failures
  158. pos := atomic.AddUint32(&r.pos, 1)
  159. if i >= len(hosts) {
  160. return nil
  161. }
  162. host := &r.hosts[(pos)%uint32(len(r.hosts))]
  163. i++
  164. return selectedRoundRobinHost{host}
  165. }
  166. }
  167. func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
  168. r.hosts.add(*host)
  169. }
  170. func (r *roundRobinHostPolicy) RemoveHost(addr string) {
  171. r.hosts.remove(addr)
  172. }
  173. // selectedRoundRobinHost is a host returned by the roundRobinHostPolicy and
  174. // implements the SelectedHost interface
  175. type selectedRoundRobinHost struct {
  176. info *HostInfo
  177. }
  178. func (host selectedRoundRobinHost) Info() *HostInfo {
  179. return host.info
  180. }
  181. func (host selectedRoundRobinHost) Mark(err error) {
  182. // noop
  183. }
  184. // TokenAwareHostPolicy is a token aware host selection policy, where hosts are
  185. // selected based on the partition key, so queries are sent to the host which
  186. // owns the partition. Fallback is used when routing information is not available.
  187. func TokenAwareHostPolicy(fallback HostSelectionPolicy) HostSelectionPolicy {
  188. return &tokenAwareHostPolicy{fallback: fallback}
  189. }
  190. type tokenAwareHostPolicy struct {
  191. hosts cowHostList
  192. mu sync.RWMutex
  193. partitioner string
  194. tokenRing *tokenRing
  195. fallback HostSelectionPolicy
  196. }
  197. func (t *tokenAwareHostPolicy) SetHosts(hosts []HostInfo) {
  198. t.hosts.set(hosts)
  199. t.mu.Lock()
  200. defer t.mu.Unlock()
  201. // always update the fallback
  202. t.fallback.SetHosts(hosts)
  203. t.resetTokenRing()
  204. }
  205. func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
  206. t.mu.Lock()
  207. defer t.mu.Unlock()
  208. if t.partitioner != partitioner {
  209. t.fallback.SetPartitioner(partitioner)
  210. t.partitioner = partitioner
  211. t.resetTokenRing()
  212. }
  213. }
  214. func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
  215. t.hosts.add(*host)
  216. t.mu.Lock()
  217. t.resetTokenRing()
  218. t.mu.Unlock()
  219. }
  220. func (t *tokenAwareHostPolicy) RemoveHost(addr string) {
  221. t.hosts.remove(addr)
  222. t.mu.Lock()
  223. t.resetTokenRing()
  224. t.mu.Unlock()
  225. }
  226. func (t *tokenAwareHostPolicy) resetTokenRing() {
  227. if t.partitioner == "" {
  228. // partitioner not yet set
  229. return
  230. }
  231. // create a new token ring
  232. hosts := t.hosts.get()
  233. tokenRing, err := newTokenRing(t.partitioner, hosts)
  234. if err != nil {
  235. log.Printf("Unable to update the token ring due to error: %s", err)
  236. return
  237. }
  238. // replace the token ring
  239. t.tokenRing = tokenRing
  240. }
  241. func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
  242. if qry == nil {
  243. return t.fallback.Pick(qry)
  244. } else if qry.binding != nil && len(qry.values) == 0 {
  245. // If this query was created using session.Bind we wont have the query
  246. // values yet, so we have to pass down to the next policy.
  247. // TODO: Remove this and handle this case
  248. return t.fallback.Pick(qry)
  249. }
  250. routingKey, err := qry.GetRoutingKey()
  251. if err != nil {
  252. return t.fallback.Pick(qry)
  253. }
  254. if routingKey == nil {
  255. return t.fallback.Pick(qry)
  256. }
  257. t.mu.RLock()
  258. // TODO retrieve a list of hosts based on the replication strategy
  259. host := t.tokenRing.GetHostForPartitionKey(routingKey)
  260. t.mu.RUnlock()
  261. if host == nil {
  262. return t.fallback.Pick(qry)
  263. }
  264. // scope these variables for the same lifetime as the iterator function
  265. var (
  266. hostReturned bool
  267. fallbackIter NextHost
  268. )
  269. return func() SelectedHost {
  270. if !hostReturned {
  271. hostReturned = true
  272. return selectedTokenAwareHost{host}
  273. }
  274. // fallback
  275. if fallbackIter == nil {
  276. fallbackIter = t.fallback.Pick(qry)
  277. }
  278. fallbackHost := fallbackIter()
  279. // filter the token aware selected hosts from the fallback hosts
  280. if fallbackHost.Info() == host {
  281. fallbackHost = fallbackIter()
  282. }
  283. return fallbackHost
  284. }
  285. }
  286. // selectedTokenAwareHost is a host returned by the tokenAwareHostPolicy and
  287. // implements the SelectedHost interface
  288. type selectedTokenAwareHost struct {
  289. info *HostInfo
  290. }
  291. func (host selectedTokenAwareHost) Info() *HostInfo {
  292. return host.info
  293. }
  294. func (host selectedTokenAwareHost) Mark(err error) {
  295. // noop
  296. }
  297. // HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
  298. // to distribute queries between hosts and prevent sending queries to
  299. // unresponsive hosts. When creating the host pool that is passed to the policy
  300. // use an empty slice of hosts as the hostpool will be populated later by gocql.
  301. // See below for examples of usage:
  302. //
  303. // // Create host selection policy using a simple host pool
  304. // cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
  305. //
  306. // // Create host selection policy using an epsilon greddy pool
  307. // cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
  308. // hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
  309. // )
  310. //
  311. func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
  312. return &hostPoolHostPolicy{hostMap: map[string]HostInfo{}, hp: hp}
  313. }
  314. type hostPoolHostPolicy struct {
  315. hp hostpool.HostPool
  316. mu sync.RWMutex
  317. hostMap map[string]HostInfo
  318. }
  319. func (r *hostPoolHostPolicy) SetHosts(hosts []HostInfo) {
  320. peers := make([]string, len(hosts))
  321. hostMap := make(map[string]HostInfo, len(hosts))
  322. for i, host := range hosts {
  323. peers[i] = host.Peer
  324. hostMap[host.Peer] = host
  325. }
  326. r.mu.Lock()
  327. r.hp.SetHosts(peers)
  328. r.hostMap = hostMap
  329. r.mu.Unlock()
  330. }
  331. func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
  332. r.mu.Lock()
  333. defer r.mu.Unlock()
  334. if _, ok := r.hostMap[host.Peer]; ok {
  335. return
  336. }
  337. hosts := make([]string, 0, len(r.hostMap)+1)
  338. for addr := range r.hostMap {
  339. hosts = append(hosts, addr)
  340. }
  341. hosts = append(hosts, host.Peer)
  342. r.hp.SetHosts(hosts)
  343. r.hostMap[host.Peer] = *host
  344. }
  345. func (r *hostPoolHostPolicy) RemoveHost(addr string) {
  346. r.mu.Unlock()
  347. defer r.mu.Unlock()
  348. if _, ok := r.hostMap[addr]; !ok {
  349. return
  350. }
  351. delete(r.hostMap, addr)
  352. hosts := make([]string, 0, len(r.hostMap))
  353. for addr := range r.hostMap {
  354. hosts = append(hosts, addr)
  355. }
  356. r.hp.SetHosts(hosts)
  357. }
  358. func (r *hostPoolHostPolicy) SetPartitioner(partitioner string) {
  359. // noop
  360. }
  361. func (r *hostPoolHostPolicy) Pick(qry *Query) NextHost {
  362. return func() SelectedHost {
  363. r.mu.RLock()
  364. defer r.mu.RUnlock()
  365. if len(r.hostMap) == 0 {
  366. return nil
  367. }
  368. hostR := r.hp.Get()
  369. host, ok := r.hostMap[hostR.Host()]
  370. if !ok {
  371. return nil
  372. }
  373. return selectedHostPoolHost{&host, hostR}
  374. }
  375. }
  376. // selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
  377. // implements the SelectedHost interface
  378. type selectedHostPoolHost struct {
  379. info *HostInfo
  380. hostR hostpool.HostPoolResponse
  381. }
  382. func (host selectedHostPoolHost) Info() *HostInfo {
  383. return host.info
  384. }
  385. func (host selectedHostPoolHost) Mark(err error) {
  386. host.hostR.Mark(err)
  387. }
  388. //ConnSelectionPolicy is an interface for selecting an
  389. //appropriate connection for executing a query
  390. type ConnSelectionPolicy interface {
  391. SetConns(conns []*Conn)
  392. Pick(*Query) *Conn
  393. }
  394. type roundRobinConnPolicy struct {
  395. conns []*Conn
  396. pos uint32
  397. mu sync.RWMutex
  398. }
  399. func RoundRobinConnPolicy() func() ConnSelectionPolicy {
  400. return func() ConnSelectionPolicy {
  401. return &roundRobinConnPolicy{}
  402. }
  403. }
  404. func (r *roundRobinConnPolicy) SetConns(conns []*Conn) {
  405. r.mu.Lock()
  406. r.conns = conns
  407. r.mu.Unlock()
  408. }
  409. func (r *roundRobinConnPolicy) Pick(qry *Query) *Conn {
  410. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  411. r.mu.RLock()
  412. defer r.mu.RUnlock()
  413. if len(r.conns) == 0 {
  414. return nil
  415. }
  416. for i := 0; i < len(r.conns); i++ {
  417. conn := r.conns[(pos+i)%len(r.conns)]
  418. if conn.AvailableStreams() > 0 {
  419. return conn
  420. }
  421. }
  422. return nil
  423. }