-
-
Notifications
You must be signed in to change notification settings - Fork 85
[WIP] Openwrt firewall #162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
jonathanunderwood
wants to merge
45
commits into
master
Choose a base branch
from
openwrt-firewall
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
a89c7e9
[openwrt] Add firewall settings
okraits 8b7493e
Merge branch 'master' into openwrt-firewall
jonathanunderwood e49605e
Format /backends/openwrt/converters/__init__.py
jonathanunderwood 5cfddcc
Reformat openwrt/schema.py (openwisp-qa-format)
jonathanunderwood 29957e3
Establish OpenWRT firewall rule parser
jonathanunderwood 495b22a
Make firewall rule proto parameter a list
jonathanunderwood f792d6a
Add another firewall rule test
jonathanunderwood 6d09ae8
Add firewall zone handling and tests
jonathanunderwood 9c08c8c
Add parser and tests for firewall forwardings
jonathanunderwood a9cb284
Add OpenWRT firewall redirect handling
jonathanunderwood b9cf322
Fix redirect weekdays and monthdays handling
jonathanunderwood 3dd9f67
Enable negation when parsing monthdays and weekdays
jonathanunderwood 9aff7a1
Enhance and test redirect parser
jonathanunderwood b21ac81
Refactor __netjson_redirect() to reduce complexity
jonathanunderwood 986a49f
Refactor handling of proto parameter
jonathanunderwood c5a79ce
Refactor mac_address_regex usage in schema
jonathanunderwood 51a254f
Refactor firewall bool handling
jonathanunderwood f0aac95
Refactor the OpenWRT schema for ease of reading
jonathanunderwood 976ecea
Add more OpenWRT firewall zone parameters
jonathanunderwood ec27da2
Merge branch 'master' into openwrt-firewall
jonathanunderwood 0d82e0e
[openwrt] Fix formatting error
jonathanunderwood 5b3438c
[openwrt] Fix unnecessary quotation changes
jonathanunderwood 9f6c0fc
[openwrt] Remove debugging print statements
jonathanunderwood 4c134bc
[openwrt] Add firewall rules tests
jonathanunderwood 4aa5e9e
Merge branch 'master' into openwrt-firewall
jonathanunderwood e65f06c
[openwrt] Add firewall rule test
jonathanunderwood 84787b2
[openwrt] Enhance firewall uci rule parser
jonathanunderwood 89d80b0
[openwrt] Add firewall rule uci parsing test
jonathanunderwood 2bd9f08
[openwrt] Refactor firewall defaults schema
jonathanunderwood cd7186f
[openwrt] Add firewall defaults parser and renderer
jonathanunderwood 3413b94
[openwrt] Add firewall defaults tests
jonathanunderwood b4f18cf
[openwrt] Fix formatting in firewall.py
jonathanunderwood 657f03b
[openwrt] Fix formatting in test_firewall.py
jonathanunderwood fe2404a
[openwrt] Add more parameters to firewall defaults schema
jonathanunderwood bc7dc88
[openwrt] Fix title and description of firewall defaults
jonathanunderwood f15cf3c
[openwrt] Fix firewall defaults parser
jonathanunderwood 47e5530
Merge branch 'master' into openwrt-firewall
jonathanunderwood 006b196
[openwrt] Make name parameter required for firewall objects
jonathanunderwood 89a4bed
[openwrt] Test enabled parameter for firewall forwarding
jonathanunderwood 775707c
Merge branch 'master' into openwrt-firewall
jonathanunderwood 81b17a0
[openwrt] Fix test_default.py tests
jonathanunderwood 07305f6
[openwrt] Add firewall includes to schema
jonathanunderwood b2536dd
[openwrt] Add firewall includes parser and renderer
jonathanunderwood 3b51afe
[openwrt] Add firewall includes tests
jonathanunderwood 8c6ac92
[openwrt] Remove config_name handling in firewall
jonathanunderwood File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,312 @@ | ||
| """Firewall configuration management for OpenWRT. | ||
|
|
||
| See the following resource for a detailed description of the sections and parameters of | ||
| the UCI configuration for the OpenWRT firewall. | ||
|
|
||
| https://openwrt.org/docs/guide-user/firewall/firewall_configuration | ||
| """ | ||
| from collections import OrderedDict | ||
|
|
||
| from ..schema import schema | ||
| from .base import OpenWrtConverter | ||
|
|
||
|
|
||
| class Firewall(OpenWrtConverter): | ||
| netjson_key = "firewall" | ||
| intermediate_key = "firewall" | ||
| _uci_types = ["defaults", "forwarding", "zone", "rule", "redirect", "include"] | ||
| _schema = schema["properties"]["firewall"] | ||
|
|
||
| def to_intermediate_loop(self, block, result, index=None): | ||
| defaults = self.__intermediate_defaults(block.pop("defaults", {})) | ||
| forwardings = self.__intermediate_forwardings(block.pop("forwardings", {})) | ||
| zones = self.__intermediate_zones(block.pop("zones", {})) | ||
| rules = self.__intermediate_rules(block.pop("rules", {})) | ||
| redirects = self.__intermediate_redirects(block.pop("redirects", {})) | ||
| includes = self.__intermediate_includes(block.pop("includes", {})) | ||
| result.setdefault("firewall", []) | ||
| result["firewall"] = ( | ||
| defaults + forwardings + zones + rules + redirects + includes | ||
| ) | ||
| return result | ||
|
|
||
| def __intermediate_defaults(self, defaults): | ||
| """ | ||
| converts NetJSON defaults to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = OrderedDict(((".name", "defaults"), (".type", "defaults"))) | ||
| result.update(defaults) | ||
| return [result] | ||
|
|
||
| def __intermediate_forwardings(self, forwardings): | ||
atb00ker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| converts NetJSON forwarding to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = [] | ||
| for forwarding in forwardings: | ||
| resultdict = OrderedDict( | ||
| ( | ||
| (".name", self._get_uci_name(forwarding["name"])), | ||
| (".type", "forwarding"), | ||
| ) | ||
| ) | ||
| resultdict.update(forwarding) | ||
| result.append(resultdict) | ||
| return result | ||
|
|
||
| def __intermediate_zones(self, zones): | ||
| """ | ||
| converts NetJSON zone to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = [] | ||
| for zone in zones: | ||
| resultdict = OrderedDict( | ||
| ((".name", self._get_uci_name(zone["name"])), (".type", "zone")) | ||
| ) | ||
| # If network contains only a single value, force the use of a UCI "option" | ||
| # rather than "list"". | ||
| network = zone["network"] | ||
| if len(network) == 1: | ||
| zone["network"] = network[0] | ||
| resultdict.update(zone) | ||
| result.append(resultdict) | ||
| return result | ||
|
|
||
| def __intermediate_rules(self, rules): | ||
| """ | ||
| converts NetJSON rule to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = [] | ||
| for rule in rules: | ||
| resultdict = OrderedDict( | ||
| ((".name", self._get_uci_name(rule["name"])), (".type", "rule")) | ||
| ) | ||
| if "proto" in rule: | ||
| # If proto is a single value, then force it not to be in a list so that | ||
| # the UCI uses "option" rather than "list". If proto is only "tcp" | ||
| # and"udp", we can force it to the single special value of "tcpudp". | ||
| proto = rule["proto"] | ||
| if len(proto) == 1: | ||
| rule["proto"] = proto[0] | ||
| elif set(proto) == {"tcp", "udp"}: | ||
| rule["proto"] = "tcpudp" | ||
| resultdict.update(rule) | ||
| result.append(resultdict) | ||
| return result | ||
|
|
||
| def __intermediate_redirects(self, redirects): | ||
| """ | ||
| converts NetJSON redirect to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = [] | ||
| for redirect in redirects: | ||
| resultdict = OrderedDict( | ||
| ( | ||
| (".name", self._get_uci_name(redirect["name"])), | ||
| (".type", "redirect"), | ||
| ) | ||
| ) | ||
| if "proto" in redirect: | ||
| # If proto is a single value, then force it not to be in a list so that | ||
| # the UCI uses "option" rather than "list". If proto is only "tcp" | ||
| # and"udp", we can force it to the single special value of "tcpudp". | ||
| proto = redirect["proto"] | ||
| if len(proto) == 1: | ||
| redirect["proto"] = proto[0] | ||
| elif set(proto) == {"tcp", "udp"}: | ||
| redirect["proto"] = "tcpudp" | ||
|
|
||
| resultdict.update(redirect) | ||
| result.append(resultdict) | ||
|
|
||
| return result | ||
|
|
||
| def __intermediate_includes(self, includes): | ||
| """ | ||
| converts NetJSON include to | ||
| UCI intermediate data structure | ||
| """ | ||
| result = [] | ||
| for include in includes: | ||
| resultdict = OrderedDict( | ||
| ((".name", self._get_uci_name(include["name"])), (".type", "include"),) | ||
| ) | ||
|
|
||
| resultdict.update(include) | ||
| result.append(resultdict) | ||
|
|
||
| return result | ||
|
|
||
| def to_netjson_loop(self, block, result, index): | ||
| result.setdefault("firewall", {}) | ||
|
|
||
| block.pop(".name") | ||
| _type = block.pop(".type") | ||
|
|
||
| if _type == "defaults": | ||
| defaults = self.__netjson_defaults(block) | ||
| if defaults: # note: default section can be empty | ||
| result["firewall"].setdefault("defaults", {}) | ||
| result["firewall"]["defaults"].update(defaults) | ||
| if _type == "rule": | ||
| rule = self.__netjson_rule(block) | ||
| result["firewall"].setdefault("rules", []) | ||
jonathanunderwood marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| result["firewall"]["rules"].append(rule) | ||
| if _type == "zone": | ||
| zone = self.__netjson_zone(block) | ||
| result["firewall"].setdefault("zones", []) | ||
| result["firewall"]["zones"].append(zone) | ||
| if _type == "forwarding": | ||
| forwarding = self.__netjson_forwarding(block) | ||
| result["firewall"].setdefault("forwardings", []) | ||
| result["firewall"]["forwardings"].append(forwarding) | ||
| if _type == "redirect": | ||
| redirect = self.__netjson_redirect(block) | ||
| result["firewall"].setdefault("redirects", []) | ||
| result["firewall"]["redirects"].append(redirect) | ||
| if _type == "include": | ||
| include = self.__netjson_include(block) | ||
| result["firewall"].setdefault("includes", []) | ||
| result["firewall"]["includes"].append(include) | ||
|
|
||
| return self.type_cast(result) | ||
|
|
||
| def __netjson_defaults(self, defaults): | ||
| for param in [ | ||
| "drop_invalid", | ||
| "synflood_protect", | ||
| "tcp_syncookies", | ||
| "tcp_ecn", | ||
| "tcp_window_scaling", | ||
| "accept_redirects", | ||
| "accept_source_route", | ||
| "custom_chains", | ||
| "disable_ipv6", | ||
| "flow_offloading", | ||
| "flow_offloading_hw", | ||
| "auto_helper", | ||
| ]: | ||
| if param in defaults: | ||
| defaults[param] = self.__netjson_generic_boolean(defaults[param]) | ||
| for param in ["synflood_limit", "synflood_burst"]: | ||
| if param in defaults: | ||
| defaults[param] = int(defaults[param]) | ||
| return self.type_cast(defaults) | ||
|
|
||
| def __netjson_rule(self, rule): | ||
| for param in ["enabled", "utc_time"]: | ||
| if param in rule: | ||
| rule[param] = self.__netjson_generic_boolean(rule[param]) | ||
|
|
||
| if "proto" in rule: | ||
| rule["proto"] = self.__netjson_generic_proto(rule["proto"]) | ||
|
|
||
| if "weekdays" in rule: | ||
| rule["weekdays"] = self.__netjson_generic_weekdays(rule["weekdays"]) | ||
|
|
||
| if "monthdays" in rule: | ||
| rule["monthdays"] = self.__netjson_generic_monthdays(rule["monthdays"]) | ||
|
|
||
| if "limit_burst" in rule: | ||
| rule["limit_burst"] = int(rule["limit_burst"]) | ||
|
|
||
| return self.type_cast(rule) | ||
|
|
||
| def __netjson_zone(self, zone): | ||
| network = zone["network"] | ||
| # network may be specified as a list in a single string e.g. | ||
| # option network 'wan wan6' | ||
| # Here we ensure that network is always a list. | ||
| if not isinstance(network, list): | ||
| zone["network"] = network.split() | ||
|
|
||
| for param in ["mtu_fix", "masq"]: | ||
| if param in zone: | ||
| zone[param] = self.__netjson_generic_boolean(zone[param]) | ||
|
|
||
| return self.type_cast(zone) | ||
|
|
||
| def __netjson_forwarding(self, forwarding): | ||
| if "enabled" in forwarding: | ||
| forwarding["enabled"] = self.__netjson_generic_boolean( | ||
| forwarding["enabled"] | ||
| ) | ||
| return self.type_cast(forwarding) | ||
|
|
||
| def __netjson_redirect(self, redirect): | ||
| if "proto" in redirect: | ||
| redirect["proto"] = self.__netjson_generic_proto(redirect["proto"]) | ||
|
|
||
| if "weekdays" in redirect: | ||
| redirect["weekdays"] = self.__netjson_generic_weekdays(redirect["weekdays"]) | ||
|
|
||
| if "monthdays" in redirect: | ||
| redirect["monthdays"] = self.__netjson_generic_monthdays( | ||
| redirect["monthdays"] | ||
| ) | ||
|
|
||
| for param in ["utc_time", "reflection", "enabled"]: | ||
| if param in redirect: | ||
| redirect[param] = self.__netjson_generic_boolean(redirect[param]) | ||
|
|
||
| if "limit_burst" in redirect: | ||
| redirect["limit_burst"] = int(redirect["limit_burst"]) | ||
|
|
||
| return self.type_cast(redirect) | ||
|
|
||
| def __netjson_include(self, include): | ||
| for param in ["reload", "enabled"]: | ||
| if param in include: | ||
| include[param] = self.__netjson_generic_boolean(include[param]) | ||
|
|
||
| return self.type_cast(include) | ||
|
|
||
| def __netjson_generic_boolean(self, boolean): | ||
| # Per convention, boolean options may have one of the values '0', 'no', 'off', | ||
| # 'false' or 'disabled' to specify a false value or '1' , 'yes', 'on', 'true' or | ||
| # 'enabled' to specify a true value. | ||
| # https://openwrt.org/docs/guide-user/base-system/uci | ||
| return boolean in ["1", "yes", "on", "true", "enabled"] | ||
|
|
||
| def __netjson_generic_proto(self, proto): | ||
| if isinstance(proto, list): | ||
| return proto.copy() | ||
| else: | ||
| if proto == "tcpudp": | ||
| return ["tcp", "udp"] | ||
| else: | ||
| return proto.split() | ||
|
|
||
| def __netjson_generic_weekdays(self, weekdays): | ||
| if not isinstance(weekdays, list): | ||
| wd = weekdays.split() | ||
| else: | ||
| wd = weekdays.copy() | ||
|
|
||
| # UCI allows the first entry to be "!" which means negate the remaining entries | ||
| if wd[0] == "!": | ||
| all_days = ["sun", "mon", "tue", "wed", "thu", "fri", "sat"] | ||
| wd = [day for day in all_days if day not in wd[1:]] | ||
|
|
||
| return wd | ||
|
|
||
| def __netjson_generic_monthdays(self, monthdays): | ||
| if not isinstance(monthdays, list): | ||
| md = monthdays.split() | ||
| else: | ||
| md = monthdays.copy() | ||
|
|
||
| # UCI allows the first entry to be "!" which means negate the remaining entries | ||
| if md[0] == "!": | ||
| md = [int(day) for day in md[1:]] | ||
| all_days = range(1, 32) | ||
| md = [day for day in all_days if day not in md] | ||
| else: | ||
| md = [int(day) for day in md] | ||
|
|
||
| return md | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.