diff --git a/tests/action_counterRaise.py b/tests/action_counterRaise.py index 19239de..0f06c4b 100644 --- a/tests/action_counterRaise.py +++ b/tests/action_counterRaise.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import time from pyruse.actions.action_counterRaise import Action @@ -50,3 +50,9 @@ def whenGraceTimeThenCountIs0(): time.sleep(1) raiseAct.act(entry) assert entry["action_counterRaise4"] == 1 + +def unitTests(): + whenNonExistingThenRaiseTo1() + whenKeepSecondsThenRaiseUntilTimeOut() + whenDifferentKeyThenDifferentCounter() + whenGraceTimeThenCountIs0() diff --git a/tests/action_counterReset.py b/tests/action_counterReset.py index 9d71a58..30a20ef 100644 --- a/tests/action_counterReset.py +++ b/tests/action_counterReset.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import time from pyruse.actions.action_counterReset import Action @@ -49,3 +49,9 @@ def whenGraceTimeThenRaiseWorksAtGraceEnd(): time.sleep(1) raiseAct.act(entry) assert entry["action_counterReset4"] == 1 + +def unitTests(): + whenResetThenCountIs0() + whenNoGraceTimeThenRaiseWorks() + whenGraceTimeThenRaiseFails() + whenGraceTimeThenRaiseWorksAtGraceEnd() diff --git a/tests/action_dailyReport.py b/tests/action_dailyReport.py index 9c76acc..279dc79 100644 --- a/tests/action_dailyReport.py +++ b/tests/action_dailyReport.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import os import re @@ -78,3 +78,8 @@ def whenReportThenNewSetOfMessages(): assert os.path.exists(mail_filename) os.remove(mail_filename) whenEmailThenCheckContents() + +def unitTests(): + whenNewDayThenReport() + whenEmailThenCheckContents() + whenReportThenNewSetOfMessages() diff --git a/tests/action_email.py b/tests/action_email.py index 4a3162c..e3e431f 100644 --- a/tests/action_email.py +++ b/tests/action_email.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import os import re @@ -71,3 +71,7 @@ def whenEmailWithoutSubjectThenCheckContents(): assert toOK assert nbMsg == 1 os.remove(mail_filename) + +def unitTests(): + whenEmailWithSubjectThenCheckContents() + whenEmailWithoutSubjectThenCheckContents() diff --git a/tests/action_nftBan.py b/tests/action_nftBan.py index f2a933b..3ff5b2e 100644 --- a/tests/action_nftBan.py +++ b/tests/action_nftBan.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import json import os @@ -137,3 +137,11 @@ def whenUnfinishedBanThenTimeoutReset(): assert line == "add element I4 ban {10.0.0.1 timeout 2s}\n", line assert lineCount == 3, lineCount _clean() + +def unitTests(): + whenBanIPv4ThenAddToIPv4Set() + whenBanIPv6ThenAddToIPv6Set() + whenBanTwoIPThenTwoLinesInState() + whenBanAnewThenNoDuplicate() + whenFinishedBanThenAsIfNotThere() + whenUnfinishedBanThenTimeoutReset() diff --git a/tests/filter_equals.py b/tests/filter_equals.py index 314c9b6..9d0f8e6 100644 --- a/tests/filter_equals.py +++ b/tests/filter_equals.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. from pyruse.filters.filter_equals import Filter @@ -14,3 +14,9 @@ def whenEqualDiffTypeThenTrue(): def whenLowerThenFalse(): assert not Filter({"field": "v", "value": 2}).filter({"v": 0}) + +def unitTests(): + whenGreaterThenFalse() + whenEqualSameTypeThenTrue() + whenEqualDiffTypeThenTrue() + whenLowerThenFalse() diff --git a/tests/filter_greaterOrEquals.py b/tests/filter_greaterOrEquals.py index 3610b28..bf150cc 100644 --- a/tests/filter_greaterOrEquals.py +++ b/tests/filter_greaterOrEquals.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. from pyruse.filters.filter_greaterOrEquals import Filter @@ -17,3 +17,10 @@ def whenEqualDiffTypeThenTrue(): def whenLowerThenFalse(): assert not Filter({"field": "v", "value": 2}).filter({"v": 0}) + +def unitTests(): + whenGreaterPosIntThenTrue() + whenGreaterNegFloatThenTrue() + whenEqualSameTypeThenTrue() + whenEqualDiffTypeThenTrue() + whenLowerThenFalse() diff --git a/tests/filter_pcre.py b/tests/filter_pcre.py index f49ea21..0190ddf 100644 --- a/tests/filter_pcre.py +++ b/tests/filter_pcre.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. from pyruse.filters.filter_pcre import Filter @@ -18,3 +18,9 @@ def whenNamedGroupsThenFoundInEntry(): entry = {"v": "yet another test"} Filter({"field": "v", "re": "^(?P.).* .*(?P.)r .*(?P.).$"}).filter(entry) assert entry["y"] + entry["e"] + entry["s"] == "yes" + +def unitTests(): + whenMatchesThenTrue() + whenNoMatchThenFalse() + whenSaveThenGroupsInEntry() + whenNamedGroupsThenFoundInEntry() diff --git a/tests/filter_pcreAny.py b/tests/filter_pcreAny.py index aec5485..3f80e75 100644 --- a/tests/filter_pcreAny.py +++ b/tests/filter_pcreAny.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. from pyruse.filters.filter_pcreAny import Filter @@ -13,3 +13,8 @@ def whenNamedGroupsThenFoundInEntry(): entry = {"v": "It works or not"} Filter({"field": "v", "re": ["^(?PIt)(?P works)", "(?Por)(?P not)$"]}).filter(entry) assert entry["o"] + entry["k"] == "It works" + +def unitTests(): + whenMatchesThenTrue() + whenNoMatchThenFalse() + whenNamedGroupsThenFoundInEntry() diff --git a/tests/filter_userExists.py b/tests/filter_userExists.py index d340d27..2abfc11 100644 --- a/tests/filter_userExists.py +++ b/tests/filter_userExists.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. from pyruse.filters.filter_userExists import Filter @@ -8,3 +8,7 @@ def whenUserExistsThenTrue(): def whenGarbageThenFalse(): assert not Filter({"field": "user"}).filter({"user": "auietsnr"}) + +def unitTests(): + whenUserExistsThenTrue() + whenGarbageThenFalse() diff --git a/tests/main.py b/tests/main.py index 5e56a63..8f8ba7a 100644 --- a/tests/main.py +++ b/tests/main.py @@ -1,5 +1,5 @@ # pyruse is intended as a replacement to both fail2ban and epylog -# Copyright © 2017 Y. Gablin +# Copyright © 2017–2018 Y. Gablin # Full licensing information in the LICENSE file, or gnu.org/licences/gpl-3.0.txt if the file is missing. import os import subprocess @@ -31,52 +31,16 @@ def main(): import filter_equals, filter_greaterOrEquals, filter_pcre, filter_pcreAny, filter_userExists import action_counterRaise, action_counterReset, action_dailyReport, action_email, action_nftBan - filter_equals.whenGreaterThenFalse() - filter_equals.whenEqualSameTypeThenTrue() - filter_equals.whenEqualDiffTypeThenTrue() - filter_equals.whenLowerThenFalse() - - filter_greaterOrEquals.whenGreaterPosIntThenTrue() - filter_greaterOrEquals.whenGreaterNegFloatThenTrue() - filter_greaterOrEquals.whenEqualSameTypeThenTrue() - filter_greaterOrEquals.whenEqualDiffTypeThenTrue() - filter_greaterOrEquals.whenLowerThenFalse() - - filter_pcre.whenMatchesThenTrue() - filter_pcre.whenNoMatchThenFalse() - filter_pcre.whenSaveThenGroupsInEntry() - filter_pcre.whenNamedGroupsThenFoundInEntry() - - filter_pcreAny.whenMatchesThenTrue() - filter_pcreAny.whenNoMatchThenFalse() - filter_pcreAny.whenNamedGroupsThenFoundInEntry() - - filter_userExists.whenUserExistsThenTrue() - filter_userExists.whenGarbageThenFalse() - - action_counterRaise.whenNonExistingThenRaiseTo1() - action_counterRaise.whenKeepSecondsThenRaiseUntilTimeOut() - action_counterRaise.whenDifferentKeyThenDifferentCounter() - action_counterRaise.whenGraceTimeThenCountIs0() - - action_counterReset.whenResetThenCountIs0() - action_counterReset.whenNoGraceTimeThenRaiseWorks() - action_counterReset.whenGraceTimeThenRaiseFails() - action_counterReset.whenGraceTimeThenRaiseWorksAtGraceEnd() - - action_dailyReport.whenNewDayThenReport() - action_dailyReport.whenEmailThenCheckContents() - action_dailyReport.whenReportThenNewSetOfMessages() - - action_email.whenEmailWithSubjectThenCheckContents() - action_email.whenEmailWithoutSubjectThenCheckContents() - - action_nftBan.whenBanIPv4ThenAddToIPv4Set() - action_nftBan.whenBanIPv6ThenAddToIPv6Set() - action_nftBan.whenBanTwoIPThenTwoLinesInState() - action_nftBan.whenBanAnewThenNoDuplicate() - action_nftBan.whenFinishedBanThenAsIfNotThere() - action_nftBan.whenUnfinishedBanThenTimeoutReset() + filter_equals.unitTests() + filter_greaterOrEquals.unitTests() + filter_pcre.unitTests() + filter_pcreAny.unitTests() + filter_userExists.unitTests() + action_counterRaise.unitTests() + action_counterReset.unitTests() + action_dailyReport.unitTests() + action_email.unitTests() + action_nftBan.unitTests() # Integration test wf = workflow.Workflow(conf.asMap().get("actions", {})) @@ -100,7 +64,7 @@ def main(): actions.action_dailyReport.Action._hour = 25 wf.run(entry("bck", "login", "Failed password for root from ::1", 11)) for f in ['acted_on.log', 'email.dump', 'nftBan.cmd', 'unfiltered.log']: - assert os.path.exists(f) + assert os.path.exists(f), "file should exist: " + f try: subprocess.run( [ "/usr/bin/bash",