writesched_priority.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "fmt"
  7. "math"
  8. "sort"
  9. )
  10. // RFC 7540, Section 5.3.5: the default weight is 16.
  11. const priorityDefaultWeight = 15 // 16 = 15 + 1
  12. // PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
  13. type PriorityWriteSchedulerConfig struct {
  14. // MaxClosedNodesInTree controls the maximum number of closed streams to
  15. // retain in the priority tree. Setting this to zero saves a small amount
  16. // of memory at the cost of performance.
  17. //
  18. // See RFC 7540, Section 5.3.4:
  19. // "It is possible for a stream to become closed while prioritization
  20. // information ... is in transit. ... This potentially creates suboptimal
  21. // prioritization, since the stream could be given a priority that is
  22. // different from what is intended. To avoid these problems, an endpoint
  23. // SHOULD retain stream prioritization state for a period after streams
  24. // become closed. The longer state is retained, the lower the chance that
  25. // streams are assigned incorrect or default priority values."
  26. MaxClosedNodesInTree int
  27. // MaxIdleNodesInTree controls the maximum number of idle streams to
  28. // retain in the priority tree. Setting this to zero saves a small amount
  29. // of memory at the cost of performance.
  30. //
  31. // See RFC 7540, Section 5.3.4:
  32. // Similarly, streams that are in the "idle" state can be assigned
  33. // priority or become a parent of other streams. This allows for the
  34. // creation of a grouping node in the dependency tree, which enables
  35. // more flexible expressions of priority. Idle streams begin with a
  36. // default priority (Section 5.3.5).
  37. MaxIdleNodesInTree int
  38. // ThrottleOutOfOrderWrites enables write throttling to help ensure that
  39. // data is delivered in priority order. This works around a race where
  40. // stream B depends on stream A and both streams are about to call Write
  41. // to queue DATA frames. If B wins the race, a naive scheduler would eagerly
  42. // write as much data from B as possible, but this is suboptimal because A
  43. // is a higher-priority stream. With throttling enabled, we write a small
  44. // amount of data from B to minimize the amount of bandwidth that B can
  45. // steal from A.
  46. ThrottleOutOfOrderWrites bool
  47. }
  48. // NewPriorityWriteScheduler constructs a WriteScheduler that schedules
  49. // frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
  50. // If cfg is nil, default options are used.
  51. func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
  52. if cfg == nil {
  53. // For justification of these defaults, see:
  54. // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
  55. cfg = &PriorityWriteSchedulerConfig{
  56. MaxClosedNodesInTree: 10,
  57. MaxIdleNodesInTree: 10,
  58. ThrottleOutOfOrderWrites: false,
  59. }
  60. }
  61. ws := &priorityWriteScheduler{
  62. nodes: make(map[uint32]*priorityNode),
  63. maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
  64. maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
  65. enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
  66. }
  67. ws.nodes[0] = &ws.root
  68. if cfg.ThrottleOutOfOrderWrites {
  69. ws.writeThrottleLimit = 1024
  70. } else {
  71. ws.writeThrottleLimit = math.MaxInt32
  72. }
  73. return ws
  74. }
  75. type priorityNodeState int
  76. const (
  77. priorityNodeOpen priorityNodeState = iota
  78. priorityNodeClosed
  79. priorityNodeIdle
  80. )
  81. // priorityNode is a node in an HTTP/2 priority tree.
  82. // Each node is associated with a single stream ID.
  83. // See RFC 7540, Section 5.3.
  84. type priorityNode struct {
  85. q writeQueue // queue of pending frames to write
  86. id uint32 // id of the stream, or 0 for the root of the tree
  87. weight uint8 // the actual weight is weight+1, so the value is in [1,256]
  88. state priorityNodeState // open | closed | idle
  89. bytes int64 // number of bytes written by this node, or 0 if closed
  90. subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
  91. // These links form the priority tree.
  92. parent *priorityNode
  93. kids *priorityNode // start of the kids list
  94. prev, next *priorityNode // doubly-linked list of siblings
  95. }
  96. func (n *priorityNode) setParent(parent *priorityNode) {
  97. if n == parent {
  98. panic("setParent to self")
  99. }
  100. if n.parent == parent {
  101. return
  102. }
  103. // Unlink from current parent.
  104. if parent := n.parent; parent != nil {
  105. if n.prev == nil {
  106. parent.kids = n.next
  107. } else {
  108. n.prev.next = n.next
  109. }
  110. if n.next != nil {
  111. n.next.prev = n.prev
  112. }
  113. }
  114. // Link to new parent.
  115. // If parent=nil, remove n from the tree.
  116. // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
  117. n.parent = parent
  118. if parent == nil {
  119. n.next = nil
  120. n.prev = nil
  121. } else {
  122. n.next = parent.kids
  123. n.prev = nil
  124. if n.next != nil {
  125. n.next.prev = n
  126. }
  127. parent.kids = n
  128. }
  129. }
  130. func (n *priorityNode) addBytes(b int64) {
  131. n.bytes += b
  132. for ; n != nil; n = n.parent {
  133. n.subtreeBytes += b
  134. }
  135. }
  136. // walkReadyInOrder iterates over the tree in priority order, calling f for each node
  137. // with a non-empty write queue. When f returns true, this function returns true and the
  138. // walk halts. tmp is used as scratch space for sorting.
  139. //
  140. // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
  141. // if any ancestor p of n is still open (ignoring the root node).
  142. func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
  143. if !n.q.empty() && f(n, openParent) {
  144. return true
  145. }
  146. if n.kids == nil {
  147. return false
  148. }
  149. // Don't consider the root "open" when updating openParent since
  150. // we can't send data frames on the root stream (only control frames).
  151. if n.id != 0 {
  152. openParent = openParent || (n.state == priorityNodeOpen)
  153. }
  154. // Common case: only one kid or all kids have the same weight.
  155. // Some clients don't use weights; other clients (like web browsers)
  156. // use mostly-linear priority trees.
  157. w := n.kids.weight
  158. needSort := false
  159. for k := n.kids.next; k != nil; k = k.next {
  160. if k.weight != w {
  161. needSort = true
  162. break
  163. }
  164. }
  165. if !needSort {
  166. for k := n.kids; k != nil; k = k.next {
  167. if k.walkReadyInOrder(openParent, tmp, f) {
  168. return true
  169. }
  170. }
  171. return false
  172. }
  173. // Uncommon case: sort the child nodes. We remove the kids from the parent,
  174. // then re-insert after sorting so we can reuse tmp for future sort calls.
  175. *tmp = (*tmp)[:0]
  176. for n.kids != nil {
  177. *tmp = append(*tmp, n.kids)
  178. n.kids.setParent(nil)
  179. }
  180. sort.Sort(sortPriorityNodeSiblings(*tmp))
  181. for i := len(*tmp) - 1; i >= 0; i-- {
  182. (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
  183. }
  184. for k := n.kids; k != nil; k = k.next {
  185. if k.walkReadyInOrder(openParent, tmp, f) {
  186. return true
  187. }
  188. }
  189. return false
  190. }
  191. type sortPriorityNodeSiblings []*priorityNode
  192. func (z sortPriorityNodeSiblings) Len() int { return len(z) }
  193. func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
  194. func (z sortPriorityNodeSiblings) Less(i, k int) bool {
  195. // Prefer the subtree that has sent fewer bytes relative to its weight.
  196. // See sections 5.3.2 and 5.3.4.
  197. wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
  198. wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
  199. if bi == 0 && bk == 0 {
  200. return wi >= wk
  201. }
  202. if bk == 0 {
  203. return false
  204. }
  205. return bi/bk <= wi/wk
  206. }
  207. type priorityWriteScheduler struct {
  208. // root is the root of the priority tree, where root.id = 0.
  209. // The root queues control frames that are not associated with any stream.
  210. root priorityNode
  211. // nodes maps stream ids to priority tree nodes.
  212. nodes map[uint32]*priorityNode
  213. // maxID is the maximum stream id in nodes.
  214. maxID uint32
  215. // lists of nodes that have been closed or are idle, but are kept in
  216. // the tree for improved prioritization. When the lengths exceed either
  217. // maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
  218. closedNodes, idleNodes []*priorityNode
  219. // From the config.
  220. maxClosedNodesInTree int
  221. maxIdleNodesInTree int
  222. writeThrottleLimit int32
  223. enableWriteThrottle bool
  224. // tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
  225. tmp []*priorityNode
  226. // pool of empty queues for reuse.
  227. queuePool writeQueuePool
  228. }
  229. func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
  230. // The stream may be currently idle but cannot be opened or closed.
  231. if curr := ws.nodes[streamID]; curr != nil {
  232. if curr.state != priorityNodeIdle {
  233. panic(fmt.Sprintf("stream %d already opened", streamID))
  234. }
  235. curr.state = priorityNodeOpen
  236. return
  237. }
  238. // RFC 7540, Section 5.3.5:
  239. // "All streams are initially assigned a non-exclusive dependency on stream 0x0.
  240. // Pushed streams initially depend on their associated stream. In both cases,
  241. // streams are assigned a default weight of 16."
  242. parent := ws.nodes[options.PusherID]
  243. if parent == nil {
  244. parent = &ws.root
  245. }
  246. n := &priorityNode{
  247. q: *ws.queuePool.get(),
  248. id: streamID,
  249. weight: priorityDefaultWeight,
  250. state: priorityNodeOpen,
  251. }
  252. n.setParent(parent)
  253. ws.nodes[streamID] = n
  254. if streamID > ws.maxID {
  255. ws.maxID = streamID
  256. }
  257. }
  258. func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
  259. if streamID == 0 {
  260. panic("violation of WriteScheduler interface: cannot close stream 0")
  261. }
  262. if ws.nodes[streamID] == nil {
  263. panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
  264. }
  265. if ws.nodes[streamID].state != priorityNodeOpen {
  266. panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
  267. }
  268. n := ws.nodes[streamID]
  269. n.state = priorityNodeClosed
  270. n.addBytes(-n.bytes)
  271. q := n.q
  272. ws.queuePool.put(&q)
  273. n.q.s = nil
  274. if ws.maxClosedNodesInTree > 0 {
  275. ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
  276. } else {
  277. ws.removeNode(n)
  278. }
  279. }
  280. func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
  281. if streamID == 0 {
  282. panic("adjustPriority on root")
  283. }
  284. // If streamID does not exist, there are two cases:
  285. // - A closed stream that has been removed (this will have ID <= maxID)
  286. // - An idle stream that is being used for "grouping" (this will have ID > maxID)
  287. n := ws.nodes[streamID]
  288. if n == nil {
  289. if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
  290. return
  291. }
  292. ws.maxID = streamID
  293. n = &priorityNode{
  294. q: *ws.queuePool.get(),
  295. id: streamID,
  296. weight: priorityDefaultWeight,
  297. state: priorityNodeIdle,
  298. }
  299. n.setParent(&ws.root)
  300. ws.nodes[streamID] = n
  301. ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
  302. }
  303. // Section 5.3.1: A dependency on a stream that is not currently in the tree
  304. // results in that stream being given a default priority (Section 5.3.5).
  305. parent := ws.nodes[priority.StreamDep]
  306. if parent == nil {
  307. n.setParent(&ws.root)
  308. n.weight = priorityDefaultWeight
  309. return
  310. }
  311. // Ignore if the client tries to make a node its own parent.
  312. if n == parent {
  313. return
  314. }
  315. // Section 5.3.3:
  316. // "If a stream is made dependent on one of its own dependencies, the
  317. // formerly dependent stream is first moved to be dependent on the
  318. // reprioritized stream's previous parent. The moved dependency retains
  319. // its weight."
  320. //
  321. // That is: if parent depends on n, move parent to depend on n.parent.
  322. for x := parent.parent; x != nil; x = x.parent {
  323. if x == n {
  324. parent.setParent(n.parent)
  325. break
  326. }
  327. }
  328. // Section 5.3.3: The exclusive flag causes the stream to become the sole
  329. // dependency of its parent stream, causing other dependencies to become
  330. // dependent on the exclusive stream.
  331. if priority.Exclusive {
  332. k := parent.kids
  333. for k != nil {
  334. next := k.next
  335. if k != n {
  336. k.setParent(n)
  337. }
  338. k = next
  339. }
  340. }
  341. n.setParent(parent)
  342. n.weight = priority.Weight
  343. }
  344. func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
  345. var n *priorityNode
  346. if id := wr.StreamID(); id == 0 {
  347. n = &ws.root
  348. } else {
  349. n = ws.nodes[id]
  350. if n == nil {
  351. // id is an idle or closed stream. wr should not be a HEADERS or
  352. // DATA frame. However, wr can be a RST_STREAM. In this case, we
  353. // push wr onto the root, rather than creating a new priorityNode,
  354. // since RST_STREAM is tiny and the stream's priority is unknown
  355. // anyway. See issue #17919.
  356. if wr.DataSize() > 0 {
  357. panic("add DATA on non-open stream")
  358. }
  359. n = &ws.root
  360. }
  361. }
  362. n.q.push(wr)
  363. }
  364. func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
  365. ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
  366. limit := int32(math.MaxInt32)
  367. if openParent {
  368. limit = ws.writeThrottleLimit
  369. }
  370. wr, ok = n.q.consume(limit)
  371. if !ok {
  372. return false
  373. }
  374. n.addBytes(int64(wr.DataSize()))
  375. // If B depends on A and B continuously has data available but A
  376. // does not, gradually increase the throttling limit to allow B to
  377. // steal more and more bandwidth from A.
  378. if openParent {
  379. ws.writeThrottleLimit += 1024
  380. if ws.writeThrottleLimit < 0 {
  381. ws.writeThrottleLimit = math.MaxInt32
  382. }
  383. } else if ws.enableWriteThrottle {
  384. ws.writeThrottleLimit = 1024
  385. }
  386. return true
  387. })
  388. return wr, ok
  389. }
  390. func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
  391. if maxSize == 0 {
  392. return
  393. }
  394. if len(*list) == maxSize {
  395. // Remove the oldest node, then shift left.
  396. ws.removeNode((*list)[0])
  397. x := (*list)[1:]
  398. copy(*list, x)
  399. *list = (*list)[:len(x)]
  400. }
  401. *list = append(*list, n)
  402. }
  403. func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
  404. for k := n.kids; k != nil; k = k.next {
  405. k.setParent(n.parent)
  406. }
  407. n.setParent(nil)
  408. delete(ws.nodes, n.id)
  409. }