log.go 11 KB


  1. package migrate
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "time"
  11. "github.com/coreos/etcd/etcdserver"
  12. etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  13. etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
  14. "github.com/coreos/etcd/pkg/types"
  15. raftpb "github.com/coreos/etcd/raft/raftpb"
  16. "github.com/coreos/etcd/store"
  17. )
  18. const etcdDefaultClusterName = "etcd-cluster"
  19. func UnixTimeOrPermanent(expireTime time.Time) int64 {
  20. expire := expireTime.Unix()
  21. if expireTime == store.Permanent {
  22. expire = 0
  23. }
  24. return expire
  25. }
  26. type Log4 []*etcd4pb.LogEntry
  27. func (l Log4) NodeIDs() map[string]uint64 {
  28. out := make(map[string]uint64)
  29. for _, e := range l {
  30. if e.GetCommandName() == "etcd:join" {
  31. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  32. if err != nil {
  33. log.Println("error converting an etcd:join to v0.5 format. Likely corrupt!")
  34. return nil
  35. }
  36. join := cmd4.(*JoinCommand)
  37. m := generateNodeMember(join.Name, join.RaftURL, "")
  38. out[join.Name] = uint64(m.ID)
  39. }
  40. if e.GetCommandName() == "etcd:remove" {
  41. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  42. if err != nil {
  43. return nil
  44. }
  45. name := cmd4.(*RemoveCommand).Name
  46. delete(out, name)
  47. }
  48. }
  49. return out
  50. }
  51. func StorePath(key string) string {
  52. return path.Join(etcdserver.StoreKeysPrefix, key)
  53. }
  54. func DecodeLog4FromFile(logpath string) (Log4, error) {
  55. file, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
  56. if err != nil {
  57. return nil, err
  58. }
  59. defer file.Close()
  60. return DecodeLog4(file)
  61. }
  62. func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
  63. var readBytes int64
  64. entries := make([]*etcd4pb.LogEntry, 0)
  65. for {
  66. entry, n, err := DecodeNextEntry4(file)
  67. if err != nil {
  68. if err == io.EOF {
  69. break
  70. }
  71. return nil, fmt.Errorf("failed decoding next log entry: %v", err)
  72. }
  73. entries = append(entries, entry)
  74. readBytes += int64(n)
  75. }
  76. return entries, nil
  77. }
  78. // DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
  79. // number of bytes read and any error that occurs.
  80. func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
  81. var length int
  82. _, err := fmt.Fscanf(r, "%8x\n", &length)
  83. if err != nil {
  84. return nil, -1, err
  85. }
  86. data := make([]byte, length)
  87. if _, err = io.ReadFull(r, data); err != nil {
  88. return nil, -1, err
  89. }
  90. ent4 := new(etcd4pb.LogEntry)
  91. if err = ent4.Unmarshal(data); err != nil {
  92. return nil, -1, err
  93. }
  94. // add width of scanner token to length
  95. length = length + 8 + 1
  96. return ent4, length, nil
  97. }
  98. func hashName(name string) uint64 {
  99. var sum uint64
  100. for _, ch := range name {
  101. sum = 131*sum + uint64(ch)
  102. }
  103. return sum
  104. }
  105. type Command4 interface {
  106. Type5() raftpb.EntryType
  107. Data5() ([]byte, error)
  108. }
  109. func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
  110. var cmd Command4
  111. switch name {
  112. case "etcd:remove":
  113. cmd = &RemoveCommand{}
  114. case "etcd:join":
  115. cmd = &JoinCommand{}
  116. case "etcd:setClusterConfig":
  117. cmd = &NOPCommand{}
  118. case "etcd:compareAndDelete":
  119. cmd = &CompareAndDeleteCommand{}
  120. case "etcd:compareAndSwap":
  121. cmd = &CompareAndSwapCommand{}
  122. case "etcd:create":
  123. cmd = &CreateCommand{}
  124. case "etcd:delete":
  125. cmd = &DeleteCommand{}
  126. case "etcd:set":
  127. cmd = &SetCommand{}
  128. case "etcd:sync":
  129. cmd = &SyncCommand{}
  130. case "etcd:update":
  131. cmd = &UpdateCommand{}
  132. case "raft:join":
  133. // These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
  134. fallthrough
  135. case "raft:leave":
  136. return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
  137. case "raft:nop":
  138. cmd = &NOPCommand{}
  139. default:
  140. return nil, fmt.Errorf("unregistered command type %s", name)
  141. }
  142. // If data for the command was passed in the decode it.
  143. if data != nil {
  144. if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
  145. return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
  146. }
  147. }
  148. switch name {
  149. case "etcd:join":
  150. c := cmd.(*JoinCommand)
  151. m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
  152. c.memb = *m
  153. if raftMap != nil {
  154. raftMap[c.Name] = uint64(m.ID)
  155. }
  156. case "etcd:remove":
  157. c := cmd.(*RemoveCommand)
  158. if raftMap != nil {
  159. m, ok := raftMap[c.Name]
  160. if !ok {
  161. return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
  162. }
  163. c.id = m
  164. delete(raftMap, c.Name)
  165. }
  166. }
  167. return cmd, nil
  168. }
  169. type RemoveCommand struct {
  170. Name string `json:"name"`
  171. id uint64
  172. }
  173. func (c *RemoveCommand) Type5() raftpb.EntryType {
  174. return raftpb.EntryConfChange
  175. }
  176. func (c *RemoveCommand) Data5() ([]byte, error) {
  177. req5 := raftpb.ConfChange{
  178. ID: 0,
  179. Type: raftpb.ConfChangeRemoveNode,
  180. NodeID: c.id,
  181. }
  182. return req5.Marshal()
  183. }
  184. type JoinCommand struct {
  185. Name string `json:"name"`
  186. RaftURL string `json:"raftURL"`
  187. EtcdURL string `json:"etcdURL"`
  188. memb etcdserver.Member
  189. }
  190. func (c *JoinCommand) Type5() raftpb.EntryType {
  191. return raftpb.EntryConfChange
  192. }
  193. func (c *JoinCommand) Data5() ([]byte, error) {
  194. b, err := json.Marshal(c.memb)
  195. if err != nil {
  196. return nil, err
  197. }
  198. req5 := &raftpb.ConfChange{
  199. ID: 0,
  200. Type: raftpb.ConfChangeAddNode,
  201. NodeID: uint64(c.memb.ID),
  202. Context: b,
  203. }
  204. return req5.Marshal()
  205. }
  206. type SetClusterConfigCommand struct {
  207. Config *struct {
  208. ActiveSize int `json:"activeSize"`
  209. RemoveDelay float64 `json:"removeDelay"`
  210. SyncInterval float64 `json:"syncInterval"`
  211. } `json:"config"`
  212. }
  213. func (c *SetClusterConfigCommand) Type5() raftpb.EntryType {
  214. return raftpb.EntryNormal
  215. }
  216. func (c *SetClusterConfigCommand) Data5() ([]byte, error) {
  217. b, err := json.Marshal(c.Config)
  218. if err != nil {
  219. return nil, err
  220. }
  221. req5 := &etcdserverpb.Request{
  222. Method: "PUT",
  223. Path: "/v2/admin/config",
  224. Dir: false,
  225. Val: string(b),
  226. }
  227. return req5.Marshal()
  228. }
  229. type CompareAndDeleteCommand struct {
  230. Key string `json:"key"`
  231. PrevValue string `json:"prevValue"`
  232. PrevIndex uint64 `json:"prevIndex"`
  233. }
  234. func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType {
  235. return raftpb.EntryNormal
  236. }
  237. func (c *CompareAndDeleteCommand) Data5() ([]byte, error) {
  238. req5 := &etcdserverpb.Request{
  239. Method: "DELETE",
  240. Path: StorePath(c.Key),
  241. PrevValue: c.PrevValue,
  242. PrevIndex: c.PrevIndex,
  243. }
  244. return req5.Marshal()
  245. }
  246. type CompareAndSwapCommand struct {
  247. Key string `json:"key"`
  248. Value string `json:"value"`
  249. ExpireTime time.Time `json:"expireTime"`
  250. PrevValue string `json:"prevValue"`
  251. PrevIndex uint64 `json:"prevIndex"`
  252. }
  253. func (c *CompareAndSwapCommand) Type5() raftpb.EntryType {
  254. return raftpb.EntryNormal
  255. }
  256. func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
  257. req5 := &etcdserverpb.Request{
  258. Method: "PUT",
  259. Path: StorePath(c.Key),
  260. Val: c.Value,
  261. PrevValue: c.PrevValue,
  262. PrevIndex: c.PrevIndex,
  263. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  264. }
  265. return req5.Marshal()
  266. }
  267. type CreateCommand struct {
  268. Key string `json:"key"`
  269. Value string `json:"value"`
  270. ExpireTime time.Time `json:"expireTime"`
  271. Unique bool `json:"unique"`
  272. Dir bool `json:"dir"`
  273. }
  274. func (c *CreateCommand) Type5() raftpb.EntryType {
  275. return raftpb.EntryNormal
  276. }
  277. func (c *CreateCommand) Data5() ([]byte, error) {
  278. req5 := &etcdserverpb.Request{
  279. Path: StorePath(c.Key),
  280. Dir: c.Dir,
  281. Val: c.Value,
  282. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  283. }
  284. if c.Unique {
  285. req5.Method = "POST"
  286. } else {
  287. var prevExist = true
  288. req5.Method = "PUT"
  289. req5.PrevExist = &prevExist
  290. }
  291. return req5.Marshal()
  292. }
  293. type DeleteCommand struct {
  294. Key string `json:"key"`
  295. Recursive bool `json:"recursive"`
  296. Dir bool `json:"dir"`
  297. }
  298. func (c *DeleteCommand) Type5() raftpb.EntryType {
  299. return raftpb.EntryNormal
  300. }
  301. func (c *DeleteCommand) Data5() ([]byte, error) {
  302. req5 := &etcdserverpb.Request{
  303. Method: "DELETE",
  304. Path: StorePath(c.Key),
  305. Dir: c.Dir,
  306. Recursive: c.Recursive,
  307. }
  308. return req5.Marshal()
  309. }
  310. type SetCommand struct {
  311. Key string `json:"key"`
  312. Value string `json:"value"`
  313. ExpireTime time.Time `json:"expireTime"`
  314. Dir bool `json:"dir"`
  315. }
  316. func (c *SetCommand) Type5() raftpb.EntryType {
  317. return raftpb.EntryNormal
  318. }
  319. func (c *SetCommand) Data5() ([]byte, error) {
  320. req5 := &etcdserverpb.Request{
  321. Method: "PUT",
  322. Path: StorePath(c.Key),
  323. Dir: c.Dir,
  324. Val: c.Value,
  325. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  326. }
  327. return req5.Marshal()
  328. }
  329. type UpdateCommand struct {
  330. Key string `json:"key"`
  331. Value string `json:"value"`
  332. ExpireTime time.Time `json:"expireTime"`
  333. }
  334. func (c *UpdateCommand) Type5() raftpb.EntryType {
  335. return raftpb.EntryNormal
  336. }
  337. func (c *UpdateCommand) Data5() ([]byte, error) {
  338. exist := true
  339. req5 := &etcdserverpb.Request{
  340. Method: "PUT",
  341. Path: StorePath(c.Key),
  342. Val: c.Value,
  343. PrevExist: &exist,
  344. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  345. }
  346. return req5.Marshal()
  347. }
  348. type SyncCommand struct {
  349. Time time.Time `json:"time"`
  350. }
  351. func (c *SyncCommand) Type5() raftpb.EntryType {
  352. return raftpb.EntryNormal
  353. }
  354. func (c *SyncCommand) Data5() ([]byte, error) {
  355. req5 := &etcdserverpb.Request{
  356. Method: "SYNC",
  357. Time: c.Time.UnixNano(),
  358. }
  359. return req5.Marshal()
  360. }
  361. type DefaultJoinCommand struct {
  362. Name string `json:"name"`
  363. ConnectionString string `json:"connectionString"`
  364. }
  365. type DefaultLeaveCommand struct {
  366. Name string `json:"name"`
  367. id uint64
  368. }
  369. type NOPCommand struct{}
  370. //TODO(bcwaldon): Why is CommandName here?
  371. func (c NOPCommand) CommandName() string {
  372. return "raft:nop"
  373. }
  374. func (c *NOPCommand) Type5() raftpb.EntryType {
  375. return raftpb.EntryNormal
  376. }
  377. func (c *NOPCommand) Data5() ([]byte, error) {
  378. return nil, nil
  379. }
  380. func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
  381. ents4Len := len(ents4)
  382. if ents4Len == 0 {
  383. return nil, nil
  384. }
  385. startIndex := ents4[0].GetIndex()
  386. for i, e := range ents4[1:] {
  387. eIndex := e.GetIndex()
  388. // ensure indexes are monotonically increasing
  389. wantIndex := startIndex + uint64(i+1)
  390. if wantIndex != eIndex {
  391. return nil, fmt.Errorf("skipped log index %d", wantIndex)
  392. }
  393. }
  394. raftMap := make(map[string]uint64)
  395. ents5 := make([]raftpb.Entry, 0)
  396. for i, e := range ents4 {
  397. ent, err := toEntry5(e, raftMap)
  398. if err != nil {
  399. log.Fatalf("Error converting entry %d, %s", i, err)
  400. } else {
  401. ents5 = append(ents5, *ent)
  402. }
  403. }
  404. return ents5, nil
  405. }
  406. func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
  407. cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
  408. if err != nil {
  409. return nil, err
  410. }
  411. data, err := cmd4.Data5()
  412. if err != nil {
  413. return nil, err
  414. }
  415. ent5 := raftpb.Entry{
  416. Term: ent4.GetTerm(),
  417. Index: ent4.GetIndex(),
  418. Type: cmd4.Type5(),
  419. Data: data,
  420. }
  421. log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type)
  422. return &ent5, nil
  423. }
  424. func generateNodeMember(name, rafturl, etcdurl string) *etcdserver.Member {
  425. pURLs, err := types.NewURLs([]string{rafturl})
  426. if err != nil {
  427. log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
  428. }
  429. m := etcdserver.NewMember(name, pURLs, etcdDefaultClusterName, nil)
  430. m.ClientURLs = []string{etcdurl}
  431. return m
  432. }