snapshot.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package migrate
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "hash/crc32"
  20. "io/ioutil"
  21. "log"
  22. "net/url"
  23. "os"
  24. "path"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "time"
  29. raftpb "github.com/coreos/etcd/raft/raftpb"
  30. )
  31. type Snapshot4 struct {
  32. State []byte `json:"state"`
  33. LastIndex uint64 `json:"lastIndex"`
  34. LastTerm uint64 `json:"lastTerm"`
  35. Peers []struct {
  36. Name string `json:"name"`
  37. ConnectionString string `json:"connectionString"`
  38. } `json:"peers"`
  39. }
  40. type Store4 struct {
  41. Root *node
  42. CurrentIndex uint64
  43. CurrentVersion int
  44. }
  45. type node struct {
  46. Path string
  47. CreatedIndex uint64
  48. ModifiedIndex uint64
  49. Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  50. ExpireTime time.Time
  51. ACL string
  52. Value string // for key-value pair
  53. Children map[string]*node // for directory
  54. }
  55. func deepCopyNode(n *node, parent *node) *node {
  56. out := &node{
  57. Path: n.Path,
  58. CreatedIndex: n.CreatedIndex,
  59. ModifiedIndex: n.ModifiedIndex,
  60. Parent: parent,
  61. ExpireTime: n.ExpireTime,
  62. ACL: n.ACL,
  63. Value: n.Value,
  64. Children: make(map[string]*node),
  65. }
  66. for k, v := range n.Children {
  67. out.Children[k] = deepCopyNode(v, out)
  68. }
  69. return out
  70. }
  71. func replacePathNames(n *node, s1, s2 string) {
  72. n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
  73. for _, c := range n.Children {
  74. replacePathNames(c, s1, s2)
  75. }
  76. }
  77. func pullNodesFromEtcd(n *node) map[string]uint64 {
  78. out := make(map[string]uint64)
  79. machines := n.Children["machines"]
  80. for name, c := range machines.Children {
  81. q, err := url.ParseQuery(c.Value)
  82. if err != nil {
  83. log.Fatal("Couldn't parse old query string value")
  84. }
  85. etcdurl := q.Get("etcd")
  86. rafturl := q.Get("raft")
  87. m := generateNodeMember(name, rafturl, etcdurl)
  88. out[m.Name] = uint64(m.ID)
  89. }
  90. return out
  91. }
  92. func fixEtcd(etcdref *node) *node {
  93. n := &node{
  94. Path: "/0",
  95. CreatedIndex: etcdref.CreatedIndex,
  96. ModifiedIndex: etcdref.ModifiedIndex,
  97. ExpireTime: etcdref.ExpireTime,
  98. ACL: etcdref.ACL,
  99. Children: make(map[string]*node),
  100. }
  101. var machines *node
  102. if machineOrig, ok := etcdref.Children["machines"]; ok {
  103. machines = deepCopyNode(machineOrig, n)
  104. }
  105. if machines == nil {
  106. return n
  107. }
  108. n.Children["members"] = &node{
  109. Path: "/0/members",
  110. CreatedIndex: machines.CreatedIndex,
  111. ModifiedIndex: machines.ModifiedIndex,
  112. ExpireTime: machines.ExpireTime,
  113. ACL: machines.ACL,
  114. Children: make(map[string]*node),
  115. Parent: n,
  116. }
  117. for name, c := range machines.Children {
  118. q, err := url.ParseQuery(c.Value)
  119. if err != nil {
  120. log.Fatal("Couldn't parse old query string value")
  121. }
  122. etcdurl := q.Get("etcd")
  123. rafturl := q.Get("raft")
  124. m := generateNodeMember(name, rafturl, etcdurl)
  125. attrBytes, err := json.Marshal(m.attributes)
  126. if err != nil {
  127. log.Fatal("Couldn't marshal attributes")
  128. }
  129. raftBytes, err := json.Marshal(m.raftAttributes)
  130. if err != nil {
  131. log.Fatal("Couldn't marshal raft attributes")
  132. }
  133. newNode := &node{
  134. Path: path.Join("/0/members", m.ID.String()),
  135. CreatedIndex: c.CreatedIndex,
  136. ModifiedIndex: c.ModifiedIndex,
  137. ExpireTime: c.ExpireTime,
  138. ACL: c.ACL,
  139. Children: make(map[string]*node),
  140. Parent: n.Children["members"],
  141. }
  142. attrs := &node{
  143. Path: path.Join("/0/members", m.ID.String(), "attributes"),
  144. CreatedIndex: c.CreatedIndex,
  145. ModifiedIndex: c.ModifiedIndex,
  146. ExpireTime: c.ExpireTime,
  147. ACL: c.ACL,
  148. Value: string(attrBytes),
  149. Parent: newNode,
  150. }
  151. newNode.Children["attributes"] = attrs
  152. raftAttrs := &node{
  153. Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
  154. CreatedIndex: c.CreatedIndex,
  155. ModifiedIndex: c.ModifiedIndex,
  156. ExpireTime: c.ExpireTime,
  157. ACL: c.ACL,
  158. Value: string(raftBytes),
  159. Parent: newNode,
  160. }
  161. newNode.Children["raftAttributes"] = raftAttrs
  162. n.Children["members"].Children[m.ID.String()] = newNode
  163. }
  164. return n
  165. }
  166. func mangleRoot(n *node) *node {
  167. newRoot := &node{
  168. Path: "/",
  169. CreatedIndex: n.CreatedIndex,
  170. ModifiedIndex: n.ModifiedIndex,
  171. ExpireTime: n.ExpireTime,
  172. ACL: n.ACL,
  173. Children: make(map[string]*node),
  174. }
  175. newRoot.Children["1"] = n
  176. etcd := n.Children["_etcd"]
  177. replacePathNames(n, "/", "/1/")
  178. newZero := fixEtcd(etcd)
  179. newZero.Parent = newRoot
  180. newRoot.Children["0"] = newZero
  181. return newRoot
  182. }
  183. func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
  184. st := &Store4{}
  185. if err := json.Unmarshal(s.State, st); err != nil {
  186. log.Fatal("Couldn't unmarshal snapshot")
  187. }
  188. etcd := st.Root.Children["_etcd"]
  189. return pullNodesFromEtcd(etcd)
  190. }
  191. func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
  192. st := &Store4{}
  193. if err := json.Unmarshal(s.State, st); err != nil {
  194. log.Fatal("Couldn't unmarshal snapshot")
  195. }
  196. st.Root = mangleRoot(st.Root)
  197. newState, err := json.Marshal(st)
  198. if err != nil {
  199. log.Fatal("Couldn't re-marshal new snapshot")
  200. }
  201. nodes := s.GetNodesFromStore()
  202. nodeList := make([]uint64, 0)
  203. for _, v := range nodes {
  204. nodeList = append(nodeList, v)
  205. }
  206. snap2 := raftpb.Snapshot{
  207. Data: newState,
  208. Metadata: raftpb.SnapshotMetadata{
  209. Index: s.LastIndex,
  210. Term: s.LastTerm + termOffset4to2,
  211. ConfState: raftpb.ConfState{
  212. Nodes: nodeList,
  213. },
  214. },
  215. }
  216. return &snap2
  217. }
  218. func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
  219. fname, err := FindLatestFile(snapdir)
  220. if err != nil {
  221. return nil, err
  222. }
  223. if fname == "" {
  224. return nil, nil
  225. }
  226. snappath := path.Join(snapdir, fname)
  227. log.Printf("Decoding snapshot from %s", snappath)
  228. return DecodeSnapshot4FromFile(snappath)
  229. }
  230. // FindLatestFile identifies the "latest" filename in a given directory
  231. // by sorting all the files and choosing the highest value.
  232. func FindLatestFile(dirpath string) (string, error) {
  233. dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
  234. if err != nil {
  235. if os.IsNotExist(err) {
  236. err = nil
  237. }
  238. return "", err
  239. }
  240. defer dir.Close()
  241. fnames, err := dir.Readdirnames(-1)
  242. if err != nil {
  243. return "", err
  244. }
  245. if len(fnames) == 0 {
  246. return "", nil
  247. }
  248. names, err := NewSnapshotFileNames(fnames)
  249. if err != nil {
  250. return "", err
  251. }
  252. return names[len(names)-1].FileName, nil
  253. }
  254. func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
  255. // Read snapshot data.
  256. f, err := os.OpenFile(path, os.O_RDONLY, 0)
  257. if err != nil {
  258. return nil, err
  259. }
  260. defer f.Close()
  261. return DecodeSnapshot4(f)
  262. }
  263. func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
  264. // Verify checksum
  265. var checksum uint32
  266. n, err := fmt.Fscanf(f, "%08x\n", &checksum)
  267. if err != nil {
  268. return nil, err
  269. } else if n != 1 {
  270. return nil, errors.New("miss heading checksum")
  271. }
  272. // Load remaining snapshot contents.
  273. b, err := ioutil.ReadAll(f)
  274. if err != nil {
  275. return nil, err
  276. }
  277. // Generate checksum.
  278. byteChecksum := crc32.ChecksumIEEE(b)
  279. if uint32(checksum) != byteChecksum {
  280. return nil, errors.New("bad checksum")
  281. }
  282. // Decode snapshot.
  283. snapshot := new(Snapshot4)
  284. if err = json.Unmarshal(b, snapshot); err != nil {
  285. return nil, err
  286. }
  287. return snapshot, nil
  288. }
  289. func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
  290. s := make([]SnapshotFileName, 0)
  291. for _, n := range names {
  292. trimmed := strings.TrimSuffix(n, ".ss")
  293. if trimmed == n {
  294. return nil, fmt.Errorf("file %q does not have .ss extension", n)
  295. }
  296. parts := strings.SplitN(trimmed, "_", 2)
  297. if len(parts) != 2 {
  298. return nil, fmt.Errorf("unrecognized file name format %q", n)
  299. }
  300. fn := SnapshotFileName{FileName: n}
  301. var err error
  302. fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
  303. if err != nil {
  304. return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
  305. }
  306. fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
  307. if err != nil {
  308. return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
  309. }
  310. s = append(s, fn)
  311. }
  312. sortable := SnapshotFileNames(s)
  313. sort.Sort(&sortable)
  314. return s, nil
  315. }
  316. type SnapshotFileNames []SnapshotFileName
  317. type SnapshotFileName struct {
  318. FileName string
  319. Term uint64
  320. Index uint64
  321. }
  322. func (n *SnapshotFileNames) Less(i, j int) bool {
  323. iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
  324. jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
  325. return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
  326. }
  327. func (n *SnapshotFileNames) Swap(i, j int) {
  328. (*n)[i], (*n)[j] = (*n)[j], (*n)[i]
  329. }
  330. func (n *SnapshotFileNames) Len() int {
  331. return len([]SnapshotFileName(*n))
  332. }