|
|
@@ -136,6 +136,7 @@ func (cw *streamWriter) run() {
|
|
|
t streamType
|
|
|
enc encoder
|
|
|
flusher http.Flusher
|
|
|
+ batched int
|
|
|
)
|
|
|
tickc := time.Tick(ConnReadTimeout / 3)
|
|
|
|
|
|
@@ -146,6 +147,7 @@ func (cw *streamWriter) run() {
|
|
|
err := enc.encode(linkHeartbeatMessage)
|
|
|
if err == nil {
|
|
|
flusher.Flush()
|
|
|
+ batched = 0
|
|
|
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
|
|
|
continue
|
|
|
}
|
|
|
@@ -159,7 +161,13 @@ func (cw *streamWriter) run() {
|
|
|
start := time.Now()
|
|
|
err := enc.encode(m)
|
|
|
if err == nil {
|
|
|
- flusher.Flush()
|
|
|
+ if len(msgc) == 0 || batched > streamBufSize/2 {
|
|
|
+ flusher.Flush()
|
|
|
+ batched = 0
|
|
|
+ } else {
|
|
|
+ batched++
|
|
|
+ }
|
|
|
+
|
|
|
reportSentDuration(string(t), m, time.Since(start))
|
|
|
continue
|
|
|
}
|