# pyruse is intended as a replacement to both fail2ban and epylog # Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import json import os import string from collections import OrderedDict from datetime import datetime from pyruse import base, config, email class Action(base.Action): _storage = config.Config().asMap().get("storage", "/var/lib/pyruse") \ + "/" + os.path.basename(__file__) + ".journal" _out = None _hour = 0 _txtDocStart = '= Pyruse Report\n\n' _txtHeadWarn = '== WARNING Messages\n\n' _txtHeadInfo = '\n== Information Messages\n\n' _txtHeadOther = '\n== Other log events\n\n' _txtTableDelim = '|===============================================================================\n' _txtTableHeader = '|Count|Message |Date+time for each occurrence\n' _txtPreDelim = '----------\n' _htmDocStart = '\n
\n\nCount | Message | Date+time for each occurrence |
---|
' _htmPreStop = '\n' def _closeJournal(): Action._out.write("{}]") Action._out.close() Action._out = None def _openJournal(): if Action._out is None: if os.path.exists(Action._storage): Action._out = open(Action._storage, "a", 1) else: Action._out = open(Action._storage, "w", 1) Action._out.write("[\n") def __init__(self, args): super().__init__() l = args["level"] if l == "WARN": self.level = 1 elif l == "INFO": self.level = 2 else: self.level = 0 self.template = args["message"] values = {} for (_void, name, _void, _void) in string.Formatter().parse(self.template): if name: values[name] = None self.values = values def act(self, entry): for (name, _void) in self.values.items(): self.values[name] = entry.get(name, None) msg = self.template.format_map(self.values) json.dump( OrderedDict(L = self.level, T = entry["__REALTIME_TIMESTAMP"].timestamp(), M = msg), Action._out ) Action._out.write(",\n") thisHour = datetime.today().hour if thisHour < Action._hour: Action._closeJournal() self._sendReport() Action._openJournal() Action._hour = thisHour def _encode(self, text): return text.replace('&', '&').replace('<', '<').replace('>', '>') def _toAdoc(self, msg, times): return "\n|{count:^5d}|{text}\n |{times}\n".format_map( {"count": len(times), "text": msg, "times": " +\n ".join(str(t) for t in times)} ) def _toHtml(self, msg, times): return "