discovery_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package discovery
  15. import (
  16. "context"
  17. "errors"
  18. "math"
  19. "math/rand"
  20. "net/http"
  21. "reflect"
  22. "sort"
  23. "strconv"
  24. "testing"
  25. "time"
  26. "go.uber.org/zap"
  27. "github.com/coreos/etcd/client"
  28. "github.com/jonboulle/clockwork"
  29. )
  30. const (
  31. maxRetryInTest = 3
  32. )
  33. func TestNewProxyFuncUnset(t *testing.T) {
  34. pf, err := newProxyFunc(zap.NewExample(), "")
  35. if pf != nil {
  36. t.Fatal("unexpected non-nil proxyFunc")
  37. }
  38. if err != nil {
  39. t.Fatalf("unexpected non-nil err: %v", err)
  40. }
  41. }
  42. func TestNewProxyFuncBad(t *testing.T) {
  43. tests := []string{
  44. "%%",
  45. "http://foo.com/%1",
  46. }
  47. for i, in := range tests {
  48. pf, err := newProxyFunc(zap.NewExample(), in)
  49. if pf != nil {
  50. t.Errorf("#%d: unexpected non-nil proxyFunc", i)
  51. }
  52. if err == nil {
  53. t.Errorf("#%d: unexpected nil err", i)
  54. }
  55. }
  56. }
  57. func TestNewProxyFunc(t *testing.T) {
  58. tests := map[string]string{
  59. "bar.com": "http://bar.com",
  60. "http://disco.foo.bar": "http://disco.foo.bar",
  61. }
  62. for in, w := range tests {
  63. pf, err := newProxyFunc(zap.NewExample(), in)
  64. if pf == nil {
  65. t.Errorf("%s: unexpected nil proxyFunc", in)
  66. continue
  67. }
  68. if err != nil {
  69. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  70. continue
  71. }
  72. g, err := pf(&http.Request{})
  73. if err != nil {
  74. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  75. }
  76. if g.String() != w {
  77. t.Errorf("%s: proxyURL=%q, want %q", in, g, w)
  78. }
  79. }
  80. }
  81. func TestCheckCluster(t *testing.T) {
  82. cluster := "/prefix/1000"
  83. self := "/1000/1"
  84. tests := []struct {
  85. nodes []*client.Node
  86. index uint64
  87. werr error
  88. wsize int
  89. }{
  90. {
  91. // self is in the size range
  92. []*client.Node{
  93. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  94. {Key: "/1000/_config/"},
  95. {Key: self, CreatedIndex: 2},
  96. {Key: "/1000/2", CreatedIndex: 3},
  97. {Key: "/1000/3", CreatedIndex: 4},
  98. {Key: "/1000/4", CreatedIndex: 5},
  99. },
  100. 5,
  101. nil,
  102. 3,
  103. },
  104. {
  105. // self is in the size range
  106. []*client.Node{
  107. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  108. {Key: "/1000/_config/"},
  109. {Key: "/1000/2", CreatedIndex: 2},
  110. {Key: "/1000/3", CreatedIndex: 3},
  111. {Key: self, CreatedIndex: 4},
  112. {Key: "/1000/4", CreatedIndex: 5},
  113. },
  114. 5,
  115. nil,
  116. 3,
  117. },
  118. {
  119. // self is out of the size range
  120. []*client.Node{
  121. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  122. {Key: "/1000/_config/"},
  123. {Key: "/1000/2", CreatedIndex: 2},
  124. {Key: "/1000/3", CreatedIndex: 3},
  125. {Key: "/1000/4", CreatedIndex: 4},
  126. {Key: self, CreatedIndex: 5},
  127. },
  128. 5,
  129. ErrFullCluster,
  130. 3,
  131. },
  132. {
  133. // self is not in the cluster
  134. []*client.Node{
  135. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  136. {Key: "/1000/_config/"},
  137. {Key: "/1000/2", CreatedIndex: 2},
  138. {Key: "/1000/3", CreatedIndex: 3},
  139. },
  140. 3,
  141. nil,
  142. 3,
  143. },
  144. {
  145. []*client.Node{
  146. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  147. {Key: "/1000/_config/"},
  148. {Key: "/1000/2", CreatedIndex: 2},
  149. {Key: "/1000/3", CreatedIndex: 3},
  150. {Key: "/1000/4", CreatedIndex: 4},
  151. },
  152. 3,
  153. ErrFullCluster,
  154. 3,
  155. },
  156. {
  157. // bad size key
  158. []*client.Node{
  159. {Key: "/1000/_config/size", Value: "bad", CreatedIndex: 1},
  160. },
  161. 0,
  162. ErrBadSizeKey,
  163. 0,
  164. },
  165. {
  166. // no size key
  167. []*client.Node{},
  168. 0,
  169. ErrSizeNotFound,
  170. 0,
  171. },
  172. }
  173. for i, tt := range tests {
  174. var rs []*client.Response
  175. if len(tt.nodes) > 0 {
  176. rs = append(rs, &client.Response{Node: tt.nodes[0], Index: tt.index})
  177. rs = append(rs, &client.Response{
  178. Node: &client.Node{
  179. Key: cluster,
  180. Nodes: tt.nodes[1:],
  181. },
  182. Index: tt.index,
  183. })
  184. }
  185. c := &clientWithResp{rs: rs}
  186. dBase := discovery{cluster: cluster, id: 1, c: c}
  187. cRetry := &clientWithRetry{failTimes: 3}
  188. cRetry.rs = rs
  189. fc := clockwork.NewFakeClock()
  190. dRetry := discovery{cluster: cluster, id: 1, c: cRetry, clock: fc}
  191. for _, d := range []discovery{dBase, dRetry} {
  192. go func() {
  193. for i := uint(1); i <= maxRetryInTest; i++ {
  194. fc.BlockUntil(1)
  195. fc.Advance(time.Second * (0x1 << i))
  196. }
  197. }()
  198. ns, size, index, err := d.checkCluster()
  199. if err != tt.werr {
  200. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  201. }
  202. if reflect.DeepEqual(ns, tt.nodes) {
  203. t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
  204. }
  205. if size != tt.wsize {
  206. t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
  207. }
  208. if index != tt.index {
  209. t.Errorf("#%d: index = %v, want %d", i, index, tt.index)
  210. }
  211. }
  212. }
  213. }
  214. func TestWaitNodes(t *testing.T) {
  215. all := []*client.Node{
  216. 0: {Key: "/1000/1", CreatedIndex: 2},
  217. 1: {Key: "/1000/2", CreatedIndex: 3},
  218. 2: {Key: "/1000/3", CreatedIndex: 4},
  219. }
  220. tests := []struct {
  221. nodes []*client.Node
  222. rs []*client.Response
  223. }{
  224. {
  225. all,
  226. []*client.Response{},
  227. },
  228. {
  229. all[:1],
  230. []*client.Response{
  231. {Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}},
  232. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  233. },
  234. },
  235. {
  236. all[:2],
  237. []*client.Response{
  238. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  239. },
  240. },
  241. {
  242. append(all, &client.Node{Key: "/1000/4", CreatedIndex: 5}),
  243. []*client.Response{
  244. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  245. },
  246. },
  247. }
  248. for i, tt := range tests {
  249. // Basic case
  250. c := &clientWithResp{rs: nil, w: &watcherWithResp{rs: tt.rs}}
  251. dBase := &discovery{cluster: "1000", c: c}
  252. // Retry case
  253. var retryScanResp []*client.Response
  254. if len(tt.nodes) > 0 {
  255. retryScanResp = append(retryScanResp, &client.Response{
  256. Node: &client.Node{
  257. Key: "1000",
  258. Value: strconv.Itoa(3),
  259. },
  260. })
  261. retryScanResp = append(retryScanResp, &client.Response{
  262. Node: &client.Node{
  263. Nodes: tt.nodes,
  264. },
  265. })
  266. }
  267. cRetry := &clientWithResp{
  268. rs: retryScanResp,
  269. w: &watcherWithRetry{rs: tt.rs, failTimes: 2},
  270. }
  271. fc := clockwork.NewFakeClock()
  272. dRetry := &discovery{
  273. cluster: "1000",
  274. c: cRetry,
  275. clock: fc,
  276. }
  277. for _, d := range []*discovery{dBase, dRetry} {
  278. go func() {
  279. for i := uint(1); i <= maxRetryInTest; i++ {
  280. fc.BlockUntil(1)
  281. fc.Advance(time.Second * (0x1 << i))
  282. }
  283. }()
  284. g, err := d.waitNodes(tt.nodes, 3, 0) // we do not care about index in this test
  285. if err != nil {
  286. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  287. }
  288. if !reflect.DeepEqual(g, all) {
  289. t.Errorf("#%d: all = %v, want %v", i, g, all)
  290. }
  291. }
  292. }
  293. }
  294. func TestCreateSelf(t *testing.T) {
  295. rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}}
  296. w := &watcherWithResp{rs: rs}
  297. errw := &watcherWithErr{err: errors.New("watch err")}
  298. c := &clientWithResp{rs: rs, w: w}
  299. errc := &clientWithErr{err: errors.New("create err"), w: w}
  300. errdupc := &clientWithErr{err: client.Error{Code: client.ErrorCodeNodeExist}}
  301. errwc := &clientWithResp{rs: rs, w: errw}
  302. tests := []struct {
  303. c client.KeysAPI
  304. werr error
  305. }{
  306. // no error
  307. {c, nil},
  308. // client.create returns an error
  309. {errc, errc.err},
  310. // watcher.next returns an error
  311. {errwc, errw.err},
  312. // parse key exist error to duplicate ID error
  313. {errdupc, ErrDuplicateID},
  314. }
  315. for i, tt := range tests {
  316. d := discovery{cluster: "1000", c: tt.c}
  317. if err := d.createSelf(""); err != tt.werr {
  318. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  319. }
  320. }
  321. }
  322. func TestNodesToCluster(t *testing.T) {
  323. tests := []struct {
  324. nodes []*client.Node
  325. size int
  326. wcluster string
  327. werr error
  328. }{
  329. {
  330. []*client.Node{
  331. 0: {Key: "/1000/1", Value: "1=http://1.1.1.1:2380", CreatedIndex: 1},
  332. 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
  333. 2: {Key: "/1000/3", Value: "3=http://3.3.3.3:2380", CreatedIndex: 3},
  334. },
  335. 3,
  336. "1=http://1.1.1.1:2380,2=http://2.2.2.2:2380,3=http://3.3.3.3:2380",
  337. nil,
  338. },
  339. {
  340. []*client.Node{
  341. 0: {Key: "/1000/1", Value: "1=http://1.1.1.1:2380", CreatedIndex: 1},
  342. 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
  343. 2: {Key: "/1000/3", Value: "2=http://3.3.3.3:2380", CreatedIndex: 3},
  344. },
  345. 3,
  346. "1=http://1.1.1.1:2380,2=http://2.2.2.2:2380,2=http://3.3.3.3:2380",
  347. ErrDuplicateName,
  348. },
  349. {
  350. []*client.Node{
  351. 0: {Key: "/1000/1", Value: "1=1.1.1.1:2380", CreatedIndex: 1},
  352. 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
  353. 2: {Key: "/1000/3", Value: "2=http://3.3.3.3:2380", CreatedIndex: 3},
  354. },
  355. 3,
  356. "1=1.1.1.1:2380,2=http://2.2.2.2:2380,2=http://3.3.3.3:2380",
  357. ErrInvalidURL,
  358. },
  359. }
  360. for i, tt := range tests {
  361. cluster, err := nodesToCluster(tt.nodes, tt.size)
  362. if err != tt.werr {
  363. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  364. }
  365. if !reflect.DeepEqual(cluster, tt.wcluster) {
  366. t.Errorf("#%d: cluster = %v, want %v", i, cluster, tt.wcluster)
  367. }
  368. }
  369. }
  370. func TestSortableNodes(t *testing.T) {
  371. ns := []*client.Node{
  372. 0: {CreatedIndex: 5},
  373. 1: {CreatedIndex: 1},
  374. 2: {CreatedIndex: 3},
  375. 3: {CreatedIndex: 4},
  376. }
  377. // add some randomness
  378. for i := 0; i < 10000; i++ {
  379. ns = append(ns, &client.Node{CreatedIndex: uint64(rand.Int31())})
  380. }
  381. sns := sortableNodes{ns}
  382. sort.Sort(sns)
  383. var cis []int
  384. for _, n := range sns.Nodes {
  385. cis = append(cis, int(n.CreatedIndex))
  386. }
  387. if !sort.IntsAreSorted(cis) {
  388. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  389. }
  390. cis = make([]int, 0)
  391. for _, n := range ns {
  392. cis = append(cis, int(n.CreatedIndex))
  393. }
  394. if !sort.IntsAreSorted(cis) {
  395. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  396. }
  397. }
  398. func TestRetryFailure(t *testing.T) {
  399. nRetries = maxRetryInTest
  400. defer func() { nRetries = math.MaxUint32 }()
  401. cluster := "1000"
  402. c := &clientWithRetry{failTimes: 4}
  403. fc := clockwork.NewFakeClock()
  404. d := discovery{
  405. cluster: cluster,
  406. id: 1,
  407. c: c,
  408. clock: fc,
  409. }
  410. go func() {
  411. for i := uint(1); i <= maxRetryInTest; i++ {
  412. fc.BlockUntil(1)
  413. fc.Advance(time.Second * (0x1 << i))
  414. }
  415. }()
  416. if _, _, _, err := d.checkCluster(); err != ErrTooManyRetries {
  417. t.Errorf("err = %v, want %v", err, ErrTooManyRetries)
  418. }
  419. }
  420. type clientWithResp struct {
  421. rs []*client.Response
  422. w client.Watcher
  423. client.KeysAPI
  424. }
  425. func (c *clientWithResp) Create(ctx context.Context, key string, value string) (*client.Response, error) {
  426. if len(c.rs) == 0 {
  427. return &client.Response{}, nil
  428. }
  429. r := c.rs[0]
  430. c.rs = c.rs[1:]
  431. return r, nil
  432. }
  433. func (c *clientWithResp) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
  434. if len(c.rs) == 0 {
  435. return &client.Response{}, &client.Error{Code: client.ErrorCodeKeyNotFound}
  436. }
  437. r := c.rs[0]
  438. c.rs = append(c.rs[1:], r)
  439. return r, nil
  440. }
  441. func (c *clientWithResp) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
  442. return c.w
  443. }
  444. type clientWithErr struct {
  445. err error
  446. w client.Watcher
  447. client.KeysAPI
  448. }
  449. func (c *clientWithErr) Create(ctx context.Context, key string, value string) (*client.Response, error) {
  450. return &client.Response{}, c.err
  451. }
  452. func (c *clientWithErr) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
  453. return &client.Response{}, c.err
  454. }
  455. func (c *clientWithErr) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
  456. return c.w
  457. }
  458. type watcherWithResp struct {
  459. client.KeysAPI
  460. rs []*client.Response
  461. }
  462. func (w *watcherWithResp) Next(context.Context) (*client.Response, error) {
  463. if len(w.rs) == 0 {
  464. return &client.Response{}, nil
  465. }
  466. r := w.rs[0]
  467. w.rs = w.rs[1:]
  468. return r, nil
  469. }
  470. type watcherWithErr struct {
  471. err error
  472. }
  473. func (w *watcherWithErr) Next(context.Context) (*client.Response, error) {
  474. return &client.Response{}, w.err
  475. }
  476. // clientWithRetry will timeout all requests up to failTimes
  477. type clientWithRetry struct {
  478. clientWithResp
  479. failCount int
  480. failTimes int
  481. }
  482. func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
  483. if c.failCount < c.failTimes {
  484. c.failCount++
  485. return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
  486. }
  487. return c.clientWithResp.Create(ctx, key, value)
  488. }
  489. func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
  490. if c.failCount < c.failTimes {
  491. c.failCount++
  492. return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
  493. }
  494. return c.clientWithResp.Get(ctx, key, opts)
  495. }
  496. // watcherWithRetry will timeout all requests up to failTimes
  497. type watcherWithRetry struct {
  498. rs []*client.Response
  499. failCount int
  500. failTimes int
  501. }
  502. func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
  503. if w.failCount < w.failTimes {
  504. w.failCount++
  505. return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
  506. }
  507. if len(w.rs) == 0 {
  508. return &client.Response{}, nil
  509. }
  510. r := w.rs[0]
  511. w.rs = w.rs[1:]
  512. return r, nil
  513. }