token.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "crypto/md5"
  8. "fmt"
  9. "math/big"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "unsafe"
  14. )
  15. // a token partitioner
  16. type partitioner interface {
  17. Hash([]byte) token
  18. ParseString(string) token
  19. }
  // token is a value that can be rendered as a string and ordered relative to
  // another token.
  type token interface {
  	// Less reports whether the receiver sorts before the given token.
  	Less(token) bool
  	fmt.Stringer
  }
  // murmur3 partitioner and its token type
  type (
  	// murmur3Partitioner is a stateless partitioner whose tokens are
  	// murmur3Token values.
  	murmur3Partitioner struct{}

  	// murmur3Token is a token stored as a signed 64-bit integer.
  	murmur3Token int64
  )
  28. func (p murmur3Partitioner) Hash(partitionKey []byte) token {
  29. h1 := murmur3H1(partitionKey)
  30. return murmur3Token(int64(h1))
  31. }
  // murmur3H1 computes the 128-bit x64 variant of MurmurHash3 over data with a
  // zero seed and returns only the first 64-bit word (h1).
  //
  // Tail bytes (the final len(data)%16 bytes) are sign-extended before being
  // mixed in. This deliberately deviates from canonical MurmurHash3 in order to
  // match Cassandra's Java implementation, which widens signed bytes in the
  // tail; without it a key whose tail contains a byte >= 0x80 hashes to a
  // different token than the one the cluster uses. Inputs whose tail bytes are
  // all below 0x80 hash exactly as in canonical MurmurHash3.
  func murmur3H1(data []byte) uint64 {
  	const (
  		c1 = 0x87c37b91114253d5
  		c2 = 0x4cf5ad432745937f
  	)

  	length := len(data)
  	var h1, h2, k1, k2 uint64

  	// body: mix in the input one 16-byte block at a time
  	nBlocks := length / 16
  	for i := 0; i < nBlocks; i++ {
  		// The two words are read in host byte order, so this produces the
  		// little-endian words the algorithm expects only on little-endian
  		// hosts.
  		block := (*[2]uint64)(unsafe.Pointer(&data[i*16]))

  		k1 = block[0]
  		k2 = block[1]

  		k1 *= c1
  		k1 = (k1 << 31) | (k1 >> 33) // ROTL64(k1, 31)
  		k1 *= c2
  		h1 ^= k1

  		h1 = (h1 << 27) | (h1 >> 37) // ROTL64(h1, 27)
  		h1 += h2
  		h1 = h1*5 + 0x52dce729

  		k2 *= c2
  		k2 = (k2 << 33) | (k2 >> 31) // ROTL64(k2, 33)
  		k2 *= c1
  		h2 ^= k2

  		h2 = (h2 << 31) | (h2 >> 33) // ROTL64(h2, 31)
  		h2 += h1
  		h2 = h2*5 + 0x38495ab5
  	}

  	// tail: the remaining 0..15 bytes; bytes 8..14 feed k2, bytes 0..7 feed k1
  	tail := data[nBlocks*16:]
  	k1, k2 = 0, 0

  	for i := len(tail) - 1; i >= 8; i-- {
  		// int8 conversion sign-extends the byte, as Java's (long) cast does
  		k2 ^= uint64(int64(int8(tail[i]))) << (8 * uint(i-8))
  	}
  	if len(tail) > 8 {
  		k2 *= c2
  		k2 = (k2 << 33) | (k2 >> 31) // ROTL64(k2, 33)
  		k2 *= c1
  		h2 ^= k2
  	}

  	low := len(tail)
  	if low > 8 {
  		low = 8
  	}
  	for i := low - 1; i >= 0; i-- {
  		k1 ^= uint64(int64(int8(tail[i]))) << (8 * uint(i))
  	}
  	if len(tail) > 0 {
  		k1 *= c1
  		k1 = (k1 << 31) | (k1 >> 33) // ROTL64(k1, 31)
  		k1 *= c2
  		h1 ^= k1
  	}

  	h1 ^= uint64(length)
  	h2 ^= uint64(length)

  	h1 += h2
  	h2 += h1

  	// finalizer
  	const (
  		fmix1 = 0xff51afd7ed558ccd
  		fmix2 = 0xc4ceb9fe1a85ec53
  	)

  	// fmix64(h1)
  	h1 ^= h1 >> 33
  	h1 *= fmix1
  	h1 ^= h1 >> 33
  	h1 *= fmix2
  	h1 ^= h1 >> 33

  	// fmix64(h2)
  	h2 ^= h2 >> 33
  	h2 *= fmix1
  	h2 ^= h2 >> 33
  	h2 *= fmix2
  	h2 ^= h2 >> 33

  	// the canonical algorithm also does h2 += h1, which is skipped here
  	// because h2 is discarded
  	h1 += h2

  	return h1
  }
  145. func (p murmur3Partitioner) ParseString(str string) token {
  146. val, _ := strconv.ParseInt(str, 10, 64)
  147. return murmur3Token(val)
  148. }
  149. func (m murmur3Token) String() string {
  150. return strconv.FormatInt(int64(m), 10)
  151. }
  152. func (m murmur3Token) Less(token token) bool {
  153. return m < token.(murmur3Token)
  154. }
  // order preserving partitioner and its token type
  type (
  	// orderPreservingPartitioner is a stateless partitioner whose tokens
  	// are orderPreservingToken values.
  	orderPreservingPartitioner struct{}

  	// orderPreservingToken is a token stored as a raw byte sequence.
  	orderPreservingToken []byte
  )
  158. func (p orderPreservingPartitioner) Hash(partitionKey []byte) token {
  159. // the partition key is the token
  160. return orderPreservingToken(partitionKey)
  161. }
  162. func (p orderPreservingPartitioner) ParseString(str string) token {
  163. return orderPreservingToken([]byte(str))
  164. }
  165. func (o orderPreservingToken) String() string {
  166. return string([]byte(o))
  167. }
  168. func (o orderPreservingToken) Less(token token) bool {
  169. return -1 == bytes.Compare(o, token.(orderPreservingToken))
  170. }
  // random partitioner and its token type
  type (
  	// randomPartitioner is a stateless partitioner whose tokens are
  	// *randomToken values.
  	randomPartitioner struct{}

  	// randomToken is a token stored as an arbitrary-precision integer.
  	randomToken big.Int
  )
  174. func (p randomPartitioner) Hash(partitionKey []byte) token {
  175. hash := md5.New()
  176. sum := hash.Sum(partitionKey)
  177. val := new(big.Int)
  178. val = val.SetBytes(sum)
  179. val = val.Abs(val)
  180. return (*randomToken)(val)
  181. }
  182. func (p randomPartitioner) ParseString(str string) token {
  183. val := new(big.Int)
  184. val.SetString(str, 10)
  185. return (*randomToken)(val)
  186. }
  187. func (r *randomToken) String() string {
  188. return (*big.Int)(r).String()
  189. }
  190. func (r *randomToken) Less(token token) bool {
  191. return -1 == (*big.Int)(r).Cmp((*big.Int)(token.(*randomToken)))
  192. }
  193. // a data structure for organizing the relationship between tokens and hosts
  194. type tokenRing struct {
  195. partitioner partitioner
  196. tokens []token
  197. hosts []*HostInfo
  198. }
  199. func newTokenRing(partitioner string, hosts []*HostInfo) (*tokenRing, error) {
  200. tokenRing := &tokenRing{
  201. tokens: []token{},
  202. hosts: []*HostInfo{},
  203. }
  204. if strings.HasSuffix(partitioner, "Murmur3Partitioner") {
  205. tokenRing.partitioner = murmur3Partitioner{}
  206. } else if strings.HasSuffix(partitioner, "OrderedPartitioner") {
  207. tokenRing.partitioner = orderPreservingPartitioner{}
  208. } else if strings.HasSuffix(partitioner, "RandomPartitioner") {
  209. tokenRing.partitioner = randomPartitioner{}
  210. } else {
  211. return nil, fmt.Errorf("Unsupported partitioner '%s'", partitioner)
  212. }
  213. for _, host := range hosts {
  214. for _, strToken := range host.Tokens {
  215. token := tokenRing.partitioner.ParseString(strToken)
  216. tokenRing.tokens = append(tokenRing.tokens, token)
  217. tokenRing.hosts = append(tokenRing.hosts, host)
  218. }
  219. }
  220. sort.Sort(tokenRing)
  221. return tokenRing, nil
  222. }
  223. func (t *tokenRing) Len() int {
  224. return len(t.tokens)
  225. }
  226. func (t *tokenRing) Less(i, j int) bool {
  227. return t.tokens[i].Less(t.tokens[j])
  228. }
  229. func (t *tokenRing) Swap(i, j int) {
  230. t.tokens[i], t.hosts[i], t.tokens[j], t.hosts[j] =
  231. t.tokens[j], t.hosts[j], t.tokens[i], t.hosts[i]
  232. }
  233. func (t *tokenRing) String() string {
  234. buf := &bytes.Buffer{}
  235. buf.WriteString("TokenRing={")
  236. sep := ""
  237. for i := range t.tokens {
  238. buf.WriteString(sep)
  239. sep = ","
  240. buf.WriteString("\n\t[")
  241. buf.WriteString(strconv.Itoa(i))
  242. buf.WriteString("]")
  243. buf.WriteString(t.tokens[i].String())
  244. buf.WriteString(":")
  245. buf.WriteString(t.hosts[i].Peer)
  246. }
  247. buf.WriteString("\n}")
  248. return string(buf.Bytes())
  249. }
  250. func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) *HostInfo {
  251. token := t.partitioner.Hash(partitionKey)
  252. return t.GetHostForToken(token)
  253. }
  254. func (t *tokenRing) GetHostForToken(token token) *HostInfo {
  255. // find the primary repica
  256. ringIndex := sort.Search(
  257. len(t.tokens),
  258. func(i int) bool {
  259. return !t.tokens[i].Less(token)
  260. },
  261. )
  262. if ringIndex == len(t.tokens) {
  263. // wrap around to the first in the ring
  264. ringIndex = 0
  265. }
  266. host := t.hosts[ringIndex]
  267. return host
  268. }