diff --git a/netjsonconfig/backends/openwrt/converters/__init__.py b/netjsonconfig/backends/openwrt/converters/__init__.py index 2eb379a80..9e400a21d 100644 --- a/netjsonconfig/backends/openwrt/converters/__init__.py +++ b/netjsonconfig/backends/openwrt/converters/__init__.py @@ -1,4 +1,5 @@ from .default import Default +from .firewall import Firewall from .general import General from .interfaces import Interfaces from .led import Led @@ -22,4 +23,5 @@ 'Rules', 'Switch', 'Wireless', + 'Firewall', ] diff --git a/netjsonconfig/backends/openwrt/converters/firewall.py b/netjsonconfig/backends/openwrt/converters/firewall.py new file mode 100644 index 000000000..34c571741 --- /dev/null +++ b/netjsonconfig/backends/openwrt/converters/firewall.py @@ -0,0 +1,312 @@ +"""Firewall configuration management for OpenWRT. + +See the following resource for a detailed description of the sections and parameters of +the UCI configuration for the OpenWRT firewall. + + https://openwrt.org/docs/guide-user/firewall/firewall_configuration +""" +from collections import OrderedDict + +from ..schema import schema +from .base import OpenWrtConverter + + +class Firewall(OpenWrtConverter): + netjson_key = "firewall" + intermediate_key = "firewall" + _uci_types = ["defaults", "forwarding", "zone", "rule", "redirect", "include"] + _schema = schema["properties"]["firewall"] + + def to_intermediate_loop(self, block, result, index=None): + defaults = self.__intermediate_defaults(block.pop("defaults", {})) + forwardings = self.__intermediate_forwardings(block.pop("forwardings", {})) + zones = self.__intermediate_zones(block.pop("zones", {})) + rules = self.__intermediate_rules(block.pop("rules", {})) + redirects = self.__intermediate_redirects(block.pop("redirects", {})) + includes = self.__intermediate_includes(block.pop("includes", {})) + result.setdefault("firewall", []) + result["firewall"] = ( + defaults + forwardings + zones + rules + redirects + includes + ) + return result + + def __intermediate_defaults(self, defaults): + """ + converts NetJSON defaults to + UCI intermediate data structure + """ + result = OrderedDict(((".name", "defaults"), (".type", "defaults"))) + result.update(defaults) + return [result] + + def __intermediate_forwardings(self, forwardings): + """ + converts NetJSON forwarding to + UCI intermediate data structure + """ + result = [] + for forwarding in forwardings: + resultdict = OrderedDict( + ( + (".name", self._get_uci_name(forwarding["name"])), + (".type", "forwarding"), + ) + ) + resultdict.update(forwarding) + result.append(resultdict) + return result + + def __intermediate_zones(self, zones): + """ + converts NetJSON zone to + UCI intermediate data structure + """ + result = [] + for zone in zones: + resultdict = OrderedDict( + ((".name", self._get_uci_name(zone["name"])), (".type", "zone")) + ) + # If network contains only a single value, force the use of a UCI "option" + # rather than "list"". + network = zone["network"] + if len(network) == 1: + zone["network"] = network[0] + resultdict.update(zone) + result.append(resultdict) + return result + + def __intermediate_rules(self, rules): + """ + converts NetJSON rule to + UCI intermediate data structure + """ + result = [] + for rule in rules: + resultdict = OrderedDict( + ((".name", self._get_uci_name(rule["name"])), (".type", "rule")) + ) + if "proto" in rule: + # If proto is a single value, then force it not to be in a list so that + # the UCI uses "option" rather than "list". If proto is only "tcp" + # and"udp", we can force it to the single special value of "tcpudp". + proto = rule["proto"] + if len(proto) == 1: + rule["proto"] = proto[0] + elif set(proto) == {"tcp", "udp"}: + rule["proto"] = "tcpudp" + resultdict.update(rule) + result.append(resultdict) + return result + + def __intermediate_redirects(self, redirects): + """ + converts NetJSON redirect to + UCI intermediate data structure + """ + result = [] + for redirect in redirects: + resultdict = OrderedDict( + ( + (".name", self._get_uci_name(redirect["name"])), + (".type", "redirect"), + ) + ) + if "proto" in redirect: + # If proto is a single value, then force it not to be in a list so that + # the UCI uses "option" rather than "list". If proto is only "tcp" + # and"udp", we can force it to the single special value of "tcpudp". + proto = redirect["proto"] + if len(proto) == 1: + redirect["proto"] = proto[0] + elif set(proto) == {"tcp", "udp"}: + redirect["proto"] = "tcpudp" + + resultdict.update(redirect) + result.append(resultdict) + + return result + + def __intermediate_includes(self, includes): + """ + converts NetJSON include to + UCI intermediate data structure + """ + result = [] + for include in includes: + resultdict = OrderedDict( + ((".name", self._get_uci_name(include["name"])), (".type", "include"),) + ) + + resultdict.update(include) + result.append(resultdict) + + return result + + def to_netjson_loop(self, block, result, index): + result.setdefault("firewall", {}) + + block.pop(".name") + _type = block.pop(".type") + + if _type == "defaults": + defaults = self.__netjson_defaults(block) + if defaults: # note: default section can be empty + result["firewall"].setdefault("defaults", {}) + result["firewall"]["defaults"].update(defaults) + if _type == "rule": + rule = self.__netjson_rule(block) + result["firewall"].setdefault("rules", []) + result["firewall"]["rules"].append(rule) + if _type == "zone": + zone = self.__netjson_zone(block) + result["firewall"].setdefault("zones", []) + result["firewall"]["zones"].append(zone) + if _type == "forwarding": + forwarding = self.__netjson_forwarding(block) + result["firewall"].setdefault("forwardings", []) + result["firewall"]["forwardings"].append(forwarding) + if _type == "redirect": + redirect = self.__netjson_redirect(block) + result["firewall"].setdefault("redirects", []) + result["firewall"]["redirects"].append(redirect) + if _type == "include": + include = self.__netjson_include(block) + result["firewall"].setdefault("includes", []) + result["firewall"]["includes"].append(include) + + return self.type_cast(result) + + def __netjson_defaults(self, defaults): + for param in [ + "drop_invalid", + "synflood_protect", + "tcp_syncookies", + "tcp_ecn", + "tcp_window_scaling", + "accept_redirects", + "accept_source_route", + "custom_chains", + "disable_ipv6", + "flow_offloading", + "flow_offloading_hw", + "auto_helper", + ]: + if param in defaults: + defaults[param] = self.__netjson_generic_boolean(defaults[param]) + for param in ["synflood_limit", "synflood_burst"]: + if param in defaults: + defaults[param] = int(defaults[param]) + return self.type_cast(defaults) + + def __netjson_rule(self, rule): + for param in ["enabled", "utc_time"]: + if param in rule: + rule[param] = self.__netjson_generic_boolean(rule[param]) + + if "proto" in rule: + rule["proto"] = self.__netjson_generic_proto(rule["proto"]) + + if "weekdays" in rule: + rule["weekdays"] = self.__netjson_generic_weekdays(rule["weekdays"]) + + if "monthdays" in rule: + rule["monthdays"] = self.__netjson_generic_monthdays(rule["monthdays"]) + + if "limit_burst" in rule: + rule["limit_burst"] = int(rule["limit_burst"]) + + return self.type_cast(rule) + + def __netjson_zone(self, zone): + network = zone["network"] + # network may be specified as a list in a single string e.g. + # option network 'wan wan6' + # Here we ensure that network is always a list. + if not isinstance(network, list): + zone["network"] = network.split() + + for param in ["mtu_fix", "masq"]: + if param in zone: + zone[param] = self.__netjson_generic_boolean(zone[param]) + + return self.type_cast(zone) + + def __netjson_forwarding(self, forwarding): + if "enabled" in forwarding: + forwarding["enabled"] = self.__netjson_generic_boolean( + forwarding["enabled"] + ) + return self.type_cast(forwarding) + + def __netjson_redirect(self, redirect): + if "proto" in redirect: + redirect["proto"] = self.__netjson_generic_proto(redirect["proto"]) + + if "weekdays" in redirect: + redirect["weekdays"] = self.__netjson_generic_weekdays(redirect["weekdays"]) + + if "monthdays" in redirect: + redirect["monthdays"] = self.__netjson_generic_monthdays( + redirect["monthdays"] + ) + + for param in ["utc_time", "reflection", "enabled"]: + if param in redirect: + redirect[param] = self.__netjson_generic_boolean(redirect[param]) + + if "limit_burst" in redirect: + redirect["limit_burst"] = int(redirect["limit_burst"]) + + return self.type_cast(redirect) + + def __netjson_include(self, include): + for param in ["reload", "enabled"]: + if param in include: + include[param] = self.__netjson_generic_boolean(include[param]) + + return self.type_cast(include) + + def __netjson_generic_boolean(self, boolean): + # Per convention, boolean options may have one of the values '0', 'no', 'off', + # 'false' or 'disabled' to specify a false value or '1' , 'yes', 'on', 'true' or + # 'enabled' to specify a true value. + # https://openwrt.org/docs/guide-user/base-system/uci + return boolean in ["1", "yes", "on", "true", "enabled"] + + def __netjson_generic_proto(self, proto): + if isinstance(proto, list): + return proto.copy() + else: + if proto == "tcpudp": + return ["tcp", "udp"] + else: + return proto.split() + + def __netjson_generic_weekdays(self, weekdays): + if not isinstance(weekdays, list): + wd = weekdays.split() + else: + wd = weekdays.copy() + + # UCI allows the first entry to be "!" which means negate the remaining entries + if wd[0] == "!": + all_days = ["sun", "mon", "tue", "wed", "thu", "fri", "sat"] + wd = [day for day in all_days if day not in wd[1:]] + + return wd + + def __netjson_generic_monthdays(self, monthdays): + if not isinstance(monthdays, list): + md = monthdays.split() + else: + md = monthdays.copy() + + # UCI allows the first entry to be "!" which means negate the remaining entries + if md[0] == "!": + md = [int(day) for day in md[1:]] + all_days = range(1, 32) + md = [day for day in all_days if day not in md] + else: + md = [int(day) for day in md] + + return md diff --git a/netjsonconfig/backends/openwrt/openwrt.py b/netjsonconfig/backends/openwrt/openwrt.py index b6dc64934..54aa28a0b 100644 --- a/netjsonconfig/backends/openwrt/openwrt.py +++ b/netjsonconfig/backends/openwrt/openwrt.py @@ -22,6 +22,7 @@ class OpenWrt(BaseBackend): converters.Radios, converters.Wireless, converters.OpenVpn, + converters.Firewall, converters.Default, ] parser = OpenWrtParser diff --git a/netjsonconfig/backends/openwrt/schema.py b/netjsonconfig/backends/openwrt/schema.py index 748a821b8..8772590fa 100644 --- a/netjsonconfig/backends/openwrt/schema.py +++ b/netjsonconfig/backends/openwrt/schema.py @@ -11,12 +11,37 @@ "properties" ] +# The following regex will match against a single valid port, or a port range e.g. 1234-5000 +port_range_regex = "^([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])(-([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]))?$" # noqa + +# Match against a MAC address +mac_address_regex = "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" + +# Match against a yyyy-mm-dd format date. Note that draft07 of the JSON schema standard +# include a "date" pattern which can replace this. +# https://json-schema.org/understanding-json-schema/reference/string.html +date_regex = "^([0-9]{4})-(0[1-9]|[12][0-9]|3[01])-([012][0-9]|[3][01])$" + +# Match against a time in the format hh:mm:ss +time_regex = "^([01][0-9]|2[0123])(:([012345][0-9])){2}$" + +# Match against a range of IPv4 addresses +ipv4_cidr_regex = "^([0-9]{1,3}.){3}[0-9]{1,3}(/([0-9]|[1-2][0-9]|3[0-2]))?$" + +# Match against a negatable range of IPv4 addresses. This variant allows for an optional +# "!" in front of the CIDR. +ipv4_negatable_cidr_regex = ( + "^!?([0-9]{1,3}.){3}[0-9]{1,3}(/([0-9]|[1-2][0-9]|3[0-2]))?$" +) + +# Match against a range of IPv6 addresses +ipv6_cidr_regex = "^s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)?s*(/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8]))?$" # noqa schema = merge_config( default_schema, { "definitions": { - "interface_settings": { + "interface_settings": { # Overrides default schema "properties": { "network": { "type": "string", @@ -28,14 +53,14 @@ } } }, - "wireless_interface": { + "wireless_interface": { # Overrides default schema "properties": { "wireless": { "properties": { "network": { "type": "array", "title": "Attached Networks", - "description": "override OpenWRT \"network\" config option of of wifi-iface " + "description": 'override OpenWRT "network" config option of of wifi-iface ' "directive; will be automatically determined if left blank", "uniqueItems": True, "additionalItems": True, @@ -50,7 +75,7 @@ } } }, - "ap_wireless_settings": { + "ap_wireless_settings": { # Overrides default schema "allOf": [ { "properties": { @@ -74,9 +99,9 @@ "macfilter": { "type": "string", "title": "MAC Filter", - "description": "specifies the mac filter policy, \"disable\" to disable " - "the filter, \"allow\" to treat it as whitelist or " - "\"deny\" to treat it as blacklist", + "description": 'specifies the mac filter policy, "disable" to disable ' + 'the filter, "allow" to treat it as whitelist or ' + '"deny" to treat it as blacklist', "enum": ["disable", "allow", "deny"], "default": "disable", "propertyOrder": 15, @@ -85,12 +110,12 @@ "type": "array", "title": "MAC List", "description": "mac addresses that will be filtered according to the policy " - "specified in the \"macfilter\" option", + 'specified in the "macfilter" option', "propertyOrder": 16, "items": { "type": "string", "title": "MAC address", - "pattern": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", + "pattern": mac_address_regex, "minLength": 17, "maxLength": 17, }, @@ -99,14 +124,14 @@ } ] }, - "bridge_interface": { + "bridge_interface": { # Overrides default schema "allOf": [ { "properties": { "igmp_snooping": { "type": "boolean", "title": "IGMP snooping", - "description": "sets the \"multicast_snooping\" kernel setting for a bridge", + "description": 'sets the "multicast_snooping" kernel setting for a bridge', "default": True, "format": "checkbox", "propertyOrder": 4, @@ -241,16 +266,16 @@ } } }, - "radio_80211gn_settings": { + "radio_80211gn_settings": { # Overrides default schema "allOf": [{"$ref": "#/definitions/radio_hwmode_11g"}] }, - "radio_80211an_settings": { + "radio_80211an_settings": { # Overrides default schema "allOf": [{"$ref": "#/definitions/radio_hwmode_11a"}] }, - "radio_80211ac_2ghz_settings": { + "radio_80211ac_2ghz_settings": { # Overrides default schema "allOf": [{"$ref": "#/definitions/radio_hwmode_11g"}] }, - "radio_80211ac_5ghz_settings": { + "radio_80211ac_5ghz_settings": { # Overrides default schema "allOf": [{"$ref": "#/definitions/radio_hwmode_11a"}] }, }, @@ -493,6 +518,777 @@ }, ) +firewall_definitions = { + "firewall": { + "name": { + "type": "string", + "title": "name", + "description": "Name of redirect", + "propertyOrder": 1, + }, + "enabled": { + "type": "boolean", + "title": "enable", + "description": "Enable this configuration entity.", + "default": True, + "format": "checkbox", + "propertyOrder": 2, + }, + "zone_name": { + "type": "string", + "title": "A Zone name", + "description": "A unique zone name. Has a maximum" + "length of 11 characters.", + "maxLength": 11, + "propertyOrder": 3, + }, + "ipv4_negatable_cidr": { + "type": "string", + "pattern": ipv4_negatable_cidr_regex, + }, + "src": { + "type": "string", + "title": "src", + "description": "Specifies the traffic source zone. " + "Must refer to one of the defined zone names. " + "For typical port forwards this usually is wan.", + "maxLength": 11, + "propertyOrder": 4, + }, + "src_ip": { + "type": "string", + "title": "src_ip", + "description": "Match incoming traffic from the specified source ip " + "address.", + "propertyOrder": 5, + }, + "src_mac": { + "type": "string", + "title": "src_mac", + "description": "Match incoming traffic from the specified MAC address.", + "pattern": mac_address_regex, + "minLength": 17, + "maxLength": 17, + "propertyOrder": 6, + }, + "src_port": { + "type": "string", + "title": "src_port", + "description": "Match incoming traffic originating from the given source " + "port or port range on the client host.", + "pattern": port_range_regex, + "propertyOrder": 7, + }, + "proto": { + "type": "array", + "title": "proto", + "description": "Match incoming traffic using the given protocol. " + "Can be one of tcp, udp, tcpudp, udplite, icmp, esp, " + "ah, sctp, or all or it can be a numeric value, " + "representing one of these protocols or a different one. " + "A protocol name from /etc/protocols is also allowed. " + "The number 0 is equivalent to all", + "default": ["tcp", "udp"], + "propertyOrder": 8, + "items": {"title": "Protocol type", "type": "string"}, + }, + "dest": { + "type": "string", + "title": "dest", + "description": "Specifies the traffic destination zone. Must refer to " + "on of the defined zone names. For DNAT target on Attitude Adjustment, " + 'NAT reflection works only if this is equal to "lan".', + "maxLength": 11, + "propertyOrder": 9, + }, + "dest_ip": { + "type": "string", + "title": "dest_ip", + "description": "For DNAT, redirect matches incoming traffic to the " + "specified internal host. For SNAT, it matches traffic directed at " + "the given address. For DNAT, if the dest_ip is not specified, the rule " + "is translated in a iptables/REDIRECT rule, otherwise it is a " + "iptables/DNAT rule.", + # "format": "ipv4", + "propertyOrder": 10, + }, + "dest_port": { + "type": "string", + "title": "dest_port", + "description": "For DNAT, redirect matched incoming traffic to the given " + "port on the internal host. For SNAT, match traffic directed at the " + "given ports. Only a single port or range can be specified.", + "pattern": port_range_regex, + "propertyOrder": 11, + }, + "ipset": { + "type": "string", + "title": "ipset", + "description": "Match traffic against the given ipset. The match can be " + "inverted by prefixing the value with an exclamation mark.", + "propertyOrder": 12, + }, + "mark": { + "type": "string", + "title": "mark", + "description": 'Match traffic against the given firewall mark, e.g. ' + '"0xFF" to match mark 255 or "0x0/0x1" to match any even mark value. ' + 'The match can be inverted by prefixing the value with an exclamation ' + 'mark, e.g. "!0x10" to match all but mark #16.', + "propertyOrder": 13, + }, + "start_date": { + "type": "string", + "title": "start_date", + "description": "Only match traffic after the given date (inclusive).", + "pattern": date_regex, + # "format": "date", TODO: replace pattern with this + # when adopt draft07 + "propertyOrder": 14, + }, + "stop_date": { + "type": "string", + "title": "stop_date", + "description": "Only match traffic before the given date (inclusive).", + "pattern": date_regex, + # "format": "date", TODO: replace pattern with this + # when adopt draft07 + "propertyOrder": 15, + }, + "start_time": { + "type": "string", + "title": "start_time", + "description": "Only match traffic after the given time of day " + "(inclusive).", + "pattern": time_regex, + "propertyOrder": 16, + }, + "stop_time": { + "type": "string", + "title": "stop_time", + "description": "Only match traffic before the given time of day " + "(inclusive).", + "pattern": time_regex, + "propertyOrder": 17, + }, + # Note: here we don't support negation of values like + # the UCI syntax does, as it's not necessary. + "weekdays": { + "type": "array", + "title": "weekdays", + "description": "Only match traffic during the given week days, " + 'e.g. ["sun", "mon", "thu", "fri"] to only match on Sundays, ' + "Mondays, Thursdays and Fridays.", + "propertyOrder": 18, + "items": { + "type": "string", + "title": "weekday", + "enum": ["mon", "tue", "wed", "thu", "fri", "sat", "sun"], + }, + }, + # Note: here we don't support negation of values like + # the UCI syntax does, as it's not necessary. + "monthdays": { + "type": "array", + "title": "monthdays", + "description": "Only match traffic during the given days of the " + "month, e.g. [2, 5, 30] to only match on every 2nd, 5th and 30th " + "day of the month.", + "propertyOrder": 19, + "items": { + "type": "integer", + "title": "day of month", + "minimum": 1, + "maximum": 31, + }, + }, + "utc_time": { + "type": "boolean", + "title": "utc_time", + "description": "Treat all given time values as UTC time instead of local " + "time.", + "default": False, + "propertyOrder": 20, + }, + "family": { + "type": "string", + "title": "family", + "description": "Protocol family (ipv4, ipv6 or any) to generate iptables " + "rules for", + "enum": ["ipv4", "ipv6", "any"], + "default": "any", + "propertyOrder": 22, + }, + "limit": { + "type": "string", + "title": "limit", + "description": "Maximum average matching rate; specified as a number, " + "with an optional /second, /minute, /hour or /day suffix. " + "Examples: 3/second, 3/sec or 3/s.", + "propertyOrder": 25, + }, + "limit_burst": { + "type": "integer", + "title": "limit_burst", + "description": "Maximum initial number of packets to match, allowing a " + "short-term average above limit.", + "default": 5, + "propertyOrder": 26, + }, + "firewall_policy": { + "type": "string", + "enum": ["ACCEPT", "REJECT", "DROP"], + "options": {"enum_titles": ["Accept", "Reject", "Drop"]}, + "default": "REJECT", + }, + "zone_policy": { + "type": "string", + "enum": ["ACCEPT", "REJECT", "DROP"], + "options": {"enum_titles": ["Accept", "Reject", "Drop"]}, + "default": "DROP", + }, + "rule_policy": { + "type": "string", + "enum": ["ACCEPT", "REJECT", "DROP", "MARK", "NOTRACK"], + "options": {"enum_titles": ["Accept", "Reject", "Drop", "Mark", "Notrack"]}, + "default": "DROP", + }, + }, +} + +firewall_includes_properties = { + "name": {"$ref": "#/definitions/firewall/name"}, + "enabled": {"$ref": "#/definitions/firewall/enabled"}, + "family": {"$ref": "#/definitions/firewall/family"}, + "type": { + "type": "string", + "title": "The type of the script", + "description": 'Specifies the type of the include, can be "script" for traditional ' + 'shell script includes or restore for plain files in iptables-restore format.', + "enum": ["script", "restore"], + "propertyOrder": 101, + }, + "path": { + "type": "string", + "title": "Script to include", + "description": "Specifies a shell script to execute on boot or firewall restarts", + "default": "/etc/firewall.user", + "propertyOrder": 102, + }, + "reload": { + "type": "boolean", + "title": "Reload the included file when reloading firewall rules", + "description": "This specifies whether or not the included file should be " + "reloaded when the firewall rules are reloaded. This is only needed if " + "the included file injects rules into internal OpenWRT chains.", + "default": False, + "propertyOrder": 103, + }, +} + +firewall_redirect_properties = { + "name": {"$ref": "#/definitions/firewall/name"}, + "enabled": {"$ref": "#/definitions/firewall/enabled"}, + "src": {"$ref": "#/definitions/firewall/src"}, + "src_ip": { + "allOf": [ + {"$ref": "#/definitions/firewall/src_ip"}, + {"pattern": ipv4_cidr_regex}, + ], + }, + "src_mac": {"$ref": "#/definitions/firewall/src_mac"}, + "src_port": {"$ref": "#/definitions/firewall/src_port"}, + "proto": {"$ref": "#/definitions/firewall/proto"}, + "dest": {"$ref": "#/definitions/firewall/dest"}, + "dest_ip": {"$ref": "#/definitions/firewall/dest_ip"}, + "dest_port": {"$ref": "#/definitions/firewall/dest_port"}, + "ipset": {"$ref": "#/definitions/firewall/ipset"}, + "mark": {"$ref": "#/definitions/firewall/mark"}, + "start_date": {"$ref": "#/definitions/firewall/start_date"}, + "stop_date": {"$ref": "#/definitions/firewall/stop_date"}, + "start_time": {"$ref": "#/definitions/firewall/start_time"}, + "stop_time": {"$ref": "#/definitions/firewall/stop_time"}, + "weekdays": {"$ref": "#/definitions/firewall/weekdays"}, + "monthdays": {"$ref": "#/definitions/firewall/monthdays"}, + "utc_time": {"$ref": "#/definitions/firewall/utc_time"}, + "family": {"$ref": "#/definitions/firewall/family"}, + "limit": {"$ref": "#/definitions/firewall/limit"}, + "limit_burst": {"$ref": "#/definitions/firewall/limit_burst"}, + "src_dip": { + "type": "string", + "title": "src_dip", + "description": "For DNAT, match incoming traffic directed at the " + "given destination ip address. For SNAT rewrite the source address " + "to the given address.", + "format": "ipv4", + "propertyOrder": 101, + }, + "src_dport": { + "type": "string", + "title": "src_dport", + "description": "For DNAT, match incoming traffic directed at the given " + "destination port or port range on this host. For SNAT rewrite the " + "source ports to the given value.", + "pattern": port_range_regex, + "propertyOrder": 102, + }, + "reflection": { + "type": "boolean", + "title": "reflection", + "description": "Activate NAT reflection for this redirect. Applicable to " + "DNAT targets.", + "default": True, + "propertyOrder": 103, + }, + "reflection_src": { + "type": "string", + "title": "reflection_src", + "description": "The source address to use for NAT-reflected packets if " + "reflection is True. This can be internal or external, specifying which " + "interface’s address to use. Applicable to DNAT targets.", + "enum": ["internal", "external"], + "default": "internal", + "propertyOrder": 104, + }, + "target": { + "type": "string", + "title": "target", + "description": "NAT target (DNAT or SNAT) to use when generating the rule.", + "enum": ["DNAT", "SNAT"], + "default": "DNAT", + "propertyOrder": 105, + }, +} + +firewall_rules_properties = { + "name": {"$ref": "#/definitions/firewall/name"}, + "enabled": {"$ref": "#/definitions/firewall/enabled"}, + "src": {"$ref": "#/definitions/firewall/src"}, + "src_ip": { + "allOf": [ + {"$ref": "#/definitions/firewall/src_ip"}, + {"oneOf": [{"pattern": ipv4_cidr_regex}, {"pattern": ipv6_cidr_regex}]}, + ], + }, + "src_mac": {"$ref": "#/definitions/firewall/src_mac"}, + "src_port": {"$ref": "#/definitions/firewall/src_port"}, + "proto": {"$ref": "#/definitions/firewall/proto"}, + "dest": {"$ref": "#/definitions/firewall/dest"}, + "dest_ip": {"$ref": "#/definitions/firewall/dest_ip"}, + "dest_port": {"$ref": "#/definitions/firewall/dest_port"}, + "ipset": {"$ref": "#/definitions/firewall/ipset"}, + "mark": {"$ref": "#/definitions/firewall/mark"}, + "start_date": {"$ref": "#/definitions/firewall/start_date"}, + "stop_date": {"$ref": "#/definitions/firewall/stop_date"}, + "start_time": {"$ref": "#/definitions/firewall/start_time"}, + "stop_time": {"$ref": "#/definitions/firewall/stop_time"}, + "weekdays": {"$ref": "#/definitions/firewall/weekdays"}, + "monthdays": {"$ref": "#/definitions/firewall/monthdays"}, + "utc_time": {"$ref": "#/definitions/firewall/utc_time"}, + "family": {"$ref": "#/definitions/firewall/family"}, + "limit": {"$ref": "#/definitions/firewall/limit"}, + "limit_burst": {"$ref": "#/definitions/firewall/limit_burst"}, + "icmp_type": { + "title": "icmp_type", + "description": "For protocol icmp select specific icmp types to match. " + "Values can be either exact icmp type numbers or type names.", + "type": "array", + "uniqueItems": True, + "additionalItems": True, + "propertyOrder": 101, + "items": {"title": "ICMP type", "type": "string"}, + }, + "target": { + "allOf": [ + {"$ref": "#/definitions/firewall/rule_policy"}, + { + "title": "target", + "description": "firewall action for matched traffic", + "propertyOrder": 11, + }, + ] + }, +} + +firewall_forwardings_properties = { + "name": {"$ref": "#/definitions/firewall/name"}, + "enabled": {"$ref": "#/definitions/firewall/enabled"}, + "src": {"$ref": "#/definitions/firewall/src"}, + "dest": {"$ref": "#/definitions/firewall/dest"}, + "family": {"$ref": "#/definitions/firewall/family"}, +} + +# Note: this is currently incomplete and needs other properties adding +# https://openwrt.org/docs/guide-user/firewall/firewall_configuration#zones +firewall_zones_properties = { + "name": {"$ref": "#/definitions/firewall/zone_name"}, + "enabled": {"$ref": "#/definitions/firewall/enabled"}, + "network": { + "type": "array", + "title": "Network", + "description": "List of interfaces attached to this zone.", + "uniqueItems": True, + "propertyOrder": 2, + "items": { + "title": "Network", + "type": "string", + "maxLength": 15, + "pattern": "^[a-zA-z0-9_\\.\\-]*$", + }, + }, + "masq": { + "type": "boolean", + "title": "masq", + "description": "Specifies whether outgoing zone traffic should be " + "masqueraded.", + "default": False, + "format": "checkbox", + "propertyOrder": 3, + }, + "mtu_fix": { + "type": "boolean", + "title": "mtu_fix", + "description": "Enable MSS clamping for outgoing zone traffic.", + "default": False, + "format": "checkbox", + "propertyOrder": 4, + }, + "input": { + "allOf": [ + {"$ref": "#/definitions/firewall/zone_policy"}, + { + "title": "Input policy", + "description": "Default policy for incoming zone traffic.", + "propertyOrder": 5, + }, + ] + }, + "output": { + "allOf": [ + {"$ref": "#/definitions/firewall/zone_policy"}, + { + "title": "Output policy", + "description": "Default policy for outgoing zone traffic.", + "propertyOrder": 6, + }, + ] + }, + "forward": { + "allOf": [ + {"$ref": "#/definitions/firewall/zone_policy"}, + { + "title": "Forward policy.", + "description": "Default policy for forwarded zone traffic.", + "propertyOrder": 7, + }, + ] + }, + "masq_src": { + "type": "array", + "title": "Masqueraded source CIDR list.", + "description": "List of source IPv4 CIDRs that require masquerading.", + "propertyOrder": 8, + "items": { + "allOf": [ + {"$ref": "#/definitions/firewall/ipv4_cidr"}, + { + "title": "Masqueraded source CIDR.", + "description": "Source CIDR to enable masquerading for. " + 'Negation is possible by prefixing the subnet with a "!". ', + }, + ], + }, + }, + "masq_dest": { + "type": "array", + "title": "Masqueraded destination CIDR list.", + "description": "List of destination IPv4 CIDRs that require masquerading.", + "propertyOrder": 9, + "items": { + "allOf": [ + {"$ref": "#/definitions/firewall/ipv4_cidr"}, + { + "title": "Masquerade destination CIDR.", + "description": "Destination CIDR to enable masquerading for. " + 'Negation is possible by prefixing the subnet with a "!". ', + }, + ], + }, + }, + "masq_allow_invalid": { + "type": "boolean", + "title": "Allow invalid packets.", + "description": "Do not add DROP INVALID rules to the firewall if masquerading " + "is used. The DROP rules are supposed to prevent NAT leakage.", + "default": False, + "format": "checkbox", + "propertyOrder": 10, + }, + "family": {"$ref": "#/definitions/firewall/family"}, + "log": { + "type": "integer", + "title": "Enable logging for the filter and/or mangle table.", + "description": "Bit field to enable logging in the filter and/or mangle tables, " + "bit 0 = filter, bit 1 = mangle.", + "min": 0, + "max": 3, + "default": 0, + "propertyOrder": 10, + }, + "log_limit": { + "type": "string", + "title": "Limit on the number of log messages.", + "description": "Limits the amount of log messages per interval. For example, " + '"10/minute" will limit the logging to 10 messages per minute', + "default": "10/minute", + "propertyOrder": 11, + }, + "device": { + "type": "array", + "title": "Raw devices to attach to this zone.", + "description": "A list of raw device names to associate with this zone. ", + "items": { + "type": "string", + "title": "A device to attach to the zone.", + "description": "A device to attach to the zone." + 'For example, "ppp+" to match any PPP interface to the zone.', + }, + "propertyOrder": 12, + }, +} + +firewall_defaults = { + "input": { + "allOf": [ + {"$ref": "#/definitions/firewall/firewall_policy"}, + { + "title": "Default input policy", + "description": "Default policy for the INPUT chain of the filter table", + "propertyOrder": 1, + }, + ] + }, + "output": { + "allOf": [ + {"$ref": "#/definitions/firewall/firewall_policy"}, + { + "title": "Default output policy", + "description": "Default policy for the OUTPUT chain of the filter table", + "propertyOrder": 2, + }, + ] + }, + "forward": { + "allOf": [ + {"$ref": "#/definitions/firewall/firewall_policy"}, + { + "title": "Default forward policy", + "description": "Defulat policy for the FORWARD chain of the filter table", + "propertyOrder": 3, + }, + ] + }, + "drop_invalid": { + "type": "boolean", + "title": "Drop invalid packets.", + "description": "If True then any invalid packets will be dropped.", + "default": False, + "format": "checkbox", + "propertyOrder": 4, + }, + "synflood_protect": { + "type": "boolean", + "title": "Enable SYN flood protection.", + "description": "Enables SYN flood protection.", + "default": False, + "format": "checkbox", + "propertyOrder": 5, + }, + "synflood_rate": { + "type": "integer", + "title": "Rate limit (packets/second) for SYN packets above which the traffic is considered a flood.", + "description": "Number of packets/second for SYN packets above which the traffic is considered a " + "flood.", + "default": 25, + "propertyOrder": 6, + }, + "synflood_burst": { + "type": "integer", + "title": "Burst limit (packets/second) for SYN packets above which the traffic is considered a " + "flood.", + "description": "Set burst limit for SYN packets above which the traffic is considered a flood if it " + "exceeds the allowed rate.", + "default": 50, + "propertyOrder": 7, + }, + "tcp_syncookies": { + "type": "boolean", + "title": "Enable the use of TCP SYN cookies.", + "description": "If True, enables the use of SYN cookies.", + "default": True, + "format": "checkbox", + "propertyOrder": 8, + }, + "tcp_ecn": { + "type": "boolean", + "title": "Enable Explicit Congestion Notification.", + "description": "If True, enables Explicit Congestion Notification.", + "default": False, + "format": "checkbox", + "propertyOrder": 9, + }, + "tcp_window_scaling": { + "type": "boolean", + "title": "Enable TCP window scaling.", + "description": "If True, enables TCP window scaling.", + "default": True, + "format": "checkbox", + "propertyOrder": 10, + }, + "accept_redirects": { + "type": "boolean", + "title": "Accept redirects.", + "description": "If True, accept redirects.", + "default": False, + "format": "checkbox", + "propertyOrder": 11, + }, + "accept_source_route": { + "type": "boolean", + "title": "Accept source routes.", + "description": "If True, accept source routes.", + "default": False, + "format": "checkbox", + "propertyOrder": 12, + }, + "custom_chains": { + "type": "boolean", + "title": "Enable generation of custom rule chain hooks for user generated rules.", + "description": "If True, enable generation of custom rule chain hooks for user generated rules. " + "User rules would be typically stored in firewall.user but some packages e.g. BCP38 also make use " + "of these hooks.", + "default": True, + "format": "checkbox", + "propertyOrder": 13, + }, + "disable_ipv6": { + "type": "boolean", + "title": "Disable IPv6 firewall rules.", + "description": "If True, disable IPv6 firewall rules.", + "default": False, + "format": "checkbox", + "propertyOrder": 14, + }, + "flow_offlocaing": { + "type": "boolean", + "title": "Enable software flow offloading for connections.", + "description": "If True, enable software flow offloading for connections.", + "default": False, + "format": "checkbox", + "propertyOrder": 15, + }, + "flow_offlocaing_hw": { + "type": "boolean", + "title": "Enable hardware flow offloading for connections.", + "description": "If True, enable hardware flow offloading for connections.", + "default": False, + "format": "checkbox", + "propertyOrder": 16, + }, + "auto_helper": { + "type": "boolean", + "title": "Enable Conntrack helpers ", + "description": "If True, enable Conntrack helpers ", + "default": True, + "format": "checkbox", + "propertyOrder": 17, + }, +} + +firewall_properties = { + "defaults": { + "type": "object", + "title": "Firewall defaults", + "description": "Defaults for the firewall", + "propertyOrder": 4, + "properties": firewall_defaults, + }, + "forwardings": { + "type": "array", + "title": "Forwardings", + "propertyOrder": 5, + "items": { + "type": "object", + "title": "Forwarding", + "additionalProperties": False, + "required": ["src", "dest"], + "properties": firewall_forwardings_properties, + }, + }, + "zones": { + "type": "array", + "title": "Zones", + "propertyOrder": 6, + "items": { + "type": "object", + "title": "Zones", + "additionalProperties": True, + "required": ["name"], + "properties": firewall_zones_properties, + }, + }, + "rules": { + "type": "array", + "title": "Rules", + "propertyOrder": 7, + "items": { + "type": "object", + "title": "Rules", + "additionalProperties": True, + "required": ["src", "target"], + "properties": firewall_rules_properties, + }, + }, + "redirects": { + "type": "array", + "title": "Redirects", + "propertyOrder": 8, + "items": { + "type": "object", + "title": "Redirect", + "additionalProperties": False, + "properties": firewall_redirect_properties, + }, + }, + "includes": { + "type": "array", + "title": "Includes", + "propertyOrder": 9, + "items": { + "type": "object", + "title": "Include", + "additionalProperties": False, + "required": ["path"], + "properties": firewall_includes_properties, + }, + }, +} + +firewall = { + "definitions": firewall_definitions, + "properties": { + "firewall": { + "type": "object", + "title": "Firewall", + "additionalProperties": True, + "propertyOrder": 11, + "properties": firewall_properties, + }, + }, +} + +schema = merge_config(schema, firewall) + # add OpenVPN schema schema = merge_config(schema, base_openvpn_schema) # OpenVPN customizations for OpenWRT diff --git a/tests/openwrt/test_default.py b/tests/openwrt/test_default.py index 82880b122..230e31b9a 100644 --- a/tests/openwrt/test_default.py +++ b/tests/openwrt/test_default.py @@ -23,56 +23,59 @@ def test_render_default(self): "boolean": True, } ], - "firewall": [ - { - "config_name": "rule", - "name": "Allow-MLD", - "src": "wan", - "proto": "icmp", - "src_ip": "fe80::/10", - "family": "ipv6", - "target": "ACCEPT", - "icmp_type": ["130/0", "131/0", "132/0", "143/0"], - }, - { - "config_name": "rule", - "name": "Rule2", - "src": "wan", - "proto": "icmp", - "src_ip": "192.168.1.1/24", - "family": "ipv4", - "target": "ACCEPT", - "icmp_type": ["130/0", "131/0", "132/0", "143/0"], - }, - ], + "firewall": { + "rules": [ + { + "name": "Allow-MLD", + "src": "wan", + "proto": ["icmp"], + "src_ip": "fe80::/10", + "family": "ipv6", + "target": "ACCEPT", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + }, + { + "name": "Rule2", + "src": "wan", + "proto": ["icmp"], + "src_ip": "192.168.1.1/24", + "family": "ipv4", + "target": "ACCEPT", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + }, + ] + }, } ) expected = self._tabs( - """package firewall + """\ +package firewall -config rule 'rule_1' - option family 'ipv6' - list icmp_type '130/0' - list icmp_type '131/0' - list icmp_type '132/0' - list icmp_type '143/0' +config defaults 'defaults' + +config rule 'Allow_MLD' option name 'Allow-MLD' - option proto 'icmp' option src 'wan' + option proto 'icmp' option src_ip 'fe80::/10' + option family 'ipv6' option target 'ACCEPT' - -config rule 'rule_2' - option family 'ipv4' list icmp_type '130/0' list icmp_type '131/0' list icmp_type '132/0' list icmp_type '143/0' + +config rule 'Rule2' option name 'Rule2' - option proto 'icmp' option src 'wan' + option proto 'icmp' option src_ip '192.168.1.1/24' + option family 'ipv4' option target 'ACCEPT' + list icmp_type '130/0' + list icmp_type '131/0' + list icmp_type '132/0' + list icmp_type '143/0' package luci @@ -135,53 +138,55 @@ def test_parse_default(self): ) o = OpenWrt(native=native) expected = { - "luci": [ - { - "config_name": "core", - "config_value": "main", - "lang": "auto", - "resourcebase": "/luci-static/resources", - "mediaurlbase": "/luci-static/bootstrap", - "number": "4", - "boolean": "1", - } - ], - "firewall": [ - { - "config_name": "rule", - "name": "Allow-MLD", - "src": "wan", - "proto": "icmp", - "src_ip": "fe80::/10", - "family": "ipv6", - "target": "ACCEPT", - "icmp_type": ["130/0", "131/0", "132/0", "143/0"], - } - ], "led": [ { + "dev": "1-1.1", + "interval": 50, "name": "USB1", "sysfs": "tp-link:green:usb1", "trigger": "usbdev", - "dev": "1-1.1", - "interval": 50, } ], "interfaces": [{"name": "eth0", "type": "ethernet"}], + "firewall": { + "rules": [ + { + "family": "ipv6", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + "name": "Allow-MLD", + "proto": ["icmp"], + "src": "wan", + "src_ip": "fe80::/10", + "target": "ACCEPT", + } + ] + }, + "luci": [ + { + "boolean": "1", + "lang": "auto", + "mediaurlbase": "/luci-static/bootstrap", + "number": "4", + "resourcebase": "/luci-static/resources", + "config_value": "main", + "config_name": "core", + } + ], "system": [ - {"test": "1", "config_name": "custom", "config_value": "custom"} + {"test": "1", "config_value": "custom", "config_name": "custom"} ], } + self.assertDictEqual(o.config, expected) def test_skip(self): o = OpenWrt({"skipme": {"enabled": True}}) - self.assertEqual(o.render(), '') + self.assertEqual(o.render(), "") @capture_stdout() def test_warning(self): o = OpenWrt({"luci": [{"unrecognized": True}]}) - self.assertEqual(o.render(), '') + self.assertEqual(o.render(), "") def test_merge(self): template = { @@ -220,8 +225,8 @@ def test_merge(self): self.assertEqual(o.config, expected) def test_skip_nonlists(self): - o = OpenWrt({"custom_package": {'unknown': True}}) - self.assertEqual(o.render(), '') + o = OpenWrt({"custom_package": {"unknown": True}}) + self.assertEqual(o.render(), "") def test_render_invalid_uci_name(self): o = OpenWrt( diff --git a/tests/openwrt/test_firewall.py b/tests/openwrt/test_firewall.py new file mode 100644 index 000000000..33b74d244 --- /dev/null +++ b/tests/openwrt/test_firewall.py @@ -0,0 +1,940 @@ +import textwrap +import unittest + +from netjsonconfig import OpenWrt +from netjsonconfig.exceptions import ValidationError +from netjsonconfig.utils import _TabsMixin + + +class TestFirewall(unittest.TestCase, _TabsMixin): + maxDiff = None + + _defaults_1_netjson = { + "firewall": { + "defaults": { + "input": "ACCEPT", + "forward": "REJECT", + "output": "ACCEPT", + "synflood_protect": True, + } + } + } + + _defaults_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + option input 'ACCEPT' + option forward 'REJECT' + option output 'ACCEPT' + option synflood_protect '1' + """ + ) + + def test_render_defaults_1(self): + o = OpenWrt(self._defaults_1_netjson) + expected = self._tabs(self._defaults_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_defaults_1(self): + o = OpenWrt(native=self._defaults_1_uci) + self.assertEqual(o.config, self._defaults_1_netjson) + + _defaults_2_netjson = { + "firewall": { + "defaults": { + "input": "ACCEPT", + "output": "ACCEPT", + "forward": "REJECT", + "custom_chains": True, + "drop_invalid": True, + "synflood_protect": True, + "synflood_burst": 50, + "tcp_ecn": True, + "tcp_syncookies": True, + "tcp_window_scaling": True, + "disable_ipv6": False, + "flow_offloading": False, + "flow_offloading_hw": False, + "auto_helper": True, + } + } + } + + _defaults_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + option input 'ACCEPT' + option output 'ACCEPT' + option forward 'REJECT' + option custom_chains '1' + option drop_invalid '1' + option synflood_protect '1' + option synflood_burst '50' + option tcp_ecn '1' + option tcp_syncookies '1' + option tcp_window_scaling '1' + option disable_ipv6 '0' + option flow_offloading '0' + option flow_offloading_hw '0' + option auto_helper '1' + """ + ) + + def test_render_defaults_2(self): + o = OpenWrt(self._defaults_2_netjson) + expected = self._tabs(self._defaults_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_defaults_2(self): + o = OpenWrt(native=self._defaults_2_uci) + self.assertEqual(o.config, self._defaults_2_netjson) + + _rule_1_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-MLD", + "src": "wan", + "src_ip": "fe80::/10", + "proto": ["icmp"], + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_MLD' + option name 'Allow-MLD' + option src 'wan' + option src_ip 'fe80::/10' + option proto 'icmp' + list icmp_type '130/0' + list icmp_type '131/0' + list icmp_type '132/0' + list icmp_type '143/0' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_1(self): + o = OpenWrt(self._rule_1_netjson) + expected = self._tabs(self._rule_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_1(self): + o = OpenWrt(native=self._rule_1_uci) + self.assertEqual(o.config, self._rule_1_netjson) + + _rule_2_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-DHCPv6", + "src": "wan", + "src_ip": "fc00::/6", + "dest_ip": "fc00::/6", + "dest_port": "546", + "proto": ["udp"], + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_DHCPv6' + option name 'Allow-DHCPv6' + option src 'wan' + option src_ip 'fc00::/6' + option dest_ip 'fc00::/6' + option dest_port '546' + option proto 'udp' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_2(self): + o = OpenWrt(self._rule_2_netjson) + expected = self._tabs(self._rule_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_2(self): + o = OpenWrt(native=self._rule_2_uci) + self.assertEqual(o.config, self._rule_2_netjson) + + _rule_3_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Ping", + "src": "wan", + "proto": ["icmp"], + "family": "ipv4", + "icmp_type": ["echo-request"], + "target": "ACCEPT", + "enabled": False, + } + ] + } + } + + _rule_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_Ping' + option name 'Allow-Ping' + option src 'wan' + option proto 'icmp' + option family 'ipv4' + list icmp_type 'echo-request' + option target 'ACCEPT' + option enabled '0' + """ + ) + + def test_render_rule_3(self): + o = OpenWrt(self._rule_3_netjson) + expected = self._tabs(self._rule_3_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_3(self): + o = OpenWrt(native=self._rule_3_uci) + self.assertEqual(o.config, self._rule_3_netjson) + + _rule_4_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Isolated-DHCP", + "src": "isolated", + "proto": ["udp"], + "dest_port": "67-68", + "target": "ACCEPT", + } + ] + } + } + + _rule_4_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_Isolated_DHCP' + option name 'Allow-Isolated-DHCP' + option src 'isolated' + option proto 'udp' + option dest_port '67-68' + option target 'ACCEPT' + """ + ) + + def test_render_rule_4(self): + o = OpenWrt(self._rule_4_netjson) + expected = self._tabs(self._rule_4_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_4(self): + o = OpenWrt(native=self._rule_4_uci) + self.assertEqual(o.config, self._rule_4_netjson) + + _rule_5_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Isolated-DHCP", + "src_ip": "10.10.10.10", + "src_mac": "fc:aa:14:18:12:98", + "src": "isolated", + "proto": ["udp"], + "dest_port": "67-68", + "target": "ACCEPT", + } + ] + } + } + + _rule_5_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_Isolated_DHCP' + option name 'Allow-Isolated-DHCP' + option src_ip '10.10.10.10' + option src_mac 'fc:aa:14:18:12:98' + option src 'isolated' + option proto 'udp' + option dest_port '67-68' + option target 'ACCEPT' + """ + ) + + def test_render_rule_5(self): + o = OpenWrt(self._rule_5_netjson) + expected = self._tabs(self._rule_5_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_5(self): + o = OpenWrt(native=self._rule_5_uci) + self.assertEqual(o.config, self._rule_5_netjson) + + _rule_6_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Isolated-DHCP", + "src_ip": "10.10.10.10", + "src_mac": "fc:aa:14:18:12:98", + "src": "isolated", + "proto": ["udp"], + "dest_port": "67-68", + "target": "ACCEPT", + "dest": "dest_zone", + "dest_ip": "192.168.1.2", + "ipset": "my_ipset", + "mark": "DROP", + "start_date": "2021-01-21", + "stop_date": "2021-01-22", + "start_time": "01:01:01", + "stop_time": "11:11:11", + "weekdays": ["sun", "mon"], + "monthdays": [2, 10], + "utc_time": True, + "family": "any", + "limit": "3/second", + "limit_burst": 30, + "enabled": True, + } + ] + } + } + + _rule_6_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'Allow_Isolated_DHCP' + option name 'Allow-Isolated-DHCP' + option src_ip '10.10.10.10' + option src_mac 'fc:aa:14:18:12:98' + option src 'isolated' + option proto 'udp' + option dest_port '67-68' + option target 'ACCEPT' + option dest 'dest_zone' + option dest_ip '192.168.1.2' + option ipset 'my_ipset' + option mark 'DROP' + option start_date '2021-01-21' + option stop_date '2021-01-22' + option start_time '01:01:01' + option stop_time '11:11:11' + list weekdays 'sun' + list weekdays 'mon' + list monthdays '2' + list monthdays '10' + option utc_time '1' + option family 'any' + option limit '3/second' + option limit_burst '30' + option enabled '1' + """ + ) + + def test_render_rule_6(self): + o = OpenWrt(self._rule_6_netjson) + expected = self._tabs(self._rule_6_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_6(self): + o = OpenWrt(native=self._rule_6_uci) + self.assertEqual(o.config, self._rule_6_netjson) + + _zone_1_netjson = { + "firewall": { + "zones": [ + { + "name": "lan", + "input": "ACCEPT", + "output": "ACCEPT", + "forward": "ACCEPT", + "network": ["lan"], + "mtu_fix": True, + } + ] + } + } + + _zone_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config zone 'lan' + option name 'lan' + option input 'ACCEPT' + option output 'ACCEPT' + option forward 'ACCEPT' + option network 'lan' + option mtu_fix '1' + """ + ) + + def test_render_zone_1(self): + o = OpenWrt(self._zone_1_netjson) + expected = self._tabs(self._zone_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_zone_1(self): + o = OpenWrt(native=self._zone_1_uci) + self.assertEqual(o.config, self._zone_1_netjson) + + _zone_2_netjson = { + "firewall": { + "zones": [ + { + "name": "wan", + "input": "DROP", + "output": "ACCEPT", + "forward": "DROP", + "network": ["wan", "wan6"], + "mtu_fix": True, + "masq": True, + } + ] + } + } + + _zone_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config zone 'wan' + option name 'wan' + option input 'DROP' + option output 'ACCEPT' + option forward 'DROP' + list network 'wan' + list network 'wan6' + option mtu_fix '1' + option masq '1' + """ + ) + + # This one is the same as _zone_2_uci with the exception that the "network" + # parameter is specified as a single string. + _zone_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config zone 'wan' + option name 'wan' + option input 'DROP' + option output 'ACCEPT' + option forward 'DROP' + option network 'wan wan6' + option mtu_fix '1' + option masq '1' + """ + ) + + def test_render_zone_2(self): + o = OpenWrt(self._zone_2_netjson) + expected = self._tabs(self._zone_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_zone_2(self): + o = OpenWrt(native=self._zone_2_uci) + self.assertEqual(o.config, self._zone_2_netjson) + + def test_parse_zone_3(self): + o = OpenWrt(native=self._zone_3_uci) + self.assertEqual(o.config, self._zone_2_netjson) + + _forwarding_1_netjson = { + "firewall": { + "forwardings": [{"name": "isolated-wan", "src": "isolated", "dest": "wan"}] + } + } + + _forwarding_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config forwarding 'isolated_wan' + option name 'isolated-wan' + option src 'isolated' + option dest 'wan' + """ + ) + + def test_render_forwarding_1(self): + o = OpenWrt(self._forwarding_1_netjson) + expected = self._tabs(self._forwarding_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_forwarding_1(self): + o = OpenWrt(native=self._forwarding_1_uci) + self.assertEqual(o.config, self._forwarding_1_netjson) + + _forwarding_2_netjson = { + "firewall": { + "forwardings": [ + { + "name": "isolated-wan-ipv4", + "src": "isolated", + "dest": "wan", + "family": "ipv4", + } + ] + } + } + + _forwarding_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config forwarding 'isolated_wan_ipv4' + option name 'isolated-wan-ipv4' + option src 'isolated' + option dest 'wan' + option family 'ipv4' + """ + ) + + def test_render_forwarding_2(self): + o = OpenWrt(self._forwarding_2_netjson) + expected = self._tabs(self._forwarding_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_forwarding_2(self): + o = OpenWrt(native=self._forwarding_2_uci) + self.assertEqual(o.config, self._forwarding_2_netjson) + + _forwarding_3_netjson = { + "firewall": { + "forwardings": [ + { + "name": "lan-wan-any", + "src": "lan", + "dest": "wan", + "family": "any", + "enabled": False, + } + ] + } + } + + _forwarding_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config forwarding 'lan_wan_any' + option name 'lan-wan-any' + option src 'lan' + option dest 'wan' + option family 'any' + option enabled '0' + """ + ) + + def test_render_forwarding_3(self): + o = OpenWrt(self._forwarding_3_netjson) + expected = self._tabs(self._forwarding_3_uci) + self.assertEqual(o.render(), expected) + + def test_parse_forwarding_3(self): + o = OpenWrt(native=self._forwarding_3_uci) + self.assertEqual(o.config, self._forwarding_3_netjson) + + _forwarding_4_netjson = { + "firewall": { + "forwardings": [ + { + "name": "forward_name", + "src": "lan", + "dest": "wan", + "family": "any", + "enabled": False, + } + ] + } + } + + _forwarding_4_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config forwarding 'forward_name' + option name 'forward_name' + option src 'lan' + option dest 'wan' + option family 'any' + option enabled '0' + """ + ) + + def test_render_forwarding_4(self): + o = OpenWrt(self._forwarding_4_netjson) + expected = self._tabs(self._forwarding_4_uci) + self.assertEqual(o.render(), expected) + + def test_parse_forwarding_4(self): + o = OpenWrt(native=self._forwarding_4_uci) + print(o.config) + self.assertEqual(o.config, self._forwarding_4_netjson) + + def test_forwarding_validation_error(self): + o = OpenWrt( + { + "firewall": { + "forwardings": [{"src": "lan", "dest": "wan", "family": "XXXXXX"}] + } + } + ) + with self.assertRaises(ValidationError): + o.validate() + + _redirect_1_netjson = { + "firewall": { + "redirects": [ + { + "name": "Adblock DNS, port 53", + "src": "lan", + "proto": ["tcp", "udp"], + "src_dport": "53", + "dest_port": "53", + "target": "DNAT", + } + ] + } + } + + _redirect_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config redirect 'Adblock DNS, port 53' + option name 'Adblock DNS, port 53' + option src 'lan' + option proto 'tcpudp' + option src_dport '53' + option dest_port '53' + option target 'DNAT' + """ + ) + + def test_render_redirect_1(self): + o = OpenWrt(self._redirect_1_netjson) + expected = self._tabs(self._redirect_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_redirect_1(self): + o = OpenWrt(native=self._redirect_1_uci) + self.assertEqual(o.config, self._redirect_1_netjson) + + _redirect_2_netjson = { + "firewall": { + "redirects": [ + { + "name": "Adblock DNS, port 53", + "src": "lan", + "proto": ["tcp", "udp"], + "src_dport": "53", + "dest_port": "53", + "target": "DNAT", + # Contrived, unrealistic example for testing + "weekdays": ["mon", "tue", "wed"], + "monthdays": [1, 2, 3, 29, 30], + } + ] + } + } + + _redirect_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config redirect 'Adblock DNS, port 53' + option name 'Adblock DNS, port 53' + option src 'lan' + option proto 'tcpudp' + option src_dport '53' + option dest_port '53' + option target 'DNAT' + list weekdays 'mon' + list weekdays 'tue' + list weekdays 'wed' + list monthdays '1' + list monthdays '2' + list monthdays '3' + list monthdays '29' + list monthdays '30' + """ + ) + + def test_render_redirect_2(self): + o = OpenWrt(self._redirect_2_netjson) + expected = self._tabs(self._redirect_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_redirect_2(self): + o = OpenWrt(native=self._redirect_2_uci) + self.assertEqual(o.config, self._redirect_2_netjson) + + def test_redirect_weekdays_validation_error_1(self): + o = OpenWrt({"firewall": {"redirects": [{"weekdays": ["mon", "xxx"]}]}}) + with self.assertRaises(ValidationError): + o.validate() + + def test_redirect_weekdays_validation_error_2(self): + o = OpenWrt({"firewall": {"redirects": [{"weekdays": ["mon", 1]}]}}) + with self.assertRaises(ValidationError): + o.validate() + + def test_redirect_monthdays_validation_error_1(self): + o = OpenWrt({"firewall": {"redirects": [{"monthdays": [2, 8, 32]}]}}) + with self.assertRaises(ValidationError): + o.validate() + + def test_redirect_monthdays_validation_error_2(self): + o = OpenWrt({"firewall": {"redirects": [{"monthdays": [0, 2, 8]}]}}) + with self.assertRaises(ValidationError): + o.validate() + + _redirect_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config redirect 'Adblock DNS, port 53' + option name 'Adblock DNS, port 53' + option src 'lan' + option proto 'tcpudp' + option src_dport '53' + option dest_port '53' + option target 'DNAT' + option weekdays '! mon tue wed' + option monthdays '! 1 2 3 4 5' + """ + ) + + _redirect_3_netjson = { + "firewall": { + "redirects": [ + { + "name": "Adblock DNS, port 53", + "src": "lan", + "proto": ["tcp", "udp"], + "src_dport": "53", + "dest_port": "53", + "target": "DNAT", + "weekdays": ["sun", "thu", "fri", "sat"], + "monthdays": [ + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + } + ] + } + } + + def test_parse_redirect_3(self): + o = OpenWrt(native=self._redirect_3_uci) + self.assertEqual(o.config, self._redirect_3_netjson) + + _redirect_4_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config redirect 'Adblock DNS, port 53' + option name 'Adblock DNS, port 53' + option src 'lan' + option proto 'tcpudp' + option src_dport '53' + option dest_port '53' + option target 'DNAT' + list weekdays 'mon' + list weekdays 'tue' + list weekdays 'wed' + list monthdays '1' + list monthdays '2' + list monthdays '31' + option src_ip '192.168.1.1' + option src_dip '192.168.1.1' + option src_mac 'AA:AA:AA:AA:AA:AA' + option src_port '1-1064' + option dest 'wan' + option dest_ip '10.0.0.1' + option ipset 'myipset' + option mark '0xff' + option start_date '2020-02-02' + option stop_date '2020-03-02' + option start_time '12:12:12' + option stop_time '23:23:23' + option utc_time '1' + option family 'any' + option reflection '0' + option reflection_src 'external' + option limit '3/sec' + option limit_burst '5' + option enabled '0' + """ + ) + + _redirect_4_netjson = { + "firewall": { + "redirects": [ + { + "name": "Adblock DNS, port 53", + "src": "lan", + "proto": ["tcp", "udp"], + "src_dport": "53", + "dest_port": "53", + "target": "DNAT", + "weekdays": ["mon", "tue", "wed"], + "monthdays": [1, 2, 31], + "src_ip": "192.168.1.1", + "src_dip": "192.168.1.1", + "src_mac": "AA:AA:AA:AA:AA:AA", + "src_port": "1-1064", + "dest": "wan", + "dest_ip": "10.0.0.1", + "ipset": "myipset", + "mark": "0xff", + "start_date": "2020-02-02", + "stop_date": "2020-03-02", + "start_time": "12:12:12", + "stop_time": "23:23:23", + "utc_time": True, + "family": "any", + "reflection": False, + "reflection_src": "external", + "limit": "3/sec", + "limit_burst": 5, + "enabled": False, + } + ] + } + } + + def test_render_redirect_4(self): + o = OpenWrt(self._redirect_4_netjson) + expected = self._tabs(self._redirect_4_uci) + self.assertEqual(o.render(), expected) + + def test_parse_redirect_4(self): + o = OpenWrt(native=self._redirect_4_uci) + self.assertEqual(o.config, self._redirect_4_netjson) + + _include_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config include 'Include Test' + option name 'Include Test' + option type 'script' + option family 'any' + option path '/a/b/c.ipt' + option reload '1' + option enabled '0' + """ + ) + + _include_1_netjson = { + "firewall": { + "includes": [ + { + "name": "Include Test", + "type": "script", + "family": "any", + "path": "/a/b/c.ipt", + "reload": True, + "enabled": False, + } + ] + } + } + + def test_render_include_1(self): + o = OpenWrt(self._include_1_netjson) + expected = self._tabs(self._include_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_include_1(self): + o = OpenWrt(native=self._include_1_uci) + self.assertEqual(o.config, self._include_1_netjson)