token.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "crypto/md5"
  8. "fmt"
  9. "math/big"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "unsafe"
  14. )
  15. // a token partitioner
  16. type Partitioner interface {
  17. Hash([]byte) Token
  18. ParseString(string) Token
  19. }
  20. // a token
  21. type Token interface {
  22. fmt.Stringer
  23. Less(Token) bool
  24. }
  25. // murmur3 partitioner and token
  26. type Murmur3Partitioner struct{}
  27. type Murmur3Token int64
  28. func (p Murmur3Partitioner) Hash(partitionKey []byte) Token {
  29. h1 := murmur3H1(partitionKey)
  30. return Murmur3Token(int64(h1))
  31. }
  32. // murmur3 little-endian, 128-bit hash, but returns only h1
  33. func murmur3H1(data []byte) uint64 {
  34. length := len(data)
  35. var h1, h2, k1, k2 uint64
  36. const c1 = 0x87c37b91114253d5
  37. const c2 = 0x4cf5ad432745937f
  38. // body
  39. nBlocks := length / 16
  40. for i := 0; i < nBlocks; i++ {
  41. block := (*[2]uint64)(unsafe.Pointer(&data[i*16]))
  42. k1 = block[0]
  43. k2 = block[1]
  44. k1 *= c1
  45. k1 = (k1 << 31) | (k1 >> 33) // ROTL64(k1, 31)
  46. k1 *= c2
  47. h1 ^= k1
  48. h1 = (h1 << 27) | (h1 >> 37) // ROTL64(h1, 27)
  49. h1 += h2
  50. h1 = h1*5 + 0x52dce729
  51. k2 *= c1
  52. k2 = (k2 << 33) | (k2 >> 31) // ROTL64(k2, 33)
  53. k2 *= c2
  54. h2 ^= k2
  55. h2 = (h2 << 33) | (h2 >> 31) // ROTL64(h2, 33)
  56. h2 += h1
  57. h2 = h2*5 + 0x38495ab5
  58. }
  59. // tail
  60. tail := data[nBlocks*16:]
  61. k1 = 0
  62. k2 = 0
  63. switch length & 15 {
  64. case 15:
  65. k2 ^= uint64(tail[14]) << 48
  66. fallthrough
  67. case 14:
  68. k2 ^= uint64(tail[13]) << 40
  69. fallthrough
  70. case 13:
  71. k2 ^= uint64(tail[12]) << 32
  72. fallthrough
  73. case 12:
  74. k2 ^= uint64(tail[11]) << 24
  75. fallthrough
  76. case 11:
  77. k2 ^= uint64(tail[10]) << 16
  78. fallthrough
  79. case 10:
  80. k2 ^= uint64(tail[9]) << 8
  81. fallthrough
  82. case 9:
  83. k2 ^= uint64(tail[8])
  84. k2 *= c2
  85. k2 = (k2 << 33) | (k2 >> 31) // ROTL64(k2, 33)
  86. k2 *= c1
  87. h2 ^= k2
  88. fallthrough
  89. case 8:
  90. k1 ^= uint64(tail[7]) << 56
  91. fallthrough
  92. case 7:
  93. k1 ^= uint64(tail[6]) << 48
  94. fallthrough
  95. case 6:
  96. k1 ^= uint64(tail[5]) << 40
  97. fallthrough
  98. case 5:
  99. k1 ^= uint64(tail[4]) << 32
  100. fallthrough
  101. case 4:
  102. k1 ^= uint64(tail[3]) << 24
  103. fallthrough
  104. case 3:
  105. k1 ^= uint64(tail[2]) << 16
  106. fallthrough
  107. case 2:
  108. k1 ^= uint64(tail[1]) << 8
  109. fallthrough
  110. case 1:
  111. k1 ^= uint64(tail[0])
  112. k1 *= c1
  113. k1 = (k1 << 31) | (k1 >> 33) // ROTL64(k1, 31)
  114. k1 *= c2
  115. h1 ^= k1
  116. }
  117. h1 ^= uint64(length)
  118. h2 ^= uint64(length)
  119. h1 += h2
  120. h2 += h1
  121. // finalizer
  122. const fmix1 = 0xff51afd7ed558ccd
  123. const fmix2 = 0xc4ceb9fe1a85ec53
  124. // fmix64(h1)
  125. h1 ^= h1 >> 33
  126. h1 *= fmix1
  127. h1 ^= h1 >> 33
  128. h1 *= fmix2
  129. h1 ^= h1 >> 33
  130. // fmix64(h2)
  131. h2 ^= h2 >> 33
  132. h2 *= fmix1
  133. h2 ^= h2 >> 33
  134. h2 *= fmix2
  135. h2 ^= h2 >> 33
  136. h1 += h2
  137. // the following is extraneous since h2 is discarded
  138. // h2 += h1
  139. return h1
  140. }
  141. func (p Murmur3Partitioner) ParseString(str string) Token {
  142. val, _ := strconv.ParseInt(str, 10, 64)
  143. return Murmur3Token(val)
  144. }
  145. func (m Murmur3Token) String() string {
  146. return strconv.FormatInt(int64(m), 10)
  147. }
  148. func (m Murmur3Token) Less(token Token) bool {
  149. return m < token.(Murmur3Token)
  150. }
  151. // order preserving partitioner and token
  152. type OrderPreservingPartitioner struct{}
  153. type OrderPreservingToken []byte
  154. func (p OrderPreservingPartitioner) Hash(partitionKey []byte) Token {
  155. // the partition key is the token
  156. return OrderPreservingToken(partitionKey)
  157. }
  158. func (p OrderPreservingPartitioner) ParseString(str string) Token {
  159. return OrderPreservingToken([]byte(str))
  160. }
  161. func (o OrderPreservingToken) String() string {
  162. return string([]byte(o))
  163. }
  164. func (o OrderPreservingToken) Less(token Token) bool {
  165. return -1 == bytes.Compare(o, token.(OrderPreservingToken))
  166. }
  167. // random partitioner and token
  168. type RandomPartitioner struct{}
  169. type RandomToken struct {
  170. *big.Int
  171. }
  172. func (p RandomPartitioner) Hash(partitionKey []byte) Token {
  173. hash := md5.New()
  174. sum := hash.Sum(partitionKey)
  175. val := new(big.Int)
  176. val = val.SetBytes(sum)
  177. val = val.Abs(val)
  178. return RandomToken{val}
  179. }
  180. func (p RandomPartitioner) ParseString(str string) Token {
  181. val := new(big.Int)
  182. val.SetString(str, 10)
  183. return RandomToken{val}
  184. }
  185. func (r RandomToken) Less(token Token) bool {
  186. return -1 == r.Int.Cmp(token.(RandomToken).Int)
  187. }
  188. // a data structure for organizing the relationship between tokens and hosts
  189. type TokenRing struct {
  190. partitioner Partitioner
  191. tokens []Token
  192. hosts []*HostInfo
  193. }
  194. func NewTokenRing(partitioner string, hosts []*HostInfo) (*TokenRing, error) {
  195. tokenRing := &TokenRing{
  196. tokens: []Token{},
  197. hosts: []*HostInfo{},
  198. }
  199. if strings.HasSuffix(partitioner, "Murmur3Partitioner") {
  200. tokenRing.partitioner = Murmur3Partitioner{}
  201. } else if strings.HasSuffix(partitioner, "OrderedPartitioner") {
  202. tokenRing.partitioner = OrderPreservingPartitioner{}
  203. } else if strings.HasSuffix(partitioner, "RandomPartitioner") {
  204. tokenRing.partitioner = RandomPartitioner{}
  205. } else {
  206. return nil, fmt.Errorf("Unsupported partitioner '%s'", partitioner)
  207. }
  208. for _, host := range hosts {
  209. for _, strToken := range host.Tokens {
  210. token := tokenRing.partitioner.ParseString(strToken)
  211. tokenRing.tokens = append(tokenRing.tokens, token)
  212. tokenRing.hosts = append(tokenRing.hosts, host)
  213. }
  214. }
  215. sort.Sort(tokenRing)
  216. return tokenRing, nil
  217. }
  218. func (t *TokenRing) Len() int {
  219. return len(t.tokens)
  220. }
  221. func (t *TokenRing) Less(i, j int) bool {
  222. return t.tokens[i].Less(t.tokens[j])
  223. }
  224. func (t *TokenRing) Swap(i, j int) {
  225. t.tokens[i], t.hosts[i], t.tokens[j], t.hosts[j] =
  226. t.tokens[j], t.hosts[j], t.tokens[i], t.hosts[i]
  227. }
  228. func (t *TokenRing) String() string {
  229. buf := &bytes.Buffer{}
  230. buf.WriteString("TokenRing={")
  231. sep := ""
  232. for i := range t.tokens {
  233. buf.WriteString(sep)
  234. sep = ","
  235. buf.WriteString("\n\t[")
  236. buf.WriteString(strconv.Itoa(i))
  237. buf.WriteString("]")
  238. buf.WriteString(t.tokens[i].String())
  239. buf.WriteString(":")
  240. buf.WriteString(t.hosts[i].Peer)
  241. }
  242. buf.WriteString("\n}")
  243. return string(buf.Bytes())
  244. }
  245. func (t *TokenRing) GetHostForPartitionKey(partitionKey []byte) *HostInfo {
  246. token := t.partitioner.Hash(partitionKey)
  247. return t.GetHostForToken(token)
  248. }
  249. func (t *TokenRing) GetHostForToken(token Token) *HostInfo {
  250. // find the primary repica
  251. ringIndex := sort.Search(
  252. len(t.tokens),
  253. func(i int) bool {
  254. return !t.tokens[i].Less(token)
  255. },
  256. )
  257. if ringIndex == len(t.tokens) {
  258. // wrap around to the first in the ring
  259. ringIndex = 0
  260. }
  261. host := t.hosts[ringIndex]
  262. return host
  263. }