|
|
@@ -17,6 +17,7 @@ package lease
|
|
|
import (
|
|
|
"encoding/binary"
|
|
|
"fmt"
|
|
|
+ "math"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
@@ -74,14 +75,7 @@ func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
|
|
|
dr: dr,
|
|
|
idgen: idutil.NewGenerator(lessorID, time.Now()),
|
|
|
}
|
|
|
-
|
|
|
- tx := l.b.BatchTx()
|
|
|
- tx.Lock()
|
|
|
- tx.UnsafeCreateBucket(leaseBucketName)
|
|
|
- tx.Unlock()
|
|
|
- l.b.ForceCommit()
|
|
|
-
|
|
|
- // TODO: recover from previous state in backend.
|
|
|
+ l.initAndRecover()
|
|
|
|
|
|
return l
|
|
|
}
|
|
|
@@ -194,6 +188,32 @@ func (le *lessor) get(id LeaseID) *lease {
|
|
|
return le.leaseMap[id]
|
|
|
}
|
|
|
|
|
|
+func (le *lessor) initAndRecover() {
|
|
|
+ tx := le.b.BatchTx()
|
|
|
+ tx.Lock()
|
|
|
+ defer tx.Unlock()
|
|
|
+
|
|
|
+ tx.UnsafeCreateBucket(leaseBucketName)
|
|
|
+ _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
|
|
|
+ // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
|
|
|
+ for i := range vs {
|
|
|
+ var lpb leasepb.Lease
|
|
|
+ err := lpb.Unmarshal(vs[i])
|
|
|
+ if err != nil {
|
|
|
+ panic("failed to unmarshal lease proto item")
|
|
|
+ }
|
|
|
+ id := LeaseID(lpb.ID)
|
|
|
+ le.leaseMap[id] = &lease{
|
|
|
+ id: id,
|
|
|
+ ttl: lpb.TTL,
|
|
|
+
|
|
|
+ // itemSet will be filled in when recover key-value pairs
|
|
|
+ expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ le.b.ForceCommit()
|
|
|
+}
|
|
|
+
|
|
|
type lease struct {
|
|
|
id LeaseID
|
|
|
ttl int64 // time to live in seconds
|