snapshot.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package migrate
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "log"
  9. "net/url"
  10. "os"
  11. "path"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "time"
  16. raftpb "github.com/coreos/etcd/raft/raftpb"
  17. )
// Snapshot4 is the in-memory form of a snapshot file as decoded by
// DecodeSnapshot4. State holds a JSON document that GetNodesFromStore and
// Snapshot5 unmarshal into an sstore.
type Snapshot4 struct {
	// State is the serialized store. Being a []byte, encoding/json expects it
	// as a base64 string inside the snapshot JSON.
	State []byte `json:"state"`
	// LastIndex is copied to raftpb.Snapshot.Index by Snapshot5.
	LastIndex uint64 `json:"lastIndex"`
	// LastTerm is copied to raftpb.Snapshot.Term by Snapshot5.
	LastTerm uint64 `json:"lastTerm"`
	// Peers is decoded from the file but is not read by the conversion code
	// visible in this file.
	Peers []struct {
		Name             string `json:"name"`
		ConnectionString string `json:"connectionString"`
	} `json:"peers"`
}
// sstore is the structure that Snapshot4.State is unmarshalled into and, in
// Snapshot5, marshalled back out of after its Root has been rewritten.
type sstore struct {
	// Root is the top of the node tree; Snapshot5 replaces it with the result
	// of mangleRoot.
	Root *node
	// CurrentIndex and CurrentVersion are not modified by the code visible in
	// this file; they round-trip through json unchanged.
	CurrentIndex   uint64
	CurrentVersion int
}
// node is a single entry of the tree stored in sstore.Root. An entry is
// either a key-value pair (Value set) or a directory (Children set).
type node struct {
	// Path is the absolute, slash-separated location of the entry.
	Path          string
	CreatedIndex  uint64
	ModifiedIndex uint64
	Parent        *node `json:"-"` // should not encode this field! avoid circular dependency.
	ExpireTime    time.Time
	ACL           string
	Value         string           // for key-value pair
	Children      map[string]*node // for directory
}
  42. func replacePathNames(n *node, s1, s2 string) {
  43. n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
  44. for _, c := range n.Children {
  45. replacePathNames(c, s1, s2)
  46. }
  47. }
  48. func pullNodesFromEtcd(n *node) map[string]uint64 {
  49. out := make(map[string]uint64)
  50. machines := n.Children["machines"]
  51. for name, c := range machines.Children {
  52. q, err := url.ParseQuery(c.Value)
  53. if err != nil {
  54. log.Fatal("Couldn't parse old query string value")
  55. }
  56. etcdurl := q.Get("etcd")
  57. rafturl := q.Get("raft")
  58. m := generateNodeMember(name, rafturl, etcdurl)
  59. out[m.Name] = uint64(m.ID)
  60. }
  61. return out
  62. }
  63. func fixEtcd(n *node) {
  64. n.Path = "/0"
  65. machines := n.Children["machines"]
  66. n.Children["members"] = &node{
  67. Path: "/0/members",
  68. CreatedIndex: machines.CreatedIndex,
  69. ModifiedIndex: machines.ModifiedIndex,
  70. ExpireTime: machines.ExpireTime,
  71. ACL: machines.ACL,
  72. Children: make(map[string]*node),
  73. }
  74. for name, c := range machines.Children {
  75. q, err := url.ParseQuery(c.Value)
  76. if err != nil {
  77. log.Fatal("Couldn't parse old query string value")
  78. }
  79. etcdurl := q.Get("etcd")
  80. rafturl := q.Get("raft")
  81. m := generateNodeMember(name, rafturl, etcdurl)
  82. attrBytes, err := json.Marshal(m.attributes)
  83. if err != nil {
  84. log.Fatal("Couldn't marshal attributes")
  85. }
  86. raftBytes, err := json.Marshal(m.raftAttributes)
  87. if err != nil {
  88. log.Fatal("Couldn't marshal raft attributes")
  89. }
  90. newNode := &node{
  91. Path: path.Join("/0/members", m.ID.String()),
  92. CreatedIndex: c.CreatedIndex,
  93. ModifiedIndex: c.ModifiedIndex,
  94. ExpireTime: c.ExpireTime,
  95. ACL: c.ACL,
  96. Children: map[string]*node{
  97. "attributes": &node{
  98. Path: path.Join("/0/members", m.ID.String(), "attributes"),
  99. CreatedIndex: c.CreatedIndex,
  100. ModifiedIndex: c.ModifiedIndex,
  101. ExpireTime: c.ExpireTime,
  102. ACL: c.ACL,
  103. Value: string(attrBytes),
  104. },
  105. "raftAttributes": &node{
  106. Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
  107. CreatedIndex: c.CreatedIndex,
  108. ModifiedIndex: c.ModifiedIndex,
  109. ExpireTime: c.ExpireTime,
  110. ACL: c.ACL,
  111. Value: string(raftBytes),
  112. },
  113. },
  114. }
  115. n.Children["members"].Children[m.ID.String()] = newNode
  116. }
  117. delete(n.Children, "machines")
  118. }
  119. func mangleRoot(n *node) *node {
  120. newRoot := &node{
  121. Path: "/",
  122. CreatedIndex: n.CreatedIndex,
  123. ModifiedIndex: n.ModifiedIndex,
  124. ExpireTime: n.ExpireTime,
  125. ACL: n.ACL,
  126. Children: make(map[string]*node),
  127. }
  128. newRoot.Children["1"] = n
  129. etcd := n.Children["_etcd"]
  130. delete(n.Children, "_etcd")
  131. replacePathNames(n, "/", "/1/")
  132. fixEtcd(etcd)
  133. newRoot.Children["0"] = etcd
  134. return newRoot
  135. }
  136. func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
  137. st := &sstore{}
  138. if err := json.Unmarshal(s.State, st); err != nil {
  139. log.Fatal("Couldn't unmarshal snapshot")
  140. }
  141. etcd := st.Root.Children["_etcd"]
  142. return pullNodesFromEtcd(etcd)
  143. }
  144. func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
  145. st := &sstore{}
  146. if err := json.Unmarshal(s.State, st); err != nil {
  147. log.Fatal("Couldn't unmarshal snapshot")
  148. }
  149. st.Root = mangleRoot(st.Root)
  150. newState, err := json.Marshal(st)
  151. if err != nil {
  152. log.Fatal("Couldn't re-marshal new snapshot")
  153. }
  154. nodes := s.GetNodesFromStore()
  155. nodeList := make([]uint64, 0)
  156. for _, v := range nodes {
  157. nodeList = append(nodeList, v)
  158. }
  159. snap5 := raftpb.Snapshot{
  160. Data: newState,
  161. Index: s.LastIndex,
  162. Term: s.LastTerm,
  163. Nodes: nodeList,
  164. }
  165. return &snap5
  166. }
  167. func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
  168. fname, err := FindLatestFile(snapdir)
  169. if err != nil {
  170. return nil, err
  171. }
  172. if fname == "" {
  173. return nil, nil
  174. }
  175. snappath := path.Join(snapdir, fname)
  176. log.Printf("Decoding snapshot from %s", snappath)
  177. return DecodeSnapshot4FromFile(snappath)
  178. }
  179. // FindLatestFile identifies the "latest" filename in a given directory
  180. // by sorting all the files and choosing the highest value.
  181. func FindLatestFile(dirpath string) (string, error) {
  182. dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
  183. if err != nil {
  184. if os.IsNotExist(err) {
  185. err = nil
  186. }
  187. return "", err
  188. }
  189. defer dir.Close()
  190. fnames, err := dir.Readdirnames(-1)
  191. if err != nil {
  192. return "", err
  193. }
  194. if len(fnames) == 0 {
  195. return "", nil
  196. }
  197. names, err := NewSnapshotFileNames(fnames)
  198. if err != nil {
  199. return "", err
  200. }
  201. return names[len(names)-1].FileName, nil
  202. }
  203. func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
  204. // Read snapshot data.
  205. f, err := os.OpenFile(path, os.O_RDONLY, 0)
  206. if err != nil {
  207. return nil, err
  208. }
  209. defer f.Close()
  210. return DecodeSnapshot4(f)
  211. }
  212. func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
  213. // Verify checksum
  214. var checksum uint32
  215. n, err := fmt.Fscanf(f, "%08x\n", &checksum)
  216. if err != nil {
  217. return nil, err
  218. } else if n != 1 {
  219. return nil, errors.New("miss heading checksum")
  220. }
  221. // Load remaining snapshot contents.
  222. b, err := ioutil.ReadAll(f)
  223. if err != nil {
  224. return nil, err
  225. }
  226. // Generate checksum.
  227. byteChecksum := crc32.ChecksumIEEE(b)
  228. if uint32(checksum) != byteChecksum {
  229. return nil, errors.New("bad checksum")
  230. }
  231. // Decode snapshot.
  232. snapshot := new(Snapshot4)
  233. if err = json.Unmarshal(b, snapshot); err != nil {
  234. return nil, err
  235. }
  236. return snapshot, nil
  237. }
  238. func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
  239. s := make([]SnapshotFileName, 0)
  240. for _, n := range names {
  241. trimmed := strings.TrimSuffix(n, ".ss")
  242. if trimmed == n {
  243. return nil, fmt.Errorf("file %q does not have .ss extension", n)
  244. }
  245. parts := strings.SplitN(trimmed, "_", 2)
  246. if len(parts) != 2 {
  247. return nil, fmt.Errorf("unrecognized file name format %q", n)
  248. }
  249. fn := SnapshotFileName{FileName: n}
  250. var err error
  251. fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
  252. if err != nil {
  253. return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
  254. }
  255. fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
  256. if err != nil {
  257. return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
  258. }
  259. s = append(s, fn)
  260. }
  261. sortable := SnapshotFileNames(s)
  262. sort.Sort(&sortable)
  263. return s, nil
  264. }
  265. type SnapshotFileNames []SnapshotFileName
  266. type SnapshotFileName struct {
  267. FileName string
  268. Term uint64
  269. Index uint64
  270. }
  271. func (n *SnapshotFileNames) Less(i, j int) bool {
  272. iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
  273. jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
  274. return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
  275. }
  276. func (n *SnapshotFileNames) Swap(i, j int) {
  277. (*n)[i], (*n)[j] = (*n)[j], (*n)[i]
  278. }
  279. func (n *SnapshotFileNames) Len() int {
  280. return len([]SnapshotFileName(*n))
  281. }