snapshot.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package migrate
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "log"
  9. "net/url"
  10. "os"
  11. "path"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "time"
  16. raftpb "github.com/coreos/etcd/raft/raftpb"
  17. )
  18. type Snapshot4 struct {
  19. State []byte `json:"state"`
  20. LastIndex uint64 `json:"lastIndex"`
  21. LastTerm uint64 `json:"lastTerm"`
  22. Peers []struct {
  23. Name string `json:"name"`
  24. ConnectionString string `json:"connectionString"`
  25. } `json:"peers"`
  26. }
  27. type sstore struct {
  28. Root *node
  29. CurrentIndex uint64
  30. CurrentVersion int
  31. }
  32. type node struct {
  33. Path string
  34. CreatedIndex uint64
  35. ModifiedIndex uint64
  36. Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  37. ExpireTime time.Time
  38. ACL string
  39. Value string // for key-value pair
  40. Children map[string]*node // for directory
  41. }
  42. func replacePathNames(n *node, s1, s2 string) {
  43. n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
  44. for _, c := range n.Children {
  45. replacePathNames(c, s1, s2)
  46. }
  47. }
  48. func pullNodesFromEtcd(n *node) map[string]uint64 {
  49. out := make(map[string]uint64)
  50. machines := n.Children["machines"]
  51. for name, c := range machines.Children {
  52. q, err := url.ParseQuery(c.Value)
  53. if err != nil {
  54. log.Fatal("Couldn't parse old query string value")
  55. }
  56. etcdurl := q.Get("etcd")
  57. rafturl := q.Get("raft")
  58. m := generateNodeMember(name, rafturl, etcdurl)
  59. out[m.Name] = uint64(m.ID)
  60. }
  61. return out
  62. }
  63. func fixEtcd(n *node) {
  64. n.Path = "/0"
  65. machines := n.Children["machines"]
  66. n.Children["members"] = &node{
  67. Path: "/0/members",
  68. CreatedIndex: machines.CreatedIndex,
  69. ModifiedIndex: machines.ModifiedIndex,
  70. ExpireTime: machines.ExpireTime,
  71. ACL: machines.ACL,
  72. Children: make(map[string]*node),
  73. }
  74. for name, c := range machines.Children {
  75. q, err := url.ParseQuery(c.Value)
  76. if err != nil {
  77. log.Fatal("Couldn't parse old query string value")
  78. }
  79. etcdurl := q.Get("etcd")
  80. rafturl := q.Get("raft")
  81. m := generateNodeMember(name, rafturl, etcdurl)
  82. attrBytes, err := json.Marshal(m.attributes)
  83. if err != nil {
  84. log.Fatal("Couldn't marshal attributes")
  85. }
  86. raftBytes, err := json.Marshal(m.raftAttributes)
  87. if err != nil {
  88. log.Fatal("Couldn't marshal raft attributes")
  89. }
  90. newNode := &node{
  91. Path: path.Join("/0/members", m.ID.String()),
  92. CreatedIndex: c.CreatedIndex,
  93. ModifiedIndex: c.ModifiedIndex,
  94. ExpireTime: c.ExpireTime,
  95. ACL: c.ACL,
  96. Children: map[string]*node{
  97. "attributes": &node{
  98. Path: path.Join("/0/members", m.ID.String(), "attributes"),
  99. CreatedIndex: c.CreatedIndex,
  100. ModifiedIndex: c.ModifiedIndex,
  101. ExpireTime: c.ExpireTime,
  102. ACL: c.ACL,
  103. Value: string(attrBytes),
  104. },
  105. "raftAttributes": &node{
  106. Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
  107. CreatedIndex: c.CreatedIndex,
  108. ModifiedIndex: c.ModifiedIndex,
  109. ExpireTime: c.ExpireTime,
  110. ACL: c.ACL,
  111. Value: string(raftBytes),
  112. },
  113. },
  114. }
  115. n.Children["members"].Children[m.ID.String()] = newNode
  116. }
  117. delete(n.Children, "machines")
  118. }
  119. func mangleRoot(n *node) *node {
  120. newRoot := &node{
  121. Path: "/",
  122. CreatedIndex: n.CreatedIndex,
  123. ModifiedIndex: n.ModifiedIndex,
  124. ExpireTime: n.ExpireTime,
  125. ACL: n.ACL,
  126. Children: make(map[string]*node),
  127. }
  128. newRoot.Children["1"] = n
  129. etcd := n.Children["_etcd"]
  130. delete(n.Children, "_etcd")
  131. replacePathNames(n, "/", "/1/")
  132. fixEtcd(etcd)
  133. newRoot.Children["0"] = etcd
  134. return newRoot
  135. }
  136. func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
  137. st := &sstore{}
  138. if err := json.Unmarshal(s.State, st); err != nil {
  139. log.Fatal("Couldn't unmarshal snapshot")
  140. }
  141. etcd := st.Root.Children["_etcd"]
  142. return pullNodesFromEtcd(etcd)
  143. }
  144. func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
  145. st := &sstore{}
  146. if err := json.Unmarshal(s.State, st); err != nil {
  147. log.Fatal("Couldn't unmarshal snapshot")
  148. }
  149. st.Root = mangleRoot(st.Root)
  150. newState, err := json.Marshal(st)
  151. if err != nil {
  152. log.Fatal("Couldn't re-marshal new snapshot")
  153. }
  154. nodes := s.GetNodesFromStore()
  155. nodeList := make([]uint64, 0)
  156. for _, v := range nodes {
  157. nodeList = append(nodeList, v)
  158. }
  159. snap2 := raftpb.Snapshot{
  160. Data: newState,
  161. Metadata: raftpb.SnapshotMetadata{
  162. Index: s.LastIndex,
  163. Term: s.LastTerm,
  164. ConfState: raftpb.ConfState{
  165. Nodes: nodeList,
  166. },
  167. },
  168. }
  169. return &snap2
  170. }
  171. func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
  172. fname, err := FindLatestFile(snapdir)
  173. if err != nil {
  174. return nil, err
  175. }
  176. if fname == "" {
  177. return nil, nil
  178. }
  179. snappath := path.Join(snapdir, fname)
  180. log.Printf("Decoding snapshot from %s", snappath)
  181. return DecodeSnapshot4FromFile(snappath)
  182. }
  183. // FindLatestFile identifies the "latest" filename in a given directory
  184. // by sorting all the files and choosing the highest value.
  185. func FindLatestFile(dirpath string) (string, error) {
  186. dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
  187. if err != nil {
  188. if os.IsNotExist(err) {
  189. err = nil
  190. }
  191. return "", err
  192. }
  193. defer dir.Close()
  194. fnames, err := dir.Readdirnames(-1)
  195. if err != nil {
  196. return "", err
  197. }
  198. if len(fnames) == 0 {
  199. return "", nil
  200. }
  201. names, err := NewSnapshotFileNames(fnames)
  202. if err != nil {
  203. return "", err
  204. }
  205. return names[len(names)-1].FileName, nil
  206. }
  207. func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
  208. // Read snapshot data.
  209. f, err := os.OpenFile(path, os.O_RDONLY, 0)
  210. if err != nil {
  211. return nil, err
  212. }
  213. defer f.Close()
  214. return DecodeSnapshot4(f)
  215. }
  216. func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
  217. // Verify checksum
  218. var checksum uint32
  219. n, err := fmt.Fscanf(f, "%08x\n", &checksum)
  220. if err != nil {
  221. return nil, err
  222. } else if n != 1 {
  223. return nil, errors.New("miss heading checksum")
  224. }
  225. // Load remaining snapshot contents.
  226. b, err := ioutil.ReadAll(f)
  227. if err != nil {
  228. return nil, err
  229. }
  230. // Generate checksum.
  231. byteChecksum := crc32.ChecksumIEEE(b)
  232. if uint32(checksum) != byteChecksum {
  233. return nil, errors.New("bad checksum")
  234. }
  235. // Decode snapshot.
  236. snapshot := new(Snapshot4)
  237. if err = json.Unmarshal(b, snapshot); err != nil {
  238. return nil, err
  239. }
  240. return snapshot, nil
  241. }
  242. func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
  243. s := make([]SnapshotFileName, 0)
  244. for _, n := range names {
  245. trimmed := strings.TrimSuffix(n, ".ss")
  246. if trimmed == n {
  247. return nil, fmt.Errorf("file %q does not have .ss extension", n)
  248. }
  249. parts := strings.SplitN(trimmed, "_", 2)
  250. if len(parts) != 2 {
  251. return nil, fmt.Errorf("unrecognized file name format %q", n)
  252. }
  253. fn := SnapshotFileName{FileName: n}
  254. var err error
  255. fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
  256. if err != nil {
  257. return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
  258. }
  259. fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
  260. if err != nil {
  261. return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
  262. }
  263. s = append(s, fn)
  264. }
  265. sortable := SnapshotFileNames(s)
  266. sort.Sort(&sortable)
  267. return s, nil
  268. }
  269. type SnapshotFileNames []SnapshotFileName
  270. type SnapshotFileName struct {
  271. FileName string
  272. Term uint64
  273. Index uint64
  274. }
  275. func (n *SnapshotFileNames) Less(i, j int) bool {
  276. iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
  277. jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
  278. return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
  279. }
  280. func (n *SnapshotFileNames) Swap(i, j int) {
  281. (*n)[i], (*n)[j] = (*n)[j], (*n)[i]
  282. }
  283. func (n *SnapshotFileNames) Len() int {
  284. return len([]SnapshotFileName(*n))
  285. }