util.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "os/signal"
  11. "runtime/pprof"
  12. "strconv"
  13. "time"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/store"
  16. "github.com/coreos/etcd/web"
  17. "github.com/coreos/go-log/log"
  18. "github.com/coreos/go-raft"
  19. )
  20. //--------------------------------------
  21. // etcd http Helper
  22. //--------------------------------------
  23. // Convert string duration to time format
  24. func durationToExpireTime(strDuration string) (time.Time, error) {
  25. if strDuration != "" {
  26. duration, err := strconv.Atoi(strDuration)
  27. if err != nil {
  28. return store.Permanent, err
  29. }
  30. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  31. } else {
  32. return store.Permanent, nil
  33. }
  34. }
  35. //--------------------------------------
  36. // Web Helper
  37. //--------------------------------------
  38. var storeMsg chan string
  39. // Help to send msg from store to webHub
  40. func webHelper() {
  41. storeMsg = make(chan string)
  42. // etcdStore.SetMessager(storeMsg)
  43. for {
  44. // transfer the new msg to webHub
  45. web.Hub().Send(<-storeMsg)
  46. }
  47. }
  48. // startWebInterface starts web interface if webURL is not empty
  49. func startWebInterface() {
  50. if argInfo.WebURL != "" {
  51. // start web
  52. go webHelper()
  53. go web.Start(r.Server, argInfo.WebURL)
  54. }
  55. }
  56. //--------------------------------------
  57. // HTTP Utilities
  58. //--------------------------------------
  59. func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
  60. if r.State() == raft.Leader {
  61. if body, err := r.Do(c); err != nil {
  62. return err
  63. } else {
  64. if body == nil {
  65. return etcdErr.NewError(300, "Empty result from raft")
  66. } else {
  67. body, _ := body.([]byte)
  68. w.WriteHeader(http.StatusOK)
  69. w.Write(body)
  70. return nil
  71. }
  72. }
  73. } else {
  74. leader := r.Leader()
  75. // current no leader
  76. if leader == "" {
  77. return etcdErr.NewError(300, "")
  78. }
  79. url, _ := toURL(leader)
  80. redirect(url, w, req)
  81. return nil
  82. }
  83. return etcdErr.NewError(300, "")
  84. }
  85. func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
  86. path := req.URL.Path
  87. url := hostname + path
  88. debugf("Redirect to %s", url)
  89. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  90. }
  91. func decodeJsonRequest(req *http.Request, data interface{}) error {
  92. decoder := json.NewDecoder(req.Body)
  93. if err := decoder.Decode(&data); err != nil && err != io.EOF {
  94. warnf("Malformed json request: %v", err)
  95. return fmt.Errorf("Malformed json request: %v", err)
  96. }
  97. return nil
  98. }
  99. func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
  100. w.Header().Set("Content-Type", "application/json")
  101. w.WriteHeader(status)
  102. if data != nil {
  103. encoder := json.NewEncoder(w)
  104. encoder.Encode(data)
  105. }
  106. }
  107. // sanitizeURL will cleanup a host string in the format hostname:port and
  108. // attach a schema.
  109. func sanitizeURL(host string, defaultScheme string) string {
  110. // Blank URLs are fine input, just return it
  111. if len(host) == 0 {
  112. return host
  113. }
  114. p, err := url.Parse(host)
  115. if err != nil {
  116. fatal(err)
  117. }
  118. // Make sure the host is in Host:Port format
  119. _, _, err = net.SplitHostPort(host)
  120. if err != nil {
  121. fatal(err)
  122. }
  123. p = &url.URL{Host: host, Scheme: defaultScheme}
  124. return p.String()
  125. }
  126. // sanitizeListenHost cleans up the ListenHost parameter and appends a port
  127. // if necessary based on the advertised port.
  128. func sanitizeListenHost(listen string, advertised string) string {
  129. aurl, err := url.Parse(advertised)
  130. if err != nil {
  131. fatal(err)
  132. }
  133. ahost, aport, err := net.SplitHostPort(aurl.Host)
  134. if err != nil {
  135. fatal(err)
  136. }
  137. // If the listen host isn't set use the advertised host
  138. if listen == "" {
  139. listen = ahost
  140. }
  141. return net.JoinHostPort(listen, aport)
  142. }
  143. func check(err error) {
  144. if err != nil {
  145. fatal(err)
  146. }
  147. }
  148. func getNodePath(urlPath string) string {
  149. pathPrefixLen := len("/" + version + "/keys")
  150. return urlPath[pathPrefixLen:]
  151. }
  152. //--------------------------------------
  153. // Log
  154. //--------------------------------------
  155. var logger *log.Logger = log.New("etcd", false,
  156. log.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"}))
  157. func infof(format string, v ...interface{}) {
  158. logger.Infof(format, v...)
  159. }
  160. func debugf(format string, v ...interface{}) {
  161. if verbose {
  162. logger.Debugf(format, v...)
  163. }
  164. }
  165. func debug(v ...interface{}) {
  166. if verbose {
  167. logger.Debug(v...)
  168. }
  169. }
  170. func warnf(format string, v ...interface{}) {
  171. logger.Warningf(format, v...)
  172. }
  173. func warn(v ...interface{}) {
  174. logger.Warning(v...)
  175. }
  176. func fatalf(format string, v ...interface{}) {
  177. logger.Fatalf(format, v...)
  178. }
  179. func fatal(v ...interface{}) {
  180. logger.Fatalln(v...)
  181. }
  182. //--------------------------------------
  183. // CPU profile
  184. //--------------------------------------
  185. func runCPUProfile() {
  186. f, err := os.Create(cpuprofile)
  187. if err != nil {
  188. fatal(err)
  189. }
  190. pprof.StartCPUProfile(f)
  191. c := make(chan os.Signal, 1)
  192. signal.Notify(c, os.Interrupt)
  193. go func() {
  194. for sig := range c {
  195. infof("captured %v, stopping profiler and exiting..", sig)
  196. pprof.StopCPUProfile()
  197. os.Exit(1)
  198. }
  199. }()
  200. }
  201. //--------------------------------------
  202. // Testing
  203. //--------------------------------------
  204. func directSet() {
  205. c := make(chan bool, 1000)
  206. for i := 0; i < 1000; i++ {
  207. go send(c)
  208. }
  209. for i := 0; i < 1000; i++ {
  210. <-c
  211. }
  212. }
  213. func send(c chan bool) {
  214. for i := 0; i < 10; i++ {
  215. command := &UpdateCommand{}
  216. command.Key = "foo"
  217. command.Value = "bar"
  218. command.ExpireTime = time.Unix(0, 0)
  219. r.Do(command)
  220. }
  221. c <- true
  222. }