123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package hash
- import (
- "fmt"
- "sort"
- "strconv"
- "sync"
- "github.com/tal-tech/go-zero/core/lang"
- "github.com/tal-tech/go-zero/core/mapping"
- )
- const (
- TopWeight = 100
- minReplicas = 100
- prime = 16777619
- )
- type (
- HashFunc func(data []byte) uint64
- ConsistentHash struct {
- hashFunc HashFunc
- replicas int
- keys []uint64
- ring map[uint64][]interface{}
- nodes map[string]lang.PlaceholderType
- lock sync.RWMutex
- }
- )
- func NewConsistentHash() *ConsistentHash {
- return NewCustomConsistentHash(minReplicas, Hash)
- }
- func NewCustomConsistentHash(replicas int, fn HashFunc) *ConsistentHash {
- if replicas < minReplicas {
- replicas = minReplicas
- }
- if fn == nil {
- fn = Hash
- }
- return &ConsistentHash{
- hashFunc: fn,
- replicas: replicas,
- ring: make(map[uint64][]interface{}),
- nodes: make(map[string]lang.PlaceholderType),
- }
- }
- // Add adds the node with the number of h.replicas,
- // the later call will overwrite the replicas of the former calls.
- func (h *ConsistentHash) Add(node interface{}) {
- h.AddWithReplicas(node, h.replicas)
- }
- // AddWithReplicas adds the node with the number of replicas,
- // replicas will be truncated to h.replicas if it's larger than h.replicas,
- // the later call will overwrite the replicas of the former calls.
- func (h *ConsistentHash) AddWithReplicas(node interface{}, replicas int) {
- h.Remove(node)
- if replicas > h.replicas {
- replicas = h.replicas
- }
- nodeRepr := repr(node)
- h.lock.Lock()
- defer h.lock.Unlock()
- h.addNode(nodeRepr)
- for i := 0; i < replicas; i++ {
- hash := h.hashFunc([]byte(nodeRepr + strconv.Itoa(i)))
- h.keys = append(h.keys, hash)
- h.ring[hash] = append(h.ring[hash], node)
- }
- sort.Slice(h.keys, func(i int, j int) bool {
- return h.keys[i] < h.keys[j]
- })
- }
- // AddWithWeight adds the node with weight, the weight can be 1 to 100, indicates the percent,
- // the later call will overwrite the replicas of the former calls.
- func (h *ConsistentHash) AddWithWeight(node interface{}, weight int) {
- // don't need to make sure weight not larger than TopWeight,
- // because AddWithReplicas makes sure replicas cannot be larger than h.replicas
- replicas := h.replicas * weight / TopWeight
- h.AddWithReplicas(node, replicas)
- }
- func (h *ConsistentHash) Get(v interface{}) (interface{}, bool) {
- h.lock.RLock()
- defer h.lock.RUnlock()
- if len(h.ring) == 0 {
- return nil, false
- }
- hash := h.hashFunc([]byte(repr(v)))
- index := sort.Search(len(h.keys), func(i int) bool {
- return h.keys[i] >= hash
- }) % len(h.keys)
- nodes := h.ring[h.keys[index]]
- switch len(nodes) {
- case 0:
- return nil, false
- case 1:
- return nodes[0], true
- default:
- innerIndex := h.hashFunc([]byte(innerRepr(v)))
- pos := int(innerIndex % uint64(len(nodes)))
- return nodes[pos], true
- }
- }
- func (h *ConsistentHash) Remove(node interface{}) {
- nodeRepr := repr(node)
- h.lock.Lock()
- defer h.lock.Unlock()
- if !h.containsNode(nodeRepr) {
- return
- }
- for i := 0; i < h.replicas; i++ {
- hash := h.hashFunc([]byte(nodeRepr + strconv.Itoa(i)))
- index := sort.Search(len(h.keys), func(i int) bool {
- return h.keys[i] >= hash
- })
- if index < len(h.keys) {
- h.keys = append(h.keys[:index], h.keys[index+1:]...)
- }
- h.removeRingNode(hash, nodeRepr)
- }
- h.removeNode(nodeRepr)
- }
- func (h *ConsistentHash) removeRingNode(hash uint64, nodeRepr string) {
- if nodes, ok := h.ring[hash]; ok {
- newNodes := nodes[:0]
- for _, x := range nodes {
- if repr(x) != nodeRepr {
- newNodes = append(newNodes, x)
- }
- }
- if len(newNodes) > 0 {
- h.ring[hash] = newNodes
- } else {
- delete(h.ring, hash)
- }
- }
- }
- func (h *ConsistentHash) addNode(nodeRepr string) {
- h.nodes[nodeRepr] = lang.Placeholder
- }
- func (h *ConsistentHash) containsNode(nodeRepr string) bool {
- _, ok := h.nodes[nodeRepr]
- return ok
- }
- func (h *ConsistentHash) removeNode(nodeRepr string) {
- delete(h.nodes, nodeRepr)
- }
- func innerRepr(node interface{}) string {
- return fmt.Sprintf("%d:%v", prime, node)
- }
- func repr(node interface{}) string {
- return mapping.Repr(node)
- }
|