util.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "os/signal"
  11. "runtime/pprof"
  12. "strconv"
  13. "time"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/store"
  16. "github.com/coreos/go-log/log"
  17. "github.com/coreos/go-raft"
  18. )
  19. //--------------------------------------
  20. // etcd http Helper
  21. //--------------------------------------
  22. // Convert string duration to time format
  23. func durationToExpireTime(strDuration string) (time.Time, error) {
  24. if strDuration != "" {
  25. duration, err := strconv.Atoi(strDuration)
  26. if err != nil {
  27. return store.Permanent, err
  28. }
  29. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  30. } else {
  31. return store.Permanent, nil
  32. }
  33. }
  34. //--------------------------------------
  35. // HTTP Utilities
  36. //--------------------------------------
  37. func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
  38. if r.State() == raft.Leader {
  39. if response, err := r.Do(c); err != nil {
  40. return err
  41. } else {
  42. if response == nil {
  43. return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
  44. }
  45. event, ok := response.(*store.Event)
  46. if ok {
  47. bytes, err := json.Marshal(event)
  48. if err != nil {
  49. fmt.Println(err)
  50. }
  51. w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
  52. w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
  53. w.WriteHeader(http.StatusOK)
  54. w.Write(bytes)
  55. return nil
  56. }
  57. bytes, _ := response.([]byte)
  58. w.WriteHeader(http.StatusOK)
  59. w.Write(bytes)
  60. return nil
  61. }
  62. } else {
  63. leader := r.Leader()
  64. // current no leader
  65. if leader == "" {
  66. return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
  67. }
  68. url, _ := toURL(leader)
  69. redirect(url, w, req)
  70. return nil
  71. }
  72. }
  73. func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
  74. path := req.URL.Path
  75. url := hostname + path
  76. debugf("Redirect to %s", url)
  77. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  78. }
  79. func decodeJsonRequest(req *http.Request, data interface{}) error {
  80. decoder := json.NewDecoder(req.Body)
  81. if err := decoder.Decode(&data); err != nil && err != io.EOF {
  82. warnf("Malformed json request: %v", err)
  83. return fmt.Errorf("Malformed json request: %v", err)
  84. }
  85. return nil
  86. }
  // encodeJsonResponse writes status to w with a JSON content type and, when
  // data is non-nil, the JSON encoding of data as the response body.
  //
  // The header is sent before encoding starts, so an encoding error cannot
  // change the status any more; the error is not reported.
  func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	w.WriteHeader(status)

  	if data == nil {
  		return
  	}
  	json.NewEncoder(w).Encode(data)
  }
  95. // sanitizeURL will cleanup a host string in the format hostname:port and
  96. // attach a schema.
  97. func sanitizeURL(host string, defaultScheme string) string {
  98. // Blank URLs are fine input, just return it
  99. if len(host) == 0 {
  100. return host
  101. }
  102. p, err := url.Parse(host)
  103. if err != nil {
  104. fatal(err)
  105. }
  106. // Make sure the host is in Host:Port format
  107. _, _, err = net.SplitHostPort(host)
  108. if err != nil {
  109. fatal(err)
  110. }
  111. p = &url.URL{Host: host, Scheme: defaultScheme}
  112. return p.String()
  113. }
  114. // sanitizeListenHost cleans up the ListenHost parameter and appends a port
  115. // if necessary based on the advertised port.
  116. func sanitizeListenHost(listen string, advertised string) string {
  117. aurl, err := url.Parse(advertised)
  118. if err != nil {
  119. fatal(err)
  120. }
  121. ahost, aport, err := net.SplitHostPort(aurl.Host)
  122. if err != nil {
  123. fatal(err)
  124. }
  125. // If the listen host isn't set use the advertised host
  126. if listen == "" {
  127. listen = ahost
  128. }
  129. return net.JoinHostPort(listen, aport)
  130. }
  131. func check(err error) {
  132. if err != nil {
  133. fatal(err)
  134. }
  135. }
  136. func getNodePath(urlPath string) string {
  137. pathPrefixLen := len("/" + version + "/keys")
  138. return urlPath[pathPrefixLen:]
  139. }
  140. //--------------------------------------
  141. // Log
  142. //--------------------------------------
  143. var logger *log.Logger = log.New("etcd", false,
  144. log.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"}))
  145. func infof(format string, v ...interface{}) {
  146. logger.Infof(format, v...)
  147. }
  148. func debugf(format string, v ...interface{}) {
  149. if verbose {
  150. logger.Debugf(format, v...)
  151. }
  152. }
  153. func debug(v ...interface{}) {
  154. if verbose {
  155. logger.Debug(v...)
  156. }
  157. }
  158. func warnf(format string, v ...interface{}) {
  159. logger.Warningf(format, v...)
  160. }
  161. func warn(v ...interface{}) {
  162. logger.Warning(v...)
  163. }
  164. func fatalf(format string, v ...interface{}) {
  165. logger.Fatalf(format, v...)
  166. }
  167. func fatal(v ...interface{}) {
  168. logger.Fatalln(v...)
  169. }
  170. //--------------------------------------
  171. // CPU profile
  172. //--------------------------------------
  173. func runCPUProfile() {
  174. f, err := os.Create(cpuprofile)
  175. if err != nil {
  176. fatal(err)
  177. }
  178. pprof.StartCPUProfile(f)
  179. c := make(chan os.Signal, 1)
  180. signal.Notify(c, os.Interrupt)
  181. go func() {
  182. for sig := range c {
  183. infof("captured %v, stopping profiler and exiting..", sig)
  184. pprof.StopCPUProfile()
  185. os.Exit(1)
  186. }
  187. }()
  188. }
  189. //--------------------------------------
  190. // Testing
  191. //--------------------------------------
  192. func directSet() {
  193. c := make(chan bool, 1000)
  194. for i := 0; i < 1000; i++ {
  195. go send(c)
  196. }
  197. for i := 0; i < 1000; i++ {
  198. <-c
  199. }
  200. }
  201. func send(c chan bool) {
  202. for i := 0; i < 10; i++ {
  203. command := &UpdateCommand{}
  204. command.Key = "foo"
  205. command.Value = "bar"
  206. command.ExpireTime = time.Unix(0, 0)
  207. //r.Do(command)
  208. }
  209. c <- true
  210. }