dialoptions.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "net"
  23. "time"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/grpclog"
  27. "google.golang.org/grpc/internal"
  28. "google.golang.org/grpc/internal/backoff"
  29. "google.golang.org/grpc/internal/envconfig"
  30. "google.golang.org/grpc/internal/transport"
  31. "google.golang.org/grpc/keepalive"
  32. "google.golang.org/grpc/resolver"
  33. "google.golang.org/grpc/stats"
  34. )
  35. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  36. // values passed to Dial.
  37. type dialOptions struct {
  38. unaryInt UnaryClientInterceptor
  39. streamInt StreamClientInterceptor
  40. chainUnaryInts []UnaryClientInterceptor
  41. chainStreamInts []StreamClientInterceptor
  42. cp Compressor
  43. dc Decompressor
  44. bs backoff.Strategy
  45. block bool
  46. insecure bool
  47. timeout time.Duration
  48. scChan <-chan ServiceConfig
  49. authority string
  50. copts transport.ConnectOptions
  51. callOptions []CallOption
  52. // This is used by v1 balancer dial option WithBalancer to support v1
  53. // balancer, and also by WithBalancerName dial option.
  54. balancerBuilder balancer.Builder
  55. // This is to support grpclb.
  56. resolverBuilder resolver.Builder
  57. channelzParentID int64
  58. disableServiceConfig bool
  59. disableRetry bool
  60. disableHealthCheck bool
  61. healthCheckFunc internal.HealthChecker
  62. minConnectTimeout func() time.Duration
  63. defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
  64. defaultServiceConfigRawJSON *string
  65. }
  66. // DialOption configures how we set up the connection.
  67. type DialOption interface {
  68. apply(*dialOptions)
  69. }
  70. // EmptyDialOption does not alter the dial configuration. It can be embedded in
  71. // another structure to build custom dial options.
  72. //
  73. // This API is EXPERIMENTAL.
  74. type EmptyDialOption struct{}
  75. func (EmptyDialOption) apply(*dialOptions) {}
  76. // funcDialOption wraps a function that modifies dialOptions into an
  77. // implementation of the DialOption interface.
  78. type funcDialOption struct {
  79. f func(*dialOptions)
  80. }
  81. func (fdo *funcDialOption) apply(do *dialOptions) {
  82. fdo.f(do)
  83. }
  84. func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
  85. return &funcDialOption{
  86. f: f,
  87. }
  88. }
  89. // WithWriteBufferSize determines how much data can be batched before doing a
  90. // write on the wire. The corresponding memory allocation for this buffer will
  91. // be twice the size to keep syscalls low. The default value for this buffer is
  92. // 32KB.
  93. //
  94. // Zero will disable the write buffer such that each write will be on underlying
  95. // connection. Note: A Send call may not directly translate to a write.
  96. func WithWriteBufferSize(s int) DialOption {
  97. return newFuncDialOption(func(o *dialOptions) {
  98. o.copts.WriteBufferSize = s
  99. })
  100. }
  101. // WithReadBufferSize lets you set the size of read buffer, this determines how
  102. // much data can be read at most for each read syscall.
  103. //
  104. // The default value for this buffer is 32KB. Zero will disable read buffer for
  105. // a connection so data framer can access the underlying conn directly.
  106. func WithReadBufferSize(s int) DialOption {
  107. return newFuncDialOption(func(o *dialOptions) {
  108. o.copts.ReadBufferSize = s
  109. })
  110. }
  111. // WithInitialWindowSize returns a DialOption which sets the value for initial
  112. // window size on a stream. The lower bound for window size is 64K and any value
  113. // smaller than that will be ignored.
  114. func WithInitialWindowSize(s int32) DialOption {
  115. return newFuncDialOption(func(o *dialOptions) {
  116. o.copts.InitialWindowSize = s
  117. })
  118. }
  119. // WithInitialConnWindowSize returns a DialOption which sets the value for
  120. // initial window size on a connection. The lower bound for window size is 64K
  121. // and any value smaller than that will be ignored.
  122. func WithInitialConnWindowSize(s int32) DialOption {
  123. return newFuncDialOption(func(o *dialOptions) {
  124. o.copts.InitialConnWindowSize = s
  125. })
  126. }
  127. // WithMaxMsgSize returns a DialOption which sets the maximum message size the
  128. // client can receive.
  129. //
  130. // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. Will
  131. // be supported throughout 1.x.
  132. func WithMaxMsgSize(s int) DialOption {
  133. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  134. }
  135. // WithDefaultCallOptions returns a DialOption which sets the default
  136. // CallOptions for calls over the connection.
  137. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  138. return newFuncDialOption(func(o *dialOptions) {
  139. o.callOptions = append(o.callOptions, cos...)
  140. })
  141. }
  142. // WithCodec returns a DialOption which sets a codec for message marshaling and
  143. // unmarshaling.
  144. //
  145. // Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. Will be
  146. // supported throughout 1.x.
  147. func WithCodec(c Codec) DialOption {
  148. return WithDefaultCallOptions(CallCustomCodec(c))
  149. }
  150. // WithCompressor returns a DialOption which sets a Compressor to use for
  151. // message compression. It has lower priority than the compressor set by the
  152. // UseCompressor CallOption.
  153. //
  154. // Deprecated: use UseCompressor instead. Will be supported throughout 1.x.
  155. func WithCompressor(cp Compressor) DialOption {
  156. return newFuncDialOption(func(o *dialOptions) {
  157. o.cp = cp
  158. })
  159. }
  160. // WithDecompressor returns a DialOption which sets a Decompressor to use for
  161. // incoming message decompression. If incoming response messages are encoded
  162. // using the decompressor's Type(), it will be used. Otherwise, the message
  163. // encoding will be used to look up the compressor registered via
  164. // encoding.RegisterCompressor, which will then be used to decompress the
  165. // message. If no compressor is registered for the encoding, an Unimplemented
  166. // status error will be returned.
  167. //
  168. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  169. // throughout 1.x.
  170. func WithDecompressor(dc Decompressor) DialOption {
  171. return newFuncDialOption(func(o *dialOptions) {
  172. o.dc = dc
  173. })
  174. }
  175. // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
  176. // Name resolver will be ignored if this DialOption is specified.
  177. //
  178. // Deprecated: use the new balancer APIs in balancer package and
  179. // WithBalancerName. Will be removed in a future 1.x release.
  180. func WithBalancer(b Balancer) DialOption {
  181. return newFuncDialOption(func(o *dialOptions) {
  182. o.balancerBuilder = &balancerWrapperBuilder{
  183. b: b,
  184. }
  185. })
  186. }
  187. // WithBalancerName sets the balancer that the ClientConn will be initialized
  188. // with. Balancer registered with balancerName will be used. This function
  189. // panics if no balancer was registered by balancerName.
  190. //
  191. // The balancer cannot be overridden by balancer option specified by service
  192. // config.
  193. //
  194. // Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
  195. // instead. Will be removed in a future 1.x release.
  196. func WithBalancerName(balancerName string) DialOption {
  197. builder := balancer.Get(balancerName)
  198. if builder == nil {
  199. panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
  200. }
  201. return newFuncDialOption(func(o *dialOptions) {
  202. o.balancerBuilder = builder
  203. })
  204. }
  205. // withResolverBuilder is only for grpclb.
  206. func withResolverBuilder(b resolver.Builder) DialOption {
  207. return newFuncDialOption(func(o *dialOptions) {
  208. o.resolverBuilder = b
  209. })
  210. }
  211. // WithServiceConfig returns a DialOption which has a channel to read the
  212. // service configuration.
  213. //
  214. // Deprecated: service config should be received through name resolver or via
  215. // WithDefaultServiceConfig, as specified at
  216. // https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be
  217. // removed in a future 1.x release.
  218. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  219. return newFuncDialOption(func(o *dialOptions) {
  220. o.scChan = c
  221. })
  222. }
  223. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  224. // when backing off after failed connection attempts.
  225. func WithBackoffMaxDelay(md time.Duration) DialOption {
  226. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  227. }
  228. // WithBackoffConfig configures the dialer to use the provided backoff
  229. // parameters after connection failures.
  230. //
  231. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  232. // for use.
  233. func WithBackoffConfig(b BackoffConfig) DialOption {
  234. return withBackoff(backoff.Exponential{
  235. MaxDelay: b.MaxDelay,
  236. })
  237. }
  238. // withBackoff sets the backoff strategy used for connectRetryNum after a failed
  239. // connection attempt.
  240. //
  241. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  242. func withBackoff(bs backoff.Strategy) DialOption {
  243. return newFuncDialOption(func(o *dialOptions) {
  244. o.bs = bs
  245. })
  246. }
  247. // WithBlock returns a DialOption which makes caller of Dial blocks until the
  248. // underlying connection is up. Without this, Dial returns immediately and
  249. // connecting the server happens in background.
  250. func WithBlock() DialOption {
  251. return newFuncDialOption(func(o *dialOptions) {
  252. o.block = true
  253. })
  254. }
  255. // WithInsecure returns a DialOption which disables transport security for this
  256. // ClientConn. Note that transport security is required unless WithInsecure is
  257. // set.
  258. func WithInsecure() DialOption {
  259. return newFuncDialOption(func(o *dialOptions) {
  260. o.insecure = true
  261. })
  262. }
  263. // WithTransportCredentials returns a DialOption which configures a connection
  264. // level security credentials (e.g., TLS/SSL). This should not be used together
  265. // with WithCredentialsBundle.
  266. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  267. return newFuncDialOption(func(o *dialOptions) {
  268. o.copts.TransportCredentials = creds
  269. })
  270. }
  271. // WithPerRPCCredentials returns a DialOption which sets credentials and places
  272. // auth state on each outbound RPC.
  273. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  274. return newFuncDialOption(func(o *dialOptions) {
  275. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  276. })
  277. }
  278. // WithCredentialsBundle returns a DialOption to set a credentials bundle for
  279. // the ClientConn.WithCreds. This should not be used together with
  280. // WithTransportCredentials.
  281. //
  282. // This API is experimental.
  283. func WithCredentialsBundle(b credentials.Bundle) DialOption {
  284. return newFuncDialOption(func(o *dialOptions) {
  285. o.copts.CredsBundle = b
  286. })
  287. }
  288. // WithTimeout returns a DialOption that configures a timeout for dialing a
  289. // ClientConn initially. This is valid if and only if WithBlock() is present.
  290. //
  291. // Deprecated: use DialContext and context.WithTimeout instead. Will be
  292. // supported throughout 1.x.
  293. func WithTimeout(d time.Duration) DialOption {
  294. return newFuncDialOption(func(o *dialOptions) {
  295. o.timeout = d
  296. })
  297. }
  298. // WithContextDialer returns a DialOption that sets a dialer to create
  299. // connections. If FailOnNonTempDialError() is set to true, and an error is
  300. // returned by f, gRPC checks the error's Temporary() method to decide if it
  301. // should try to reconnect to the network address.
  302. func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
  303. return newFuncDialOption(func(o *dialOptions) {
  304. o.copts.Dialer = f
  305. })
  306. }
  307. func init() {
  308. internal.WithResolverBuilder = withResolverBuilder
  309. internal.WithHealthCheckFunc = withHealthCheckFunc
  310. }
  311. // WithDialer returns a DialOption that specifies a function to use for dialing
  312. // network addresses. If FailOnNonTempDialError() is set to true, and an error
  313. // is returned by f, gRPC checks the error's Temporary() method to decide if it
  314. // should try to reconnect to the network address.
  315. //
  316. // Deprecated: use WithContextDialer instead. Will be supported throughout
  317. // 1.x.
  318. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  319. return WithContextDialer(
  320. func(ctx context.Context, addr string) (net.Conn, error) {
  321. if deadline, ok := ctx.Deadline(); ok {
  322. return f(addr, time.Until(deadline))
  323. }
  324. return f(addr, 0)
  325. })
  326. }
  327. // WithStatsHandler returns a DialOption that specifies the stats handler for
  328. // all the RPCs and underlying network connections in this ClientConn.
  329. func WithStatsHandler(h stats.Handler) DialOption {
  330. return newFuncDialOption(func(o *dialOptions) {
  331. o.copts.StatsHandler = h
  332. })
  333. }
  334. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
  335. // non-temporary dial errors. If f is true, and dialer returns a non-temporary
  336. // error, gRPC will fail the connection to the network address and won't try to
  337. // reconnect. The default value of FailOnNonTempDialError is false.
  338. //
  339. // FailOnNonTempDialError only affects the initial dial, and does not do
  340. // anything useful unless you are also using WithBlock().
  341. //
  342. // This is an EXPERIMENTAL API.
  343. func FailOnNonTempDialError(f bool) DialOption {
  344. return newFuncDialOption(func(o *dialOptions) {
  345. o.copts.FailOnNonTempDialError = f
  346. })
  347. }
  348. // WithUserAgent returns a DialOption that specifies a user agent string for all
  349. // the RPCs.
  350. func WithUserAgent(s string) DialOption {
  351. return newFuncDialOption(func(o *dialOptions) {
  352. o.copts.UserAgent = s
  353. })
  354. }
  355. // WithKeepaliveParams returns a DialOption that specifies keepalive parameters
  356. // for the client transport.
  357. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  358. if kp.Time < internal.KeepaliveMinPingTime {
  359. grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
  360. kp.Time = internal.KeepaliveMinPingTime
  361. }
  362. return newFuncDialOption(func(o *dialOptions) {
  363. o.copts.KeepaliveParams = kp
  364. })
  365. }
  366. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
  367. // unary RPCs.
  368. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  369. return newFuncDialOption(func(o *dialOptions) {
  370. o.unaryInt = f
  371. })
  372. }
  373. // WithChainUnaryInterceptor returns a DialOption that specifies the chained
  374. // interceptor for unary RPCs. The first interceptor will be the outer most,
  375. // while the last interceptor will be the inner most wrapper around the real call.
  376. // All interceptors added by this method will be chained, and the interceptor
  377. // defined by WithUnaryInterceptor will always be prepended to the chain.
  378. func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption {
  379. return newFuncDialOption(func(o *dialOptions) {
  380. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  381. })
  382. }
  383. // WithStreamInterceptor returns a DialOption that specifies the interceptor for
  384. // streaming RPCs.
  385. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  386. return newFuncDialOption(func(o *dialOptions) {
  387. o.streamInt = f
  388. })
  389. }
  390. // WithChainStreamInterceptor returns a DialOption that specifies the chained
  391. // interceptor for unary RPCs. The first interceptor will be the outer most,
  392. // while the last interceptor will be the inner most wrapper around the real call.
  393. // All interceptors added by this method will be chained, and the interceptor
  394. // defined by WithStreamInterceptor will always be prepended to the chain.
  395. func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption {
  396. return newFuncDialOption(func(o *dialOptions) {
  397. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  398. })
  399. }
  400. // WithAuthority returns a DialOption that specifies the value to be used as the
  401. // :authority pseudo-header. This value only works with WithInsecure and has no
  402. // effect if TransportCredentials are present.
  403. func WithAuthority(a string) DialOption {
  404. return newFuncDialOption(func(o *dialOptions) {
  405. o.authority = a
  406. })
  407. }
  408. // WithChannelzParentID returns a DialOption that specifies the channelz ID of
  409. // current ClientConn's parent. This function is used in nested channel creation
  410. // (e.g. grpclb dial).
  411. func WithChannelzParentID(id int64) DialOption {
  412. return newFuncDialOption(func(o *dialOptions) {
  413. o.channelzParentID = id
  414. })
  415. }
  416. // WithDisableServiceConfig returns a DialOption that causes gRPC to ignore any
  417. // service config provided by the resolver and provides a hint to the resolver
  418. // to not fetch service configs.
  419. //
  420. // Note that this dial option only disables service config from resolver. If
  421. // default service config is provided, gRPC will use the default service config.
  422. func WithDisableServiceConfig() DialOption {
  423. return newFuncDialOption(func(o *dialOptions) {
  424. o.disableServiceConfig = true
  425. })
  426. }
  427. // WithDefaultServiceConfig returns a DialOption that configures the default
  428. // service config, which will be used in cases where:
  429. //
  430. // 1. WithDisableServiceConfig is also used.
  431. // 2. Resolver does not return a service config or if the resolver returns an
  432. // invalid service config.
  433. //
  434. // This API is EXPERIMENTAL.
  435. func WithDefaultServiceConfig(s string) DialOption {
  436. return newFuncDialOption(func(o *dialOptions) {
  437. o.defaultServiceConfigRawJSON = &s
  438. })
  439. }
  440. // WithDisableRetry returns a DialOption that disables retries, even if the
  441. // service config enables them. This does not impact transparent retries, which
  442. // will happen automatically if no data is written to the wire or if the RPC is
  443. // unprocessed by the remote server.
  444. //
  445. // Retry support is currently disabled by default, but will be enabled by
  446. // default in the future. Until then, it may be enabled by setting the
  447. // environment variable "GRPC_GO_RETRY" to "on".
  448. //
  449. // This API is EXPERIMENTAL.
  450. func WithDisableRetry() DialOption {
  451. return newFuncDialOption(func(o *dialOptions) {
  452. o.disableRetry = true
  453. })
  454. }
  455. // WithMaxHeaderListSize returns a DialOption that specifies the maximum
  456. // (uncompressed) size of header list that the client is prepared to accept.
  457. func WithMaxHeaderListSize(s uint32) DialOption {
  458. return newFuncDialOption(func(o *dialOptions) {
  459. o.copts.MaxHeaderListSize = &s
  460. })
  461. }
  462. // WithDisableHealthCheck disables the LB channel health checking for all
  463. // SubConns of this ClientConn.
  464. //
  465. // This API is EXPERIMENTAL.
  466. func WithDisableHealthCheck() DialOption {
  467. return newFuncDialOption(func(o *dialOptions) {
  468. o.disableHealthCheck = true
  469. })
  470. }
  471. // withHealthCheckFunc replaces the default health check function with the
  472. // provided one. It makes tests easier to change the health check function.
  473. //
  474. // For testing purpose only.
  475. func withHealthCheckFunc(f internal.HealthChecker) DialOption {
  476. return newFuncDialOption(func(o *dialOptions) {
  477. o.healthCheckFunc = f
  478. })
  479. }
  480. func defaultDialOptions() dialOptions {
  481. return dialOptions{
  482. disableRetry: !envconfig.Retry,
  483. healthCheckFunc: internal.HealthCheckFunc,
  484. copts: transport.ConnectOptions{
  485. WriteBufferSize: defaultWriteBufSize,
  486. ReadBufferSize: defaultReadBufSize,
  487. },
  488. }
  489. }
  490. // withGetMinConnectDeadline specifies the function that clientconn uses to
  491. // get minConnectDeadline. This can be used to make connection attempts happen
  492. // faster/slower.
  493. //
  494. // For testing purpose only.
  495. func withMinConnectDeadline(f func() time.Duration) DialOption {
  496. return newFuncDialOption(func(o *dialOptions) {
  497. o.minConnectTimeout = f
  498. })
  499. }