pyruse/pyruse/counter.py

107 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# pyruse is intended as a replacement to both fail2ban and epylog
# Copyright © 20172018 Y. Gablin
# Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing.
import datetime
class _GraceAndTicks():
def __init__(self):
self.grace = None
self.ticks = []
class _CounterData():
def __init__(self):
self._keyVals = {}
def clean(self, refDT):
for k in list(self._keyVals.keys()):
v = self._keyVals[k]
if v.grace and v.grace <= refDT:
v.grace = None
if v.grace is None:
# None values (∞) are at the end of the list
for i in range(0, len(v.ticks)):
if v.ticks[i] and v.ticks[i] <= refDT:
continue
v.ticks = v.ticks[i:]
break
else:
del self._keyVals[k]
def graceActive(self, counterKey, refDT):
self.clean(refDT)
return counterKey in self._keyVals and self._keyVals[counterKey].grace
def augment(self, counterKey, refDT, until):
self.clean(refDT)
if counterKey in self._keyVals:
v = self._keyVals[counterKey]
if v.grace:
return 0
else:
v = _GraceAndTicks()
self._keyVals[counterKey] = v
l = len(v.ticks)
# chances are that until is greater than the last item
for i in range(l, 0, -1):
if until is None or (v.ticks[i - 1] and v.ticks[i - 1] < until):
v.ticks.insert(i, until)
break
else:
v.ticks.insert(0, until)
return l + 1
def lower(self, counterKey, refDT):
self.clean(refDT)
v = self._keyVals.get(counterKey, None)
if v is None or v.grace:
return 0
l = len(v.ticks)
if l == 1:
del self._keyVals[counterKey]
return 0
v.ticks.pop()
return l - 1
def reset(self, counterKey, refDT, graceUntil):
self.clean(refDT)
if graceUntil:
v = _GraceAndTicks()
v.grace = graceUntil
self._keyVals[counterKey] = v
elif counterKey in self._keyVals:
del self._keyVals[counterKey]
class Counter():
_counters = {}
def __init__(self, counter):
if counter not in Counter._counters:
Counter._counters[counter] = _CounterData()
self.counter = Counter._counters[counter]
def augment(self, counterKey, duration = None):
now = datetime.datetime.utcnow()
if self.counter.graceActive(counterKey, now):
return 0
else:
return self.counter.augment(counterKey, now, now + duration if duration else None)
def lower(self, counterKey):
now = datetime.datetime.utcnow()
if self.counter.graceActive(counterKey, now):
return 0
else:
return self.counter.lower(counterKey, now)
def reset(self, counterKey, graceDuration = None):
now = datetime.datetime.utcnow()
self.counter.reset(
counterKey, now,
now + graceDuration if graceDuration else None
)