restic/vendor/github.com/kurin/blazer/x/window/window.go

163 lines
4.2 KiB
Go

// Copyright 2018, the Blazer authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package window provides a type for efficiently recording events as they
// occur over a given span of time. Events added to the window will remain
// until the time expires.
package window
import (
"sync"
"time"
)
// A Window efficiently records events that have occurred over a span of time
// extending from some fixed interval ago to now. Events that pass beyond this
// horizon are discarded.
type Window struct {
mu sync.Mutex
events []interface{}
res time.Duration
last time.Time
reduce Reducer
forever bool
e interface{}
}
// A Reducer should take two values from the window and combine them into a
// third value that will be stored in the window. The values i or j may be
// nil. The underlying types for both arguments and the output should be
// identical.
//
// If the reducer is any kind of slice or list, then data usage will grow
// linearly with the number of events added to the window.
//
// Reducer will be called on its own output: Reducer(Reducer(x, y), z).
type Reducer func(i, j interface{}) interface{}
// New returns an initialized window for events over the given duration at the
// given resolution. Windows with tight resolution (i.e., small values for
// that argument) will be more accurate, at the cost of some memory.
//
// A size of 0 means "forever"; old events will never be removed.
func New(size, resolution time.Duration, r Reducer) *Window {
if size > 0 {
return &Window{
res: resolution,
events: make([]interface{}, size/resolution),
reduce: r,
}
}
return &Window{
forever: true,
reduce: r,
}
}
func (w *Window) bucket(now time.Time) int {
nanos := now.UnixNano()
abs := nanos / int64(w.res)
return int(abs) % len(w.events)
}
// sweep keeps the window valid. It needs to be called from every method that
// views or updates the window, and the caller needs to hold the mutex.
func (w *Window) sweep(now time.Time) {
if w.forever {
return
}
defer func() {
w.last = now
}()
// This compares now and w.last's monotonic clocks.
diff := now.Sub(w.last)
if diff < 0 {
// time went backwards somehow; zero events and return
for i := range w.events {
w.events[i] = nil
}
return
}
last := now.Add(-diff)
b := w.bucket(now)
p := w.bucket(last)
if b == p && diff <= w.res {
// We're in the same bucket as the previous sweep, so all buckets are
// valid.
return
}
if diff > w.res*time.Duration(len(w.events)) {
// We've gone longer than this window measures since the last sweep, just
// zero the thing and have done.
for i := range w.events {
w.events[i] = nil
}
return
}
// Expire all invalid buckets. This means buckets not seen since the
// previous sweep and now, including the current bucket but not including the
// previous bucket.
old := int64(last.UnixNano()) / int64(w.res)
new := int64(now.UnixNano()) / int64(w.res)
for i := old + 1; i <= new; i++ {
b := int(i) % len(w.events)
w.events[b] = nil
}
}
// Insert adds the given event.
func (w *Window) Insert(e interface{}) {
w.insertAt(time.Now(), e)
}
func (w *Window) insertAt(t time.Time, e interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
if w.forever {
w.e = w.reduce(w.e, e)
return
}
w.sweep(t)
w.events[w.bucket(t)] = w.reduce(w.events[w.bucket(t)], e)
}
// Reduce runs the window's reducer over the valid values and returns the
// result.
func (w *Window) Reduce() interface{} {
return w.reducedAt(time.Now())
}
func (w *Window) reducedAt(t time.Time) interface{} {
w.mu.Lock()
defer w.mu.Unlock()
if w.forever {
return w.e
}
w.sweep(t)
var n interface{}
for i := range w.events {
n = w.reduce(n, w.events[i])
}
return n
}