diff --git a/base_dav/README.rst b/base_dav/README.rst new file mode 100644 index 000000000..91e495622 --- /dev/null +++ b/base_dav/README.rst @@ -0,0 +1,128 @@ +========================== +Caldav and Carddav support +========================== + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:6cc5b91f1cff865b4527a2097534adb50c8bade6eaed9f3b820275b7b8ab19d3 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png + :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html + :alt: License: AGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--backend-lightgray.png?logo=github + :target: https://github.com/OCA/server-backend/tree/18.0/base_dav + :alt: OCA/server-backend +.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png + :target: https://translation.odoo-community.org/projects/server-backend-18-0/server-backend-18-0-base_dav + :alt: Translate me on Weblate +.. |badge5| image:: https://img.shields.io/badge/runboat-Try%20me-875A7B.png + :target: https://runboat.odoo-community.org/builds?repo=OCA/server-backend&target_branch=18.0 + :alt: Try me on Runboat + +|badge1| |badge2| |badge3| |badge4| |badge5| + +This module adds WebDAV support to Odoo, specifically CalDAV and +CardDAV. + +You can configure arbitrary objects as a calendar or an address book, +thus make arbitrary information accessible in external systems or your +mobile. + +**Table of contents** + +.. contents:: + :local: + +Configuration +============= + +To configure this module, you need to: + +1. Go to Settings / WebDAV Collections and create or edit your + collections. There, you'll also see the URL to point your clients to. + +Note that you need to configure a dbfilter if you use multiple +databases. + +Known issues / Roadmap +====================== + +- Much better UX for configuring collections (probably provide a group + that sees the current fully flexible field mappings, and by default + show some dumbed down version where you can select some preselected + vobject fields); +- Support todo lists and journals; +- Support configuring default field mappings per model; +- Support plain WebDAV collections to make some model's records + accessible as folders, and the records' attachments as files (r/w); +- Support configuring lists of calendars so that you can have a calendar + for every project and appointments are tasks, or a calendar for every + sales team and appointments are sale orders. Lots of possibilities. + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* initOS GmbH +* Therp BV + +Contributors +------------ + +- Holger Brunn +- Florian Kantelberg +- `Cetmix `__ + + - Ivan Sokolov + - George Smirnov + - Dmitry Meita + +Other credits +------------- + +- Odoo Community Association +- All the actual work is done by `Radicale `__ + +Maintainers +----------- + +This module is maintained by the OCA. + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +.. |maintainer-hbrunn| image:: https://github.com/hbrunn.png?size=40px + :target: https://github.com/hbrunn + :alt: hbrunn + +Current `maintainer `__: + +|maintainer-hbrunn| + +This module is part of the `OCA/server-backend `_ project on GitHub. + +You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute. diff --git a/base_dav/__init__.py b/base_dav/__init__.py new file mode 100644 index 000000000..ca1e0a6ef --- /dev/null +++ b/base_dav/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +import logging + +from . import models +from . import controllers +from . import radicale + + +def _restore_odoo_runbot_level_name(): + """Restore odoo RUNBOT level display name after Radicale imports.""" + runbot_level = getattr(logging, "RUNBOT", 25) + if logging.getLevelName(runbot_level) != "INFO": + logging.addLevelName(runbot_level, "INFO") + + +_restore_odoo_runbot_level_name() diff --git a/base_dav/__manifest__.py b/base_dav/__manifest__.py new file mode 100644 index 000000000..a175b56c3 --- /dev/null +++ b/base_dav/__manifest__.py @@ -0,0 +1,26 @@ +# Copyright 2018 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +{ + "name": "Caldav and Carddav support", + "version": "18.0.1.0.0", + "author": "initOS GmbH,Therp BV,Odoo Community Association (OCA)", + "license": "AGPL-3", + "category": "Extra Tools", + "summary": "Access Odoo data as calendar or address book", + "website": "https://github.com/OCA/server-backend", + "maintainers": ["hbrunn"], + "depends": [ + "base", + ], + "demo": [ + "demo/dav_collection.xml", + ], + "data": [ + "views/dav_collection.xml", + "security/ir.model.access.csv", + ], + "external_dependencies": { + "python": ["radicale>3.3.0"], + }, +} diff --git a/base_dav/controllers/__init__.py b/base_dav/controllers/__init__.py new file mode 100644 index 000000000..665f08cea --- /dev/null +++ b/base_dav/controllers/__init__.py @@ -0,0 +1,3 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +from . import main diff --git a/base_dav/controllers/main.py b/base_dav/controllers/main.py new file mode 100644 index 000000000..9441e4a58 --- /dev/null +++ b/base_dav/controllers/main.py @@ -0,0 +1,102 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import io +import sys +from functools import lru_cache + +import werkzeug +from radicale import config as radicale_config +from radicale.app import Application +from werkzeug.wrappers.response import Response as WerkzeugResponse + +from odoo import http +from odoo.http import request + +PREFIX = "/.dav" + + +@lru_cache(maxsize=1) +def _get_radicale_app(): + """Build and cache a Radicale app per worker process.""" + configuration = radicale_config.load() + configuration.update( + { + "auth": {"type": "odoo.addons.base_dav.radicale.auth"}, + "storage": {"type": "odoo.addons.base_dav.radicale.collection"}, + "rights": {"type": "odoo.addons.base_dav.radicale.rights"}, + "web": {"type": "none"}, + }, + "odoo", + ) + return Application(configuration) + + +class Main(http.Controller): + @http.route( + ["/.well-known/carddav", "/.well-known/caldav", "/.well-known/webdav"], + type="http", + auth="none", + csrf=False, + ) + def handle_well_known_request(self) -> WerkzeugResponse: + """ + Redirect well-known CalDAV/CardDAV/WebDAV endpoints to the Radicale mount point. + + :return: HTTP 301 redirect response to ``/.dav``. + :rtype: werkzeug.wrappers.response.Response + """ + return werkzeug.utils.redirect(PREFIX, 301) + + @http.route( + [PREFIX, f"{PREFIX}/"], + type="http", + auth="none", + csrf=False, + ) + def handle_dav_request(self, davpath=None, **kwargs): + """Proxy WebDAV/CalDAV/CardDAV requests to the Radicale app. + + :param davpath: Path relative to the DAV mount point. + :type davpath: str | None + :param kwargs: Extra route keyword arguments, unused. + :type kwargs: dict + + :return: Response produced by Radicale. + :rtype: odoo.http.Response + """ + del kwargs + app = _get_radicale_app() + + environ = dict(request.httprequest.environ) + environ.setdefault("wsgi.errors", sys.stderr) + + raw_body = request.httprequest.get_data(cache=False) or b"" + environ["wsgi.input"] = io.BytesIO(raw_body) + + environ["SCRIPT_NAME"] = PREFIX + environ["HTTP_X_SCRIPT_NAME"] = PREFIX + environ["PATH_INFO"] = "/" + (davpath or "") + + status_headers = {"status": "500 Internal Server Error", "headers": []} + + def start_response(status, headers, exc_info=None): + """WSGI start_response callback used by Radicale. + + :param status: HTTP status line. + :type status: str + :param headers: Sequence of ``(header_name, header_value)``. + :type headers: Sequence[tuple[str, str]] + :param exc_info: Optional exception info as per WSGI spec (unused). + :type exc_info: Any, optional + """ + status_headers["status"] = status + status_headers["headers"] = headers + + result_iter = app(environ, start_response) + + return http.Response( + response=result_iter, + status=status_headers["status"], + headers=status_headers["headers"], + ) diff --git a/base_dav/demo/dav_collection.xml b/base_dav/demo/dav_collection.xml new file mode 100644 index 000000000..ddb6de922 --- /dev/null +++ b/base_dav/demo/dav_collection.xml @@ -0,0 +1,39 @@ + + + Addressbook + addressbook + + [] + authenticated + + + + N + + + + + + FN + + + + + + photo + + + + + + email + + + + + + tel + + + + diff --git a/base_dav/i18n/base_dav.pot b/base_dav/i18n/base_dav.pot new file mode 100644 index 000000000..e1b012f25 --- /dev/null +++ b/base_dav/i18n/base_dav.pot @@ -0,0 +1,214 @@ +# Translation of Odoo Server. +# This file contains the translation of the following modules: +# * base_dav +# +msgid "" +msgstr "" +"Project-Id-Version: Odoo Server 18.0\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2026-02-23 19:13+0000\n" +"PO-Revision-Date: 2026-02-23 19:13+0000\n" +"Last-Translator: \n" +"Language-Team: \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=UTF-8\n" +"Content-Transfer-Encoding: \n" +"Plural-Forms: \n" + +#. module: base_dav +#: model:ir.model,name:base_dav.model_dav_collection +msgid "A collection accessible via WebDAV" +msgstr "" + +#. module: base_dav +#: model:ir.model,name:base_dav.model_dav_collection_field_mapping +msgid "A field mapping for a WebDAV collection" +msgstr "" + +#. module: base_dav +#: model_terms:ir.ui.view,arch_db:base_dav.view_dav_collection_form +msgid "Access" +msgstr "" + +#. module: base_dav +#: model_terms:ir.ui.view,arch_db:base_dav.view_dav_collection_form +msgid "Additional field mapping" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__dav_type__addressbook +msgid "Addressbook" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,help:base_dav.field_dav_collection_field_mapping__name +msgid "Attribute name in the vobject" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__rights__authenticated +msgid "Authenticated" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__dav_type__calendar +msgid "Calendar" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection_field_mapping__mapping_type__code +msgid "Code" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,help:base_dav.field_dav_collection_field_mapping__export_code +msgid "" +"Code to export the value to a vobject. Use the variable result for the " +"output of the value and record as input" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,help:base_dav.field_dav_collection_field_mapping__import_code +msgid "" +"Code to import the value from a vobject. Use the variable result for the " +"output of the value and item as input" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__collection_id +msgid "Collection" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__create_uid +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__create_uid +msgid "Created by" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__create_date +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__create_date +msgid "Created on" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__display_name +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__display_name +msgid "Display Name" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__domain +msgid "Domain" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__export_code +msgid "Export Code" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__field_id +msgid "Field" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__field_uuid +msgid "Field Uuid" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__field_mapping_ids +msgid "Field mappings" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,help:base_dav.field_dav_collection_field_mapping__field_id +msgid "Field of the model the values are mapped to" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__dav_type__files +msgid "Files" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__id +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__id +msgid "ID" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__import_code +msgid "Import Code" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__write_uid +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__write_uid +msgid "Last Updated by" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__write_date +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__write_date +msgid "Last Updated on" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__mapping_type +msgid "Mapping Type" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__model_id +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__model_id +msgid "Model" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__name +#: model:ir.model.fields,field_description:base_dav.field_dav_collection_field_mapping__name +msgid "Name" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__rights__owner_only +msgid "Owner Only" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection__rights__owner_write_only +msgid "Owner Write Only" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__rights +msgid "Rights" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields.selection,name:base_dav.selection__dav_collection_field_mapping__mapping_type__simple +msgid "Simple" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__tag +msgid "Tag" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__dav_type +msgid "Type" +msgstr "" + +#. module: base_dav +#: model:ir.model.fields,field_description:base_dav.field_dav_collection__url +msgid "Url" +msgstr "" + +#. module: base_dav +#: model:ir.actions.act_window,name:base_dav.action_dav_collection +#: model:ir.ui.menu,name:base_dav.menu_dav_collection +msgid "WebDAV collections" +msgstr "" diff --git a/base_dav/models/__init__.py b/base_dav/models/__init__.py new file mode 100644 index 000000000..3e6c8778b --- /dev/null +++ b/base_dav/models/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +from . import dav_collection +from . import dav_collection_field_mapping diff --git a/base_dav/models/dav_collection.py b/base_dav/models/dav_collection.py new file mode 100644 index 000000000..447e8c11b --- /dev/null +++ b/base_dav/models/dav_collection.py @@ -0,0 +1,550 @@ +# Copyright 2019 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import posixpath +import time +from operator import itemgetter +from urllib.parse import quote_plus, unquote_plus + +import vobject + +from odoo import api, fields, models +from odoo.exceptions import AccessError +from odoo.osv import expression +from odoo.tools.safe_eval import safe_eval + +from ..controllers.main import PREFIX + +_DAV_ITEM_EXTENSIONS = (".vcf", ".ics") + + +def _dav_strip_item_extension(name: str) -> str: + """Return DAV item identifier without common extensions (.vcf/.ics).""" + name = (name or "").strip() + lower = name.lower() + for ext in _DAV_ITEM_EXTENSIONS: + if lower.endswith(ext): + return name[: -len(ext)] + return name + + +class DavCollection(models.Model): + _name = "dav.collection" + _description = "A collection accessible via WebDAV" + + name = fields.Char(required=True) + rights = fields.Selection( + [ + ("owner_only", "Owner Only"), + ("owner_write_only", "Owner Write Only"), + ("authenticated", "Authenticated"), + ], + required=True, + default="owner_only", + ) + dav_type = fields.Selection( + [ + ("calendar", "Calendar"), + ("addressbook", "Addressbook"), + ("files", "Files"), + ], + string="Type", + required=True, + default="calendar", + ) + tag = fields.Char(compute="_compute_tag") + model_name = fields.Char( + string="Model Technical Name", + related="model_id.model", + ) + model_id = fields.Many2one( + "ir.model", + required=True, + domain=[("transient", "=", False)], + ondelete="cascade", + ) + domain = fields.Char( + required=True, + default="[]", + ) + field_uuid = fields.Many2one("ir.model.fields") + field_mapping_ids = fields.One2many( + "dav.collection.field_mapping", + "collection_id", + string="Field mappings", + ) + url = fields.Char(compute="_compute_url") + calendar_color = fields.Char(default="#48c9f4") + + @api.depends("dav_type") + def _compute_tag(self): + """Compute DAV collection tag based on its type. + + Sets ``tag`` field to a DAV-specific container name: + - ``VCALENDAR`` for calendar + - ``VADDRESSBOOK`` for addressbook + - False for files + """ + for rec in self: + if rec.dav_type == "calendar": + rec.tag = "VCALENDAR" + elif rec.dav_type == "addressbook": + rec.tag = "VADDRESSBOOK" + else: + rec.tag = False + + def _compute_url(self): + """Compute absolute DAV access URL for the collection. + + URL is constructed using: + - system base URL + - DAV prefix + - current user login + - collection ID + """ + base_url = ( + self.env["ir.config_parameter"].sudo().get_param("web.base.url") or "" + ).rstrip("/") + login = self.env.user.login + for rec in self: + rec.url = f"{base_url}{PREFIX}/{login}/{rec.id}" + + @api.constrains("domain") + def _check_domain(self): + """Validate domain expression. + + Ensures that the stored domain string can be safely evaluated. + + :raises Exception: If domain evaluation fails + """ + for rec in self: + rec._eval_domain() + + @api.model + def _eval_context(self): + """Return safe evaluation context for domain expressions. + + :return: Dictionary containing available evaluation variables + :rtype: Dict[str, Any] + """ + return { + "user": self.env.user, + } + + def _eval_domain(self): + """Evaluate stored domain expression into Odoo domain list. + + :raises ValueError: If domain string is invalid + :return: Evaluated domain + :rtype: List[Any] + """ + self.ensure_one() + return safe_eval(self.domain or "[]", self._eval_context()) + + def eval_domain_records(self): + """Search records matching the evaluated domain. + + :return: Recordset of matching records + :rtype: odoo.models.BaseModel + """ + self.ensure_one() + model_name = self.sudo().model_id.model + return self.env[model_name].search(self._eval_domain()) + + def get_record(self, components): + """Retrieve record from path components. + + :param components: Parsed DAV path components + :type components: Sequence[str] + + :return: Matching record or empty recordset + :rtype: odoo.models.BaseModel + """ + self.ensure_one() + model_name = self.sudo().model_id.model + collection_model = self.env[model_name] + raw_key = components[-1] if components else "" + key = _dav_strip_item_extension(raw_key) + field_uuid = self.sudo().field_uuid + if field_uuid: + field_name = field_uuid.name + if field_uuid.ttype in ("integer", "many2one"): + try: + key = int(key) + except (TypeError, ValueError): + return collection_model.browse() + else: + field_name = "id" + try: + key = int(key) + except (TypeError, ValueError): + return collection_model.browse() + + domain = expression.AND( + [ + [(field_name, "=", key)], + self._eval_domain(), + ] + ) + return collection_model.search(domain, limit=1) + + def _get_record_uid_value(self, record): + """Return the DAV item identifier for a record. + + Uses ``field_uuid`` when configured, otherwise falls back to ``record.id``. + The returned value matches the identifier format expected by + :meth:`get_record`. + """ + self.ensure_one() + + field_uuid = self.sudo().field_uuid + if not field_uuid: + return str(record.id) + + value = record[field_uuid.name] + if field_uuid.ttype == "many2one": + return str(value.id) if value else "" + return str(value) + + def from_vobject(self, item): + """Convert vobject item into Odoo field values. + + Supports: + - VEVENT for calendar + - VCARD for addressbook + + :param item: vobject instance + :type item: Any + + :return: Dictionary of field values or None if unsupported + :rtype: Optional[Dict[str, Any]] + """ + self.ensure_one() + + if self.dav_type == "calendar": + if item.name != "VCALENDAR" or not hasattr(item, "vevent"): + return None + item = item.vevent + elif self.dav_type == "addressbook": + if item.name != "VCARD": + return None + else: + return None + + result = {} + children = {c.name.lower(): c for c in item.getChildren()} + for mapping in self.field_mapping_ids: + field_id = mapping.sudo().field_id + child = children.get(mapping.name.lower()) + if not child: + continue + value = mapping.from_vobject(child) + if value is not None: + result[field_id.name] = value + return result + + def to_vobject(self, record): + """Convert Odoo record into vobject representation. + + Automatically adds: + - UID if missing + - REV based on write_date + + :param record: Odoo record + :type record: odoo.models.BaseModel + + :return: vobject instance or None for unsupported types + :rtype: Optional[Any] + """ + self.ensure_one() + + if self.dav_type == "calendar": + result = vobject.iCalendar() + vobj = result.add("vevent") + elif self.dav_type == "addressbook": + result = vobject.vCard() + vobj = result + else: + return None + + for mapping in self.field_mapping_ids: + value = mapping.to_vobject(record) + if value is None or value is False: + continue + if isinstance(value, bool): + continue + if isinstance(value, (int | float)): + value = str(value) + vobj.add(mapping.name).value = value + + if "uid" not in vobj.contents: + vobj.add("uid").value = self._get_record_uid_value(record) + + if ( + "rev" not in vobj.contents + and "write_date" in record._fields + and record.write_date + ): + s = fields.Datetime.to_string(record.write_date) # YYYY-MM-DD HH:MM:SS + vobj.add("rev").value = ( + s.replace("-", "").replace(" ", "T").replace(":", "") + "Z" + ) + + return result + + @api.model + def _odoo_to_http_datetime(self, value): + """Convert Odoo datetime to HTTP-date format (RFC 7231). + + :param value: Datetime value (string or datetime) + :type value: Any + + :return: HTTP formatted datetime string or None + :rtype: Optional[str] + """ + if not value: + return None + if not isinstance(value, str): + value = fields.Datetime.to_string(value) + return time.strftime( + "%a, %d %b %Y %H:%M:%S GMT", + time.strptime(value, "%Y-%m-%d %H:%M:%S"), + ) + + @api.model + def _split_path(self, path): + """Split DAV path into normalized components. + + :param path: Raw path string + :type path: Optional[str] + + :return: List of path segments + :rtype: List[str] + """ + return [ + part + for part in posixpath.normpath(f"/{path or ''}").strip("/").split("/") + if part + ] + + def dav_list( + self, + collection, + path_components, + ): + """List DAV resources under given path. + + Handles: + - file collections (attachments) + - record-based collections (calendar/addressbook) + + :param collection: Radicale collection instance + :type collection: Any + :param path_components: Parsed DAV path + :type path_components: Sequence[str] + + :return: List of resource href paths + :rtype: List[str] + """ + self.ensure_one() + + if self.dav_type == "files": + if len(path_components) == 3: + model_name = self.sudo().model_id.model + collection_model = self.env[model_name] + folder_name = unquote_plus(path_components[2]) + record = collection_model.browse( + map( + itemgetter(0), + collection_model.name_search( + folder_name, + operator="=", + limit=1, + ), + ) + ) + return [ + "/" + + "/".join((*path_components, quote_plus(attachment.name or ""))) + for attachment in self.env["ir.attachment"].search( + [ + ("type", "=", "binary"), + ("res_model", "=", record._name), + ("res_id", "=", record.id), + ] + ) + ] + elif len(path_components) == 2: + return [ + "/" + "/".join((*path_components, quote_plus(record.display_name))) + for record in self.eval_domain_records() + ] + + if len(path_components) > 2: + return [] + + result = [] + for record in self.eval_domain_records(): + result.append( + "/" + "/".join((*path_components, self._get_record_uid_value(record))) + ) + return result + + def dav_delete( + self, + collection, + href, + ): + """Delete DAV resource by href. + + :param collection: Radicale collection instance + :type collection: Any + :param href: Resource path + :type href: str + """ + self.ensure_one() + + if self.dav_type == "files": + # TODO: Handle deletion of attachments + return + + components = self._split_path(href) + rec = self.get_record(components) + if rec: + rec.unlink() + + def dav_upload(self, collection, href, item): + """Create or update DAV resource from uploaded vobject. + + :param collection: Radicale collection instance + :type collection: Any + :param href: Resource path + :type href: str + :param item: Uploaded vobject + :type item: Any + :raises AccessError: If created/updated record is outside collection domain + :return: Radicale Item instance or None + :rtype: Optional[Any] + """ + self.ensure_one() + + if self.dav_type == "files": + # TODO: Handle upload of attachments + return None + + components = self._split_path(href) + model_name = self.sudo().model_id.model + collection_model = self.env[model_name] + + data = self.from_vobject(item) + if not data: + return None + + rec = self.get_record(components) + if not rec: + field_uuid = self.sudo().field_uuid + if field_uuid: + clean_key = _dav_strip_item_extension( + components[-1] if components else "" + ) + if field_uuid.ttype in ("integer", "many2one"): + try: + clean_key = int(clean_key) + except (TypeError, ValueError): + clean_key = None + if clean_key is not None and field_uuid.name not in data: + data[field_uuid.name] = clean_key + + rec = collection_model.create(data) + else: + rec.write(data) + + domain = expression.AND([self._eval_domain(), [("id", "=", rec.id)]]) + if not collection_model.search(domain, limit=1): + raise AccessError(self.env._("Record is outside of DAV collection domain")) + + from ..radicale.collection import Item as DavItem + + return DavItem( + collection, + item=self.to_vobject(rec), + href=href, + last_modified=self._odoo_to_http_datetime(rec.write_date), + ) + + def dav_get(self, collection, href): + """Retrieve DAV resource. + + Supports: + - Folder access (files) + - Attachment download + - Calendar/addressbook items + + :param collection: Radicale collection instance + :type collection: Any + :param href: Resource path + :type href: str + + :return: Radicale Item/FileItem/Collection or None + :rtype: Optional[Any] + """ + self.ensure_one() + + components = self._split_path(href) + model_name = self.sudo().model_id.model + collection_model = self.env[model_name] + if self.dav_type == "files": + if len(components) == 3: + from ..radicale.collection import Collection as DavFolder + + folder = DavFolder(href) + return folder + + if len(components) == 4: + folder_name = unquote_plus(components[2]) + record = collection_model.browse( + map( + itemgetter(0), + collection_model.name_search( + folder_name, + operator="=", + limit=1, + ), + ) + ) + att_name = unquote_plus(components[3]) + attachment = self.env["ir.attachment"].search( + [ + ("type", "=", "binary"), + ("res_model", "=", record._name), + ("res_id", "=", record.id), + ("name", "=", att_name), + ], + limit=1, + ) + if not attachment: + return None + + from ..radicale.collection import FileItem + + return FileItem( + collection, + href, + attachment, + last_modified=self._odoo_to_http_datetime(record.write_date), + ) + + record = self.get_record(components) + + if not record: + return None + + from ..radicale.collection import Item as DavItem + + return DavItem( + collection, + item=self.to_vobject(record), + href=href, + last_modified=self._odoo_to_http_datetime(record.write_date), + ) diff --git a/base_dav/models/dav_collection_field_mapping.py b/base_dav/models/dav_collection_field_mapping.py new file mode 100644 index 000000000..789c8529e --- /dev/null +++ b/base_dav/models/dav_collection_field_mapping.py @@ -0,0 +1,349 @@ +# Copyright 2019 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import base64 +import binascii +import datetime + +import vobject +from dateutil import tz + +from odoo import api, fields, models, tools +from odoo.tools import safe_eval as safe_eval_mod + + +class DavCollectionFieldMapping(models.Model): + _name = "dav.collection.field_mapping" + _description = "A field mapping for a WebDAV collection" + + collection_id = fields.Many2one( + comodel_name="dav.collection", + required=True, + ondelete="cascade", + ) + name = fields.Char( + required=True, + help="Attribute name in the vobject", + ) + mapping_type = fields.Selection( + [ + ("simple", "Simple"), + ("code", "Code"), + ], + default="simple", + required=True, + ) + field_id = fields.Many2one( + comodel_name="ir.model.fields", + required=True, + ondelete="cascade", + help="Field of the model the values are mapped to", + ) + model_id = fields.Many2one( + comodel_name="ir.model", + related="collection_id.model_id", + ) + import_code = fields.Text( + help="Code to import the value from a vobject. Use the variable " + "result for the output of the value and item as input" + ) + export_code = fields.Text( + help="Code to export the value to a vobject. Use the variable " + "result for the output of the value and record as input" + ) + + def _get_safe_eval_context(self, **extra): + """Return safe_eval context for custom DAV mapping code.""" + return { + "datetime": safe_eval_mod.datetime, + "dateutil": safe_eval_mod.dateutil, + "vobject": safe_eval_mod.wrap_module( + vobject, + { + "vCard": {}, + "iCalendar": {}, + "vcard": {"Name": {}}, + "base": {}, + }, + ), + "DEFAULT_SERVER_DATE_FORMAT": tools.DEFAULT_SERVER_DATE_FORMAT, + "DEFAULT_SERVER_DATETIME_FORMAT": tools.DEFAULT_SERVER_DATETIME_FORMAT, + **extra, + } + + def from_vobject(self, child): + """Convert vobject child element into Odoo field value. + + Delegates conversion depending on mapping type: + - simple + - code + + :param child: vobject child element + :type child: Any + + :return: Converted field value + :rtype: Any + """ + self.ensure_one() + if self.mapping_type == "code": + return self._from_vobject_code(child) + return self._from_vobject_simple(child) + + def _from_vobject_code(self, child): + """Convert vobject child using custom Python code. + + Executes safe_eval code stored in ``import_code``. + The following variables are available: + - item + - result + - datetime + - dateutil + - vobject + + :param child: vobject child element + :type child: Any + + :return: Converted value stored in ``result`` + :rtype: Any + """ + self.ensure_one() + context = self._get_safe_eval_context(item=child, result=None) + safe_eval_mod.safe_eval( + self.import_code or "", context, mode="exec", nocopy=True + ) + return context.get("result") + + def _from_vobject_simple(self, child): + """Convert vobject child using automatic type-based mapping. + + :param child: vobject child element + :type child: Any + + :return: Converted value + :rtype: Any + """ + self.ensure_one() + field = self.sudo().field_id + name = (self.name or "").lower() + method_names = ( + f"_from_vobject_{field.ttype}_{name}", + f"_from_vobject_{field.ttype}", + ) + + for method_name in method_names: + method = getattr(self, method_name, None) + if method: + return method(child) + + return getattr(child, "value", None) + + @api.model + def _from_vobject_datetime(self, item): + """Convert vobject datetime into Odoo datetime string (UTC). + + :param item: vobject datetime property + :type item: Any + + :return: Datetime string in server format or None + :rtype: Optional[str] + """ + if isinstance(item.value, datetime.datetime): + value = item.value.astimezone(tz.UTC) + return value.strftime(tools.DEFAULT_SERVER_DATETIME_FORMAT) + if isinstance(item.value, datetime.date): + return item.value.strftime(tools.DEFAULT_SERVER_DATETIME_FORMAT) + return None + + @api.model + def _from_vobject_date(self, item): + """Convert vobject date into Odoo date string. + + :param item: vobject date property + :type item: Any + + :return: Date string in server format or None + :rtype: Optional[str] + """ + if isinstance(item.value, datetime.datetime): + value = item.value.astimezone(tz.UTC) + return value.strftime(tools.DEFAULT_SERVER_DATE_FORMAT) + if isinstance(item.value, datetime.date): + return item.value.strftime(tools.DEFAULT_SERVER_DATE_FORMAT) + return None + + @api.model + def _from_vobject_binary(self, item): + """Convert vobject binary value into ASCII-encoded bytes. + + :param item: vobject binary property + :type item: Any + + :return: ASCII-encoded bytes + :rtype: bytes + """ + value = getattr(item, "value", None) + if not value: + return None + if isinstance(value, str): + raw = value.encode("ascii", errors="ignore") + elif isinstance(value, bytes): + raw = value + else: + raw = bytes(value) + if raw.startswith( + (b"\xff\xd8\xff", b"\x89PNG\r\n\x1a\n", b"GIF87a", b"GIF89a", b"BM") + ): + return base64.b64encode(raw) + compact = b"".join(raw.split()) + try: + decoded = base64.b64decode(compact, validate=True) + return base64.b64encode(decoded) + except (binascii.Error, ValueError): + return base64.b64encode(raw) + + @api.model + def _from_vobject_char_n(self, item): + """Extract family name from vCard Name object. + + :param item: vobject Name property + :type item: Any + + :return: Family name or None + :rtype: Optional[str] + """ + value = getattr(item, "value", None) + if hasattr(value, "family"): + return value.family + if isinstance(value, str): + return value.split(";", 1)[0] or None + return None + + def to_vobject(self, record): + """Convert Odoo record value into vobject-compatible value. + + Delegates conversion depending on mapping type. + Ensures timezone awareness for datetime values. + + :param record: Odoo record + :type record: odoo.models.BaseModel + + :return: Value suitable for vobject property + :rtype: Any + """ + self.ensure_one() + if self.mapping_type == "code": + result = self._to_vobject_code(record) + else: + result = self._to_vobject_simple(record) + + if isinstance(result, datetime.datetime) and not result.tzinfo: + return result.replace(tzinfo=tz.UTC) + return result + + def _to_vobject_code(self, record): + """Convert Odoo record value using custom export code. + + Executes safe_eval code stored in ``export_code``. + + :param record: Odoo record + :type record: odoo.models.BaseModel + + :return: Converted value stored in ``result`` + :rtype: Any + """ + self.ensure_one() + context = self._get_safe_eval_context(record=record, result=None) + safe_eval_mod.safe_eval( + self.export_code or "", context, mode="exec", nocopy=True + ) + return context.get("result") + + def _to_vobject_simple(self, record): + """Convert Odoo field value using automatic type-based mapping. + + :param record: Odoo record + :type record: odoo.models.BaseModel + + :return: Converted value + :rtype: Any + """ + self.ensure_one() + field = self.sudo().field_id + value = record[field.name] + method_names = ( + f"_to_vobject_{field.ttype}_{(self.name or '').lower()}", + f"_to_vobject_{field.ttype}", + ) + + for method_name in method_names: + method = getattr(self, method_name, None) + if method: + return method(value) + + return None if value is False else value + + @api.model + def _to_vobject_datetime(self, value): + """Convert Odoo datetime value into UTC datetime object. + + :param value: Odoo datetime value + :type value: Any + + :return: Timezone-aware datetime or None + :rtype: Optional[datetime.datetime] + """ + dt = fields.Datetime.to_datetime(value) + return dt.replace(tzinfo=tz.UTC) if dt else None + + @api.model + def _to_vobject_datetime_rev(self, value): + """Convert Odoo datetime into REV string format (RFC-style). + + :param value: Odoo datetime value + :type value: Any + + :return: REV formatted string or None + :rtype: Optional[str] + """ + s = fields.Datetime.to_string(value) if value else None + return s and s.replace("-", "").replace(" ", "T").replace(":", "") + "Z" + + @api.model + def _to_vobject_date(self, value): + """Convert Odoo date value into date object. + + :param value: Odoo date value + :type value: Any + + :return: Date object or None + :rtype: Optional[datetime.date] + """ + return fields.Date.to_date(value) + + @api.model + def _to_vobject_binary(self, value): + """Convert binary field value into ASCII string. + + :param value: Binary value + :type value: Optional[bytes] + + :return: ASCII-decoded string or None + :rtype: Optional[str] + """ + return value and value.decode("ascii") + + @api.model + def _to_vobject_char_n(self, value): + """Convert family name into vCard Name object. + + :param value: Family name + :type value: Optional[str] + + :return: vobject.vcard.Name instance + :rtype: Any + """ + if not value: + return None + # TODO: how are we going to handle compound types like this? + return vobject.vcard.Name(family=value) diff --git a/base_dav/pyproject.toml b/base_dav/pyproject.toml new file mode 100644 index 000000000..4231d0ccc --- /dev/null +++ b/base_dav/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/base_dav/radicale/__init__.py b/base_dav/radicale/__init__.py new file mode 100644 index 000000000..dd8d0f16c --- /dev/null +++ b/base_dav/radicale/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +from . import auth +from . import collection +from . import rights diff --git a/base_dav/radicale/auth.py b/base_dav/radicale/auth.py new file mode 100644 index 000000000..06511c50a --- /dev/null +++ b/base_dav/radicale/auth.py @@ -0,0 +1,36 @@ +# Copyright 2018 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). +import logging + +from radicale.auth import BaseAuth + +from odoo.http import request + +_logger = logging.getLogger(__name__) + + +class Auth(BaseAuth): + def _login_ext(self, login, password, context): + """Authenticate DAV user with an Odoo API key.""" + del context + + if not login or not password: + return "" + + env = request.env + uid = env["res.users.apikeys"]._check_credentials( + scope="dav", + key=password, + ) + if not uid: + _logger.info(f"DAV login failed for {login}") + return "" + + user = env["res.users"].sudo().browse(uid) + if not user.exists() or user.login != login: + _logger.info(f"DAV login failed for {login}") + return "" + + request.update_env(user=uid) + return user.login diff --git a/base_dav/radicale/collection.py b/base_dav/radicale/collection.py new file mode 100644 index 000000000..f6364a6ac --- /dev/null +++ b/base_dav/radicale/collection.py @@ -0,0 +1,387 @@ +# Copyright 2018 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import base64 +import binascii +import contextlib + +from radicale import pathutils +from radicale.item import Item as RadicaleItem +from radicale.storage import BaseCollection, BaseStorage + +from odoo.http import request + + +def _norm_path(path): + """Return sanitized Radicale path without leading slash. + + :param path: Raw path string + :type path: str + + :return: Normalized path without leading slash + :rtype: str + """ + return pathutils.strip_path(pathutils.sanitize_path(path or "")) + + +def _path_prefix(collection_path): + prefix = _norm_path(collection_path) + return f"{prefix}/" if prefix else "" + + +def _abs_href(collection_path, href): + """Build absolute href for Radicale item. + + :param collection_path: Collection base path + :type collection_path: str + :param href: Relative or raw href + :type href: str + + :return: Absolute href + :rtype: str + """ + href = _norm_path(href) + prefix = _path_prefix(collection_path) + if prefix and not href.startswith(prefix): + href = f"{prefix}{href}" + return f"/{href}" + + +def _rel_href(collection_path, href): + """Convert absolute href to relative href. + + Removes collection prefix if present. + + :param collection_path: Collection base path + :type collection_path: str + :param href: Absolute href + :type href: str + + :return: Relative href + :rtype: str + """ + href = _norm_path(href) + prefix = _path_prefix(collection_path) + if prefix and href.startswith(prefix): + return href[len(prefix) :] + return href + + +class Item(RadicaleItem): + def __init__(self, collection, item, href, last_modified): + """Initialize Radicale item wrapper for Odoo DAV. + + :param collection: DAV collection instance + :type collection: Collection + :param item: vobject instance + :type item: Any + :param href: Item href + :type href: str + :param last_modified: HTTP formatted datetime string + :type last_modified: str + """ + super().__init__( + collection=collection, + vobject_item=item, + href=_rel_href(collection.path, href), + last_modified=last_modified or "", + ) + + +class FileItem(RadicaleItem): + def __init__(self, collection, href, attachment, last_modified): + """Initialize Radicale file item from Odoo attachment. + + Decodes binary attachment data into UTF-8 text. + + :param collection: DAV collection instance + :type collection: Collection + :param href: File href + :type href: str + :param attachment: ir.attachment record + :type attachment: Any + :param last_modified: HTTP formatted datetime, defaults to "" + :type last_modified: str, optional + """ + raw = b"" + datas = getattr(attachment, "datas", None) + if datas: + try: + raw = base64.b64decode(datas) + except (binascii.Error, ValueError, TypeError): + raw = b"" + text = raw.decode("utf-8", errors="ignore") + + super().__init__( + collection=collection, + text=text, + href=_rel_href(collection.path, href), + last_modified=last_modified or "", + ) + + +class Collection(BaseCollection): + """ + Path forms: + - root: "" or "/" + - principal: "admin" + - odoo collection: "admin/" + - item: "admin//" + """ + + def __init__(self, path): + """Initialize DAV collection from Radicale path. + + Supports: + - root + - principal + - Odoo collection + - collection item + + :param path: Radicale collection path + :type path: str + """ + self._path = _norm_path(path) + self.path_components = tuple(self._path.split("/", 2)) if self._path else () + + env = request.env + self._record = None + if len(self.path_components) > 1 and (self.path_components[1] or "").isdigit(): + rec = env["dav.collection"].browse(int(self.path_components[1])).exists() + self._record = rec or None + + def set_meta(self, props): + record = self._require_record() + vals = {} + + displayname = props.get("D:displayname") + if displayname is not None: + vals["name"] = displayname + + if record.tag == "VCALENDAR": + color = props.get("ICAL:calendar-color") + if color is not None: + vals["calendar_color"] = color + + if vals: + record.write(vals) + + def _get_metadata(self): + if not self._record: + return {} + + metadata = { + "tag": self._record.tag, + "D:displayname": self._record.display_name, + } + + if self._record.tag == "VCALENDAR": + metadata["C:supported-calendar-component-set"] = "VEVENT" + metadata["ICAL:calendar-color"] = self._record.calendar_color + + return metadata + + def _require_record(self): + if not self._record: + raise ValueError(f"Not a DAV collection: {self.path!r}") + return self._record + + @property + def path(self): + """Return normalized collection path. + + :return: Collection path + :rtype: str + """ + return self._path + + def get_multi(self, hrefs): + """Retrieve multiple items by href. + + :param hrefs: Iterable of href strings + :type hrefs: Iterable[str] + + :return: Iterator of (href, item) + :rtype: Iterator[Tuple[str, Optional[Any]]] + """ + for href in hrefs: + yield href, self.get(href) + + def get_all(self): + """Yield all items in collection. + + :return: Iterator of Radicale items + :rtype: Iterator[Any] + """ + for href in self.list(): + item = self.get(href) + if item is not None: + yield item + + def list(self): + """Return relative hrefs for items or child collections. + + :return: Iterable of relative hrefs + :rtype: Iterable[str] + """ + if self._record: + for href in self._record.dav_list(self, self.path_components): + yield _rel_href(self.path, href) + return + + if self.is_principal: + for record in request.env["dav.collection"].search([]): + yield f"{self.owner}/{record.id}" + return + + login = request.env.user.login + if login: + yield login + + def get(self, href): + """Retrieve single DAV item by href. + + :param href: Relative href + :type href: str + + :return: Radicale item or None + :rtype: Optional[Any] + """ + if not self._record: + return None + return self._record.dav_get(self, _abs_href(self.path, href)) + + def upload(self, href, item, **kwargs): + """Upload or update DAV item. + + :param href: Relative href + :type href: str + :param item: RadicaleItem or vobject + :type item: Any + :param kwargs: Additional Radicale parameters + :type kwargs: Any + + :return: Tuple of (uploaded_item, previous_item) + :rtype: Tuple[Optional[Any], Optional[Any]] + """ + del kwargs + record = self._require_record() + old_item = self.get(href) + vobject_item = getattr(item, "vobject_item", item) + uploaded = record.dav_upload( + self, + _abs_href(self.path, href), + vobject_item, + ) + return uploaded, old_item + + def delete(self, href: str | None = None): + """Delete DAV item. + + :param href: Relative href, defaults to None + :type href: Optional[str], optional + + :raises ValueError: If not a DAV collection + :raises NotImplementedError: If deleting collection root + """ + record = self._require_record() + if not href: + raise NotImplementedError("Deleting collections is not supported") + record.dav_delete(self, _abs_href(self.path, href)) + + def get_meta(self, key=None): + metadata = self._get_metadata() + return metadata if key is None else metadata.get(key) + + @property + def last_modified(self): + """Return HTTP last modified timestamp for collection. + + :return: HTTP datetime string + :rtype: str + """ + if not self._record: + return "" + dt_value = self._record.write_date or self._record.create_date + return self._record._odoo_to_http_datetime(dt_value) or "" + + +class Storage(BaseStorage): + def discover( + self, + path, + depth="0", + child_context_manager=None, + user_groups=None, + ): + """Discover collections or items for given path. + + Supports depth 0 and depth > 0 discovery. + + :param path: Radicale path + :type path: str + :param depth: Discovery depth, defaults to "0" + :type depth: str, optional + :param child_context_manager: Optional Radicale context manager + :type child_context_manager: Optional[Callable] + :param user_groups: Optional set of user groups + :type user_groups: Optional[set[str]] + + :return: Iterator of collections or items + :rtype: Iterator[Any] + """ + del child_context_manager, user_groups + + path = _norm_path(path) + parts = path.split("/", 2) if path else [] + + if len(parts) == 3 and parts[1].isdigit(): + collection = Collection("/".join(parts[:2])) + if not collection._record: + return iter(()) + item = collection.get(parts[2]) + return iter([item]) if item else iter(()) + + collection = Collection(path) + if depth == "0": + return iter([collection]) + + children = [collection] + if collection._record: + children.extend(collection.get_all()) + else: + children.extend(Collection(child_path) for child_path in collection.list()) + + return iter(children) + + def move(self, item, to_collection, to_href): + """Move DAV item between collections. + + :raises NotImplementedError: MOVE is not supported + """ + raise NotImplementedError("MOVE is not supported by Odoo DAV backend") + + def create_collection(self, href, items=None, props=None): + """Create DAV collection. + + :raises NotImplementedError: MKCOL not supported + """ + raise NotImplementedError("MKCOL is not supported by Odoo DAV backend") + + @contextlib.contextmanager + def acquire_lock(self, mode, user="", *args, **kwargs): + """Acquire storage lock. + + :yield: None + """ + del mode, user, args, kwargs + yield + + def verify(self): + """Verify storage backend integrity. + + :return: Verification status + :rtype: bool + """ + return True diff --git a/base_dav/radicale/rights.py b/base_dav/radicale/rights.py new file mode 100644 index 000000000..417fb5a9b --- /dev/null +++ b/base_dav/radicale/rights.py @@ -0,0 +1,58 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + + +from radicale.rights import BaseRights + +from odoo.http import request + + +class Rights(BaseRights): + def authorization(self, user, path): + """Determine access rights for DAV resource. + + Authorization logic: + - Root path: full access + - Principal path: requires authentication + - Collection path: depends on collection.rights + (authenticated / owner_only / owner_write_only) + + :param user: Authenticated username or None + :type user: str | None + :param path: Requested DAV path + :type path: str | None + + :return: Access mode string. + - "R" means read access. + - "W" means write access. + - Uppercase letters apply to collections. + - Lowercase letters apply to items. + - Empty string means no access. + :rtype: str + """ + if not path or path == "/": + return "RWrw" if user else "" + + parts = [part for part in (path or "").strip("/").split("/") if part] + + if len(parts) == 1: + return "RWrw" if user and user == parts[0] else "" + + if len(parts) < 2 or not parts[1].isdigit(): + return "" + + collection = request.env["dav.collection"].sudo().browse(int(parts[1])) + if not collection.exists(): + return "" + + mode = collection.rights + is_owner = bool(user) and user == parts[0] + + if mode == "authenticated": + return "RWrw" if user else "" + if mode == "owner_only": + return "RWrw" if is_owner else "" + if mode == "owner_write_only": + return "RWrw" if is_owner else "Rr" if user else "" + + return "" diff --git a/base_dav/readme/CONFIGURE.md b/base_dav/readme/CONFIGURE.md new file mode 100644 index 000000000..e79d6f7d4 --- /dev/null +++ b/base_dav/readme/CONFIGURE.md @@ -0,0 +1,8 @@ +To configure this module, you need to: + +1. Go to Settings / WebDAV Collections and create or edit your + collections. There, you'll also see the URL to point your clients + to. + +Note that you need to configure a dbfilter if you use multiple +databases. diff --git a/base_dav/readme/CONTRIBUTORS.md b/base_dav/readme/CONTRIBUTORS.md new file mode 100644 index 000000000..34c5befd1 --- /dev/null +++ b/base_dav/readme/CONTRIBUTORS.md @@ -0,0 +1,6 @@ +- Holger Brunn \ +- Florian Kantelberg \ +- [Cetmix](https://cetmix.com/) + - Ivan Sokolov + - George Smirnov + - Dmitry Meita diff --git a/base_dav/readme/CREDITS.md b/base_dav/readme/CREDITS.md new file mode 100644 index 000000000..57e07d3e8 --- /dev/null +++ b/base_dav/readme/CREDITS.md @@ -0,0 +1,2 @@ +* Odoo Community Association +* All the actual work is done by [Radicale](https://radicale.org) diff --git a/base_dav/readme/DESCRIPTION.md b/base_dav/readme/DESCRIPTION.md new file mode 100644 index 000000000..3615c5ea7 --- /dev/null +++ b/base_dav/readme/DESCRIPTION.md @@ -0,0 +1,6 @@ +This module adds WebDAV support to Odoo, specifically CalDAV and +CardDAV. + +You can configure arbitrary objects as a calendar or an address book, +thus make arbitrary information accessible in external systems or your +mobile. diff --git a/base_dav/readme/ROADMAP.md b/base_dav/readme/ROADMAP.md new file mode 100644 index 000000000..cec5d754f --- /dev/null +++ b/base_dav/readme/ROADMAP.md @@ -0,0 +1,11 @@ +- Much better UX for configuring collections (probably provide a group + that sees the current fully flexible field mappings, and by default + show some dumbed down version where you can select some preselected + vobject fields); +- Support todo lists and journals; +- Support configuring default field mappings per model; +- Support plain WebDAV collections to make some model's records + accessible as folders, and the records' attachments as files (r/w); +- Support configuring lists of calendars so that you can have a calendar + for every project and appointments are tasks, or a calendar for every + sales team and appointments are sale orders. Lots of possibilities. diff --git a/base_dav/security/ir.model.access.csv b/base_dav/security/ir.model.access.csv new file mode 100644 index 000000000..fe701f5dc --- /dev/null +++ b/base_dav/security/ir.model.access.csv @@ -0,0 +1,5 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_dav_collection_user,dav.collection user,model_dav_collection,base.group_user,1,0,0,0 +access_dav_collection_system,dav.collection system,model_dav_collection,base.group_system,1,1,1,1 +access_dav_collection_field_mapping_user,dav.collection.field_mapping user,model_dav_collection_field_mapping,base.group_user,1,0,0,0 +access_dav_collection_field_mapping_system,dav.collection.field_mapping system,model_dav_collection_field_mapping,base.group_system,1,1,1,1 diff --git a/base_dav/static/description/icon.png b/base_dav/static/description/icon.png new file mode 100644 index 000000000..f0eebfd17 Binary files /dev/null and b/base_dav/static/description/icon.png differ diff --git a/base_dav/static/description/index.html b/base_dav/static/description/index.html new file mode 100644 index 000000000..002410213 --- /dev/null +++ b/base_dav/static/description/index.html @@ -0,0 +1,473 @@ + + + + + +Caldav and Carddav support + + + +
+

Caldav and Carddav support

+ + +

Beta License: AGPL-3 OCA/server-backend Translate me on Weblate Try me on Runboat

+

This module adds WebDAV support to Odoo, specifically CalDAV and +CardDAV.

+

You can configure arbitrary objects as a calendar or an address book, +thus make arbitrary information accessible in external systems or your +mobile.

+

Table of contents

+ +
+

Configuration

+

To configure this module, you need to:

+
    +
  1. Go to Settings / WebDAV Collections and create or edit your +collections. There, you’ll also see the URL to point your clients to.
  2. +
+

Note that you need to configure a dbfilter if you use multiple +databases.

+
+
+

Known issues / Roadmap

+
    +
  • Much better UX for configuring collections (probably provide a group +that sees the current fully flexible field mappings, and by default +show some dumbed down version where you can select some preselected +vobject fields);
  • +
  • Support todo lists and journals;
  • +
  • Support configuring default field mappings per model;
  • +
  • Support plain WebDAV collections to make some model’s records +accessible as folders, and the records’ attachments as files (r/w);
  • +
  • Support configuring lists of calendars so that you can have a calendar +for every project and appointments are tasks, or a calendar for every +sales team and appointments are sale orders. Lots of possibilities.
  • +
+
+
+

Bug Tracker

+

Bugs are tracked on GitHub Issues. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
    +
  • initOS GmbH
  • +
  • Therp BV
  • +
+
+
+

Contributors

+ +
+
+

Other credits

+
    +
  • Odoo Community Association
  • +
  • All the actual work is done by Radicale
  • +
+
+
+

Maintainers

+

This module is maintained by the OCA.

+ +Odoo Community Association + +

OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use.

+

Current maintainer:

+

hbrunn

+

This module is part of the OCA/server-backend project on GitHub.

+

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

+
+
+
+ + diff --git a/base_dav/tests/__init__.py b/base_dav/tests/__init__.py new file mode 100644 index 000000000..e1482f6fa --- /dev/null +++ b/base_dav/tests/__init__.py @@ -0,0 +1,11 @@ +# Copyright 2018 Therp BV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from . import test_auth +from . import test_base_dav +from . import test_collection +from . import test_controller +from . import test_field_mapping +from . import test_vcard_false_values +from . import test_radicale_collection +from . import test_dav_collection_files diff --git a/base_dav/tests/common.py b/base_dav/tests/common.py new file mode 100644 index 000000000..196a1be5a --- /dev/null +++ b/base_dav/tests/common.py @@ -0,0 +1,258 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import base64 +from contextlib import contextmanager +from types import SimpleNamespace +from urllib.parse import quote_plus + +import odoo.http as http +from odoo.tests.common import TransactionCase + +from ..radicale.collection import Collection + + +class BaseDavTestCase(TransactionCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True)) + + @classmethod + def create_user(cls, login, *, name=None, groups=None): + groups = groups or ["base.group_user"] + return ( + cls.env["res.users"] + .with_context(no_reset_password=True) + .create( + { + "login": login, + "name": name or login, + "groups_id": [(6, 0, [cls.env.ref(xmlid).id for xmlid in groups])], + } + ) + ) + + @classmethod + def create_partner(cls, *, name, email=False, **extra_vals): + vals = { + "name": name, + "email": email, + **extra_vals, + } + return cls.env["res.partner"].create(vals) + + @classmethod + def create_collection( + cls, + *, + name, + dav_type, + model_xmlid, + domain="[]", + rights="owner_only", + field_uuid_xmlid=None, + ): + vals = { + "name": name, + "dav_type": dav_type, + "model_id": cls.env.ref(model_xmlid).id, + "domain": domain, + "rights": rights, + } + if field_uuid_xmlid: + vals["field_uuid"] = cls.env.ref(field_uuid_xmlid).id + return cls.env["dav.collection"].create(vals) + + @classmethod + def add_mapping( + cls, + collection, + *, + name, + field_xmlid, + import_code=None, + export_code=None, + ): + return cls.env["dav.collection.field_mapping"].create( + { + "collection_id": collection.id, + "name": name, + "field_id": cls.env.ref(field_xmlid).id, + "mapping_type": "code" if (import_code or export_code) else "simple", + "import_code": import_code, + "export_code": export_code, + } + ) + + @classmethod + def create_attachment( + cls, + *, + record, + name, + raw=b"", + mimetype="application/octet-stream", + ): + return cls.env["ir.attachment"].create( + { + "name": name, + "type": "binary", + "datas": base64.b64encode(raw).decode(), + "res_model": record._name, + "res_id": record.id, + "mimetype": mimetype, + } + ) + + @classmethod + def create_partner_addressbook_fixture( + cls, + *, + partner=None, + name="Contacts", + rights="owner_only", + domain=None, + with_name=True, + with_email=True, + ): + partner = partner or cls.create_partner( + name="DAV Partner", + email="dav@example.com", + ) + domain = domain or f"[('id', '=', {partner.id})]" + collection = cls.create_collection( + name=name, + dav_type="addressbook", + model_xmlid="base.model_res_partner", + domain=domain, + rights=rights, + ) + + mappings = {} + if with_name: + mappings["name"] = cls.add_mapping( + collection, + name="FN", + field_xmlid="base.field_res_partner__name", + ) + if with_email: + mappings["email"] = cls.add_mapping( + collection, + name="EMAIL", + field_xmlid="base.field_res_partner__email", + ) + + return SimpleNamespace( + partner=partner, + collection=collection, + mappings=mappings, + ) + + @classmethod + def create_users_calendar_fixture( + cls, + *, + record=None, + name="Test Collection", + rights="owner_only", + domain="[]", + ): + collection = cls.create_collection( + name=name, + dav_type="calendar", + model_xmlid="base.model_res_users", + domain=domain, + rights=rights, + ) + login_mapping = cls.add_mapping( + collection, + name="login", + field_xmlid="base.field_res_users__login", + import_code="result = item.value", + export_code="result = record.login", + ) + name_mapping = cls.add_mapping( + collection, + name="name", + field_xmlid="base.field_res_users__name", + ) + record = record or cls.create_user("tester", name="Test User") + + return SimpleNamespace( + record=record, + collection=collection, + mappings={ + "login": login_mapping, + "name": name_mapping, + }, + ) + + @classmethod + def create_files_fixture( + cls, + *, + partner=None, + collection_name="Partner Files", + attachment_name="hello world.txt", + attachment_raw=b"Hello DAV files", + mimetype="text/plain", + ): + partner = partner or cls.create_partner( + name="DAV Files Partner", + email="files@example.com", + ) + collection = cls.create_collection( + name=collection_name, + dav_type="files", + model_xmlid="base.model_res_partner", + domain=f"[('id', '=', {partner.id})]", + ) + attachment = cls.create_attachment( + record=partner, + name=attachment_name, + raw=attachment_raw, + mimetype=mimetype, + ) + return SimpleNamespace( + partner=partner, + collection=collection, + attachment=attachment, + ) + + @contextmanager + def request_context(self, *, env=None, uid=None): + request_obj = SimpleNamespace( + env=env or self.env, + uid=uid or self.env.uid, + ) + http._request_stack.push(request_obj) + try: + yield request_obj + finally: + http._request_stack.pop() + + def push_request_context(self, *, env=None, uid=None): + ctx = self.request_context(env=env, uid=uid) + ctx.__enter__() + self.addCleanup(ctx.__exit__, None, None, None) + + def make_collection(self, collection, *, login=None): + login = login or self.env.user.login + return Collection(f"{login}/{collection.id}") + + @staticmethod + def dav_quote(value): + return quote_plus(value or "") + + def assert_permissions(self, rights_obj, user_login, path, *, can_read, can_write): + perms = rights_obj.authorization(user_login, path) or "" + self.assertEqual( + "r" in perms, + can_read, + f"permissions={perms!r} user={user_login!r} path={path!r}", + ) + self.assertEqual( + "w" in perms, + can_write, + f"permissions={perms!r} user={user_login!r} path={path!r}", + ) diff --git a/base_dav/tests/test_auth.py b/base_dav/tests/test_auth.py new file mode 100644 index 000000000..a44476210 --- /dev/null +++ b/base_dav/tests/test_auth.py @@ -0,0 +1,155 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from types import SimpleNamespace +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.base_dav.radicale import auth as dav_auth_mod +from odoo.addons.base_dav.radicale.auth import Auth + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestDavAuth(BaseDavTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.user = cls.create_user("dav_tester", name="DAV Tester") + + def _make_request(self, *, api_uid=None, exists=True, login=None): + """Build a mocked request object for Auth._login_ext tests.""" + effective_login = login or self.user.login + user_record = mock.Mock() + user_record.exists.return_value = exists + user_record.login = effective_login + + apikeys_model = mock.Mock() + apikeys_model._check_credentials.return_value = api_uid + + users_model = mock.Mock() + users_model.sudo.return_value.browse.return_value = user_record + + env = mock.MagicMock() + env.__getitem__.side_effect = lambda model_name: { + "res.users.apikeys": apikeys_model, + "res.users": users_model, + }[model_name] + env.user.login = effective_login + + request_obj = SimpleNamespace( + env=env, + update_env=mock.Mock(), + ) + return request_obj, apikeys_model, users_model + + @staticmethod + def _new_auth(): + return object.__new__(Auth) + + def test_login_ext_returns_empty_without_login(self): + auth = self._new_auth() + request_obj, apikeys_model, _users_model = self._make_request() + + with mock.patch.object(dav_auth_mod, "request", request_obj): + result = auth._login_ext("", "secret", SimpleNamespace()) + + self.assertEqual(result, "") + apikeys_model._check_credentials.assert_not_called() + request_obj.update_env.assert_not_called() + + def test_login_ext_returns_empty_without_password(self): + auth = self._new_auth() + request_obj, apikeys_model, _users_model = self._make_request() + + with mock.patch.object(dav_auth_mod, "request", request_obj): + result = auth._login_ext(self.user.login, "", SimpleNamespace()) + + self.assertEqual(result, "") + apikeys_model._check_credentials.assert_not_called() + request_obj.update_env.assert_not_called() + + def test_login_ext_returns_empty_when_key_is_invalid(self): + auth = self._new_auth() + request_obj, apikeys_model, _users_model = self._make_request(api_uid=False) + + with ( + mock.patch.object(dav_auth_mod, "request", request_obj), + mock.patch.object(dav_auth_mod, "_logger") as logger, + ): + result = auth._login_ext(self.user.login, "bad-key", SimpleNamespace()) + + self.assertEqual(result, "") + apikeys_model._check_credentials.assert_called_once_with( + scope="dav", + key="bad-key", + ) + request_obj.update_env.assert_not_called() + logger.info.assert_called_once() + + def test_login_ext_returns_empty_when_user_not_found(self): + auth = self._new_auth() + request_obj, apikeys_model, users_model = self._make_request( + api_uid=self.user.id, + exists=False, + ) + + with ( + mock.patch.object(dav_auth_mod, "request", request_obj), + mock.patch.object(dav_auth_mod, "_logger") as logger, + ): + result = auth._login_ext(self.user.login, "secret", SimpleNamespace()) + + self.assertEqual(result, "") + apikeys_model._check_credentials.assert_called_once_with( + scope="dav", + key="secret", + ) + users_model.sudo.return_value.browse.assert_called_once_with(self.user.id) + request_obj.update_env.assert_not_called() + logger.info.assert_called_once() + + def test_login_ext_returns_empty_when_login_does_not_match_user(self): + auth = self._new_auth() + request_obj, apikeys_model, users_model = self._make_request( + api_uid=self.user.id, + login="another_login", + ) + + with ( + mock.patch.object(dav_auth_mod, "request", request_obj), + mock.patch.object(dav_auth_mod, "_logger") as logger, + ): + result = auth._login_ext(self.user.login, "secret", SimpleNamespace()) + + self.assertEqual(result, "") + apikeys_model._check_credentials.assert_called_once_with( + scope="dav", + key="secret", + ) + users_model.sudo.return_value.browse.assert_called_once_with(self.user.id) + request_obj.update_env.assert_not_called() + logger.info.assert_called_once() + + def test_login_ext_accepts_valid_api_key(self): + auth = self._new_auth() + request_obj, apikeys_model, users_model = self._make_request( + api_uid=self.user.id, + login=self.user.login, + ) + + with mock.patch.object(dav_auth_mod, "request", request_obj): + result = auth._login_ext( + self.user.login, + "secret", + SimpleNamespace(), + ) + + self.assertEqual(result, self.user.login) + apikeys_model._check_credentials.assert_called_once_with( + scope="dav", + key="secret", + ) + users_model.sudo.return_value.browse.assert_called_once_with(self.user.id) + request_obj.update_env.assert_called_once_with(user=self.user.id) diff --git a/base_dav/tests/test_base_dav.py b/base_dav/tests/test_base_dav.py new file mode 100644 index 000000000..d8b45457e --- /dev/null +++ b/base_dav/tests/test_base_dav.py @@ -0,0 +1,249 @@ +# Copyright 2018 Therp BV +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from odoo.tests import tagged + +from ..controllers.main import PREFIX +from ..controllers.main import Main as Controller +from ..radicale.rights import Rights +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestBaseDav(BaseDavTestCase): + @classmethod + def setUpClass(cls): + """Prepare shared users, partner, collection and controller.""" + super().setUpClass() + + cls.test_user = cls.create_user("tester", name="tester") + cls.partner = cls.create_partner(name="DAV Partner") + cls.collection = cls.create_collection( + name="Test Collection", + dav_type="calendar", + model_xmlid="base.model_res_partner", + domain=f"[('id', '=', {cls.partner.id})]", + ) + + cls.owner_login = cls.env.user.login + cls.tester_login = cls.test_user.login + + cls.controller = Controller() + + def setUp(self): + """Bind request context required by Rights.authorization().""" + super().setUp() + self.push_request_context() + self.rights = object.__new__(Rights) + + def _collection_paths(self): + """Return base collection path and item path for assertions.""" + base_path = f"/{self.owner_login}/{self.collection.id}" + item_path = f"{base_path}/{self.partner.id}" + return base_path, item_path + + def test_well_known(self): + """Verify well-known DAV endpoint redirects to the DAV prefix.""" + response = self.controller.handle_well_known_request() + + self.assertEqual(response.status_code, 301) + self.assertIn(PREFIX, response.location) + + def test_authenticated(self): + """Verify authenticated rights grant rw to logged users only.""" + self.collection.rights = "authenticated" + base_path, item_path = self._collection_paths() + + self.assert_permissions( + self.rights, + self.owner_login, + base_path, + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + self.owner_login, + item_path, + can_read=True, + can_write=True, + ) + + self.assert_permissions( + self.rights, + self.tester_login, + base_path, + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + self.tester_login, + item_path, + can_read=True, + can_write=True, + ) + + self.assert_permissions( + self.rights, + "", + base_path, + can_read=False, + can_write=False, + ) + self.assert_permissions( + self.rights, + "", + item_path, + can_read=False, + can_write=False, + ) + + def test_owner_only(self): + """Verify owner_only rights grant access only to the owner.""" + self.collection.rights = "owner_only" + base_path, item_path = self._collection_paths() + + self.assert_permissions( + self.rights, + self.owner_login, + base_path, + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + self.owner_login, + item_path, + can_read=True, + can_write=True, + ) + + self.assert_permissions( + self.rights, + self.tester_login, + base_path, + can_read=False, + can_write=False, + ) + self.assert_permissions( + self.rights, + self.tester_login, + item_path, + can_read=False, + can_write=False, + ) + + self.assert_permissions( + self.rights, + "", + base_path, + can_read=False, + can_write=False, + ) + self.assert_permissions( + self.rights, + "", + item_path, + can_read=False, + can_write=False, + ) + + def test_owner_write_only(self): + """Verify owner_write_only grants read-only access to other users.""" + self.collection.rights = "owner_write_only" + base_path, item_path = self._collection_paths() + + self.assert_permissions( + self.rights, + self.owner_login, + base_path, + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + self.owner_login, + item_path, + can_read=True, + can_write=True, + ) + + self.assert_permissions( + self.rights, + self.tester_login, + base_path, + can_read=True, + can_write=False, + ) + self.assert_permissions( + self.rights, + self.tester_login, + item_path, + can_read=True, + can_write=False, + ) + + self.assert_permissions( + self.rights, + "", + base_path, + can_read=False, + can_write=False, + ) + self.assert_permissions( + self.rights, + "", + item_path, + can_read=False, + can_write=False, + ) + + def test_rights_root_and_principal_and_missing_collection(self): + """Verify root, principal and invalid collection path handling.""" + self.assert_permissions( + self.rights, + self.owner_login, + "/", + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + "", + "/", + can_read=False, + can_write=False, + ) + + principal_path = f"/{self.owner_login}" + self.assert_permissions( + self.rights, + self.owner_login, + principal_path, + can_read=True, + can_write=True, + ) + self.assert_permissions( + self.rights, + "", + principal_path, + can_read=False, + can_write=False, + ) + + self.assert_permissions( + self.rights, + self.owner_login, + f"/{self.owner_login}/999999", + can_read=False, + can_write=False, + ) + self.assert_permissions( + self.rights, + self.owner_login, + f"/{self.owner_login}/not-a-number", + can_read=False, + can_write=False, + ) diff --git a/base_dav/tests/test_collection.py b/base_dav/tests/test_collection.py new file mode 100644 index 000000000..3d4ec9d40 --- /dev/null +++ b/base_dav/tests/test_collection.py @@ -0,0 +1,199 @@ +# Copyright 2019-2020 initOS GmbH +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from unittest import mock + +from odoo.exceptions import AccessError +from odoo.tests import tagged + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestCalendar(BaseDavTestCase): + @classmethod + def setUpClass(cls): + """Prepare shared calendar collection, mappings and record.""" + super().setUpClass() + + fixture = cls.create_users_calendar_fixture( + name="Test Collection", + domain="[]", + ) + cls.collection = fixture.collection + cls.record = fixture.record + cls.map_login = fixture.mappings["login"] + cls.map_name = fixture.mappings["name"] + + def setUp(self): + """Push request context required by low-level DAV wrapper tests.""" + super().setUp() + self.push_request_context() + + def _compare_record(self, vobj, rec=None): + """Assert that imported vobject values match the expected record.""" + imported_vals = self.collection.from_vobject(vobj) + expected_record = rec or self.record + self.assertEqual(expected_record.login, imported_vals["login"]) + self.assertEqual(expected_record.name, imported_vals["name"]) + + def test_import_export(self): + """Verify export followed by import preserves record values.""" + vobj = self.collection.to_vobject(self.record) + self._compare_record(vobj) + + def test_get_record(self): + """Verify record lookup by ID and by custom UUID field.""" + record = self.collection.get_record([str(self.record.id)]) + self.assertEqual(record, self.record) + + self.collection.field_uuid = self.env.ref("base.field_res_users__login") + record = self.collection.get_record([self.record.login]) + self.assertEqual(record, self.record) + + def test_collection_helpers_and_negative_paths(self): + self.assertEqual( + self.collection._split_path("/a//b/c/"), + ["a", "b", "c"], + ) + self.assertTrue( + self.collection._odoo_to_http_datetime(self.record.write_date).endswith( + "GMT" + ) + ) + self.assertEqual( + self.collection._compute_url.__name__, + "_compute_url", + ) + + self.collection.field_uuid = self.env.ref("base.field_res_users__login") + self.assertFalse(self.collection.get_record(["bad.ics"])) + self.assertEqual( + self.collection.get_record([f"{self.record.login}.ics"]), + self.record, + ) + + self.collection.dav_type = "files" + self.assertIsNone(self.collection.to_vobject(self.record)) + self.assertIsNone(self.collection.from_vobject(mock.Mock(name="whatever"))) + self.assertIsNone(self.collection.dav_upload(mock.Mock(), "/x", mock.Mock())) + self.assertIsNone(self.collection.dav_delete(mock.Mock(), "/x")) + + def test_compute_tag_and_url_and_datetime_helpers(self): + """Verify tag, URL and datetime helper methods.""" + self.collection.dav_type = "calendar" + self.collection._compute_tag() + self.assertEqual(self.collection.tag, "VCALENDAR") + + self.collection.dav_type = "addressbook" + self.collection._compute_tag() + self.assertEqual(self.collection.tag, "VADDRESSBOOK") + + self.collection.dav_type = "files" + self.collection._compute_tag() + self.assertFalse(self.collection.tag) + + self.env["ir.config_parameter"].sudo().set_param( + "web.base.url", "http://test.local" + ) + self.collection._compute_url() + self.assertEqual( + self.collection.url, + f"http://test.local/.dav/{self.env.user.login}/{self.collection.id}", + ) + + self.env["ir.config_parameter"].sudo().set_param("web.base.url", "") + self.collection._compute_url() + self.assertEqual( + self.collection.url, + f"/.dav/{self.env.user.login}/{self.collection.id}", + ) + + self.assertIsNone(self.collection._odoo_to_http_datetime(False)) + self.assertEqual(self.collection._split_path("/a//b/c/"), ["a", "b", "c"]) + + def test_dav_strip_extension_and_invalid_record_lookup(self): + """Verify DAV href extension stripping and invalid lookup handling.""" + from ..models.dav_collection import _dav_strip_item_extension + + self.assertEqual(_dav_strip_item_extension("abc.vcf"), "abc") + self.assertEqual(_dav_strip_item_extension("abc.ics"), "abc") + self.assertEqual(_dav_strip_item_extension("abc.txt"), "abc.txt") + self.assertEqual(_dav_strip_item_extension(""), "") + + self.assertFalse(self.collection.get_record(["not-an-id"])) + self.collection.field_uuid = self.env.ref("base.field_res_users__login") + self.assertFalse(self.collection.get_record(["unknown-login"])) + self.assertEqual( + self.collection.get_record([f"{self.record.login}.ics"]), + self.record, + ) + + def test_from_vobject_guards_and_eval_context(self): + """Verify unsupported vobject structures and eval helpers.""" + self.assertIn("user", self.collection._eval_context()) + self.assertEqual(self.collection._eval_domain(), []) + + bogus = mock.Mock() + bogus.name = "VCARD" + self.collection.dav_type = "calendar" + self.assertIsNone(self.collection.from_vobject(bogus)) + + bogus.name = "VCALENDAR" + if hasattr(bogus, "vevent"): + del bogus.vevent + self.assertIsNone(self.collection.from_vobject(bogus)) + + self.collection.dav_type = "addressbook" + bogus.name = "VCALENDAR" + self.assertIsNone(self.collection.from_vobject(bogus)) + + self.collection.dav_type = "files" + self.assertIsNone(self.collection.from_vobject(bogus)) + self.assertIsNone(self.collection.to_vobject(self.record)) + + def test_domain_validation_and_access_error_on_upload_outside_domain(self): + """Verify invalid domains fail and upload outside domain is rejected.""" + bad_collection = self.env["dav.collection"].new( + { + "name": "Bad Domain", + "dav_type": "calendar", + "model_id": self.env.ref("base.model_res_users").id, + "domain": "[", + } + ) + with self.assertRaises(SyntaxError): + bad_collection._eval_domain() + + partner = self.create_partner( + name="DAV Restricted Partner", + email="restricted@example.com", + ) + + source_fixture = self.create_partner_addressbook_fixture( + partner=partner, + name="Source Partner Collection", + ) + source_collection = source_fixture.collection + + restricted_collection = self.create_collection( + name="Restricted Partner Collection", + dav_type="addressbook", + model_xmlid="base.model_res_partner", + domain="[('id', '=', -1)]", + ) + self.add_mapping( + restricted_collection, + name="FN", + field_xmlid="base.field_res_partner__name", + ) + self.add_mapping( + restricted_collection, + name="EMAIL", + field_xmlid="base.field_res_partner__email", + ) + + vobj = source_collection.to_vobject(partner) + + with self.assertRaises(AccessError): + restricted_collection.dav_upload(mock.Mock(), "/x", vobj) diff --git a/base_dav/tests/test_controller.py b/base_dav/tests/test_controller.py new file mode 100644 index 000000000..d7617861c --- /dev/null +++ b/base_dav/tests/test_controller.py @@ -0,0 +1,160 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from types import SimpleNamespace +from unittest import mock + +from odoo import http +from odoo.tests import tagged + +from odoo.addons.base_dav.controllers.main import PREFIX, Main + +from .common import BaseDavTestCase + + +class _ClosableResult(list): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.closed = False + + def close(self): + self.closed = True + + +@tagged("post_install", "-at_install") +class TestDavController(BaseDavTestCase): + def setUp(self): + super().setUp() + self.controller = Main() + + def _push_request( + self, + *, + method="PROPFIND", + body=b"", + environ=None, + content_type="application/xml; charset=utf-8", + ): + default_environ = { + "REQUEST_METHOD": method, + "CONTENT_LENGTH": str(len(body)), + "CONTENT_TYPE": content_type, + } + if environ: + default_environ.update(environ) + + httprequest = SimpleNamespace( + environ=default_environ, + method=method, + get_data=lambda cache=False: body, + ) + request_obj = SimpleNamespace( + httprequest=httprequest, + env=self.env, + uid=self.env.uid, + ) + http._request_stack.push(request_obj) + self.addCleanup(http._request_stack.pop) + return request_obj + + @staticmethod + def _make_fake_app(captured, *, status, headers, payload): + def fake_app(environ, start_response): + captured.update(environ) + start_response(status, headers) + return _ClosableResult([payload]) + + return fake_app + + def test_handle_dav_request_preserves_empty_propfind_body(self): + self._push_request(method="PROPFIND", body=b"") + + captured = {} + fake_app = self._make_fake_app( + captured, + status="207 Multi-Status", + headers=[("Content-Type", "application/xml")], + payload=b"", + ) + + with ( + mock.patch( + "odoo.addons.base_dav.controllers.main.radicale_config.load" + ) as load_config, + mock.patch( + "odoo.addons.base_dav.controllers.main._get_radicale_app", + return_value=fake_app, + ), + ): + load_config.return_value = mock.Mock() + response = self.controller.handle_dav_request("demo/path") + + self.assertEqual(response.status_code, 207) + self.assertEqual(captured["SCRIPT_NAME"], PREFIX) + self.assertEqual(captured["HTTP_X_SCRIPT_NAME"], PREFIX) + self.assertEqual(captured["PATH_INFO"], "/demo/path") + self.assertEqual(captured["CONTENT_TYPE"], "application/xml; charset=utf-8") + self.assertEqual(captured["REQUEST_METHOD"], "PROPFIND") + self.assertEqual(captured["wsgi.input"].read(), b"") + self.assertEqual(captured["CONTENT_LENGTH"], "0") + + def test_handle_dav_request_preserves_request_body(self): + body = b"" + self._push_request(method="PROPFIND", body=body) + + captured = {} + fake_app = self._make_fake_app( + captured, + status="207 Multi-Status", + headers=[("Content-Type", "application/xml")], + payload=b"", + ) + + with ( + mock.patch( + "odoo.addons.base_dav.controllers.main.radicale_config.load" + ) as load_config, + mock.patch( + "odoo.addons.base_dav.controllers.main._get_radicale_app", + return_value=fake_app, + ), + ): + load_config.return_value = mock.Mock() + response = self.controller.handle_dav_request("demo/path") + + self.assertEqual(response.status_code, 207) + self.assertEqual(captured["wsgi.input"].read(), body) + self.assertEqual(captured["CONTENT_LENGTH"], str(len(body))) + + def test_handle_dav_request_uses_given_body_and_default_root_path(self): + body = b"payload" + self._push_request( + method="PUT", + body=body, + environ={"REQUEST_METHOD": "PUT"}, + ) + + captured = {} + fake_app = self._make_fake_app( + captured, + status="201 Created", + headers=[("X-Test", "yes")], + payload=b"created", + ) + + with ( + mock.patch( + "odoo.addons.base_dav.controllers.main.radicale_config.load" + ) as load_config, + mock.patch( + "odoo.addons.base_dav.controllers.main._get_radicale_app", + return_value=fake_app, + ), + ): + load_config.return_value = mock.Mock() + response = self.controller.handle_dav_request() + + self.assertEqual(response.status_code, 201) + self.assertEqual(captured["PATH_INFO"], "/") + self.assertEqual(captured["CONTENT_LENGTH"], str(len(body))) + self.assertEqual(captured["wsgi.input"].read(), body) + self.assertEqual(response.headers.get("X-Test"), "yes") diff --git a/base_dav/tests/test_dav_collection_files.py b/base_dav/tests/test_dav_collection_files.py new file mode 100644 index 000000000..3199e9b38 --- /dev/null +++ b/base_dav/tests/test_dav_collection_files.py @@ -0,0 +1,124 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from odoo.tests import tagged + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestDavCollectionFiles(BaseDavTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + fixture = cls.create_files_fixture( + partner=cls.create_partner( + name="DAV Files Partner", + email="files@example.com", + ), + collection_name="Partner Files", + attachment_name="hello world.txt", + attachment_raw=b"Hello DAV files", + mimetype="text/plain", + ) + cls.partner = fixture.partner + cls.files_collection = fixture.collection + cls.attachment = fixture.attachment + + def setUp(self): + super().setUp() + self.push_request_context() + + def _folder_href(self): + """Return encoded folder href for the partner record.""" + return self.dav_quote(self.partner.display_name) + + def _file_href(self): + """Return encoded file href for the linked attachment.""" + return f"{self._folder_href()}/{self.dav_quote(self.attachment.name)}" + + def test_dav_list_returns_folders_for_files_collection(self): + """Verify files collection root lists record folders.""" + collection = self.make_collection(self.files_collection) + + hrefs = list(collection.list()) + + self.assertEqual(len(hrefs), 1) + self.assertEqual(hrefs[0], self._folder_href()) + + def test_dav_list_returns_attachments_in_folder(self): + """Verify files collection lists attachments inside record folder.""" + collection = self.make_collection(self.files_collection) + + hrefs = self.files_collection.dav_list( + collection=collection, + path_components=[ + self.env.user.login, + str(self.files_collection.id), + self._folder_href(), + ], + ) + + self.assertEqual(len(hrefs), 1) + self.assertIn(self.dav_quote(self.attachment.name), hrefs[0]) + + def test_dav_get_returns_folder_wrapper_for_folder_href(self): + collection = self.make_collection(self.files_collection) + + result = collection.get(self._folder_href()) + + self.assertTrue(result) + self.assertEqual( + result.path, + f"{self.env.user.login}/{self.files_collection.id}/{self._folder_href()}", + ) + + def test_dav_get_returns_file_item_for_existing_attachment(self): + collection = self.make_collection(self.files_collection) + + result = collection.get(self._file_href()) + + self.assertTrue(result) + self.assertEqual(result.href, self._file_href()) + self.assertTrue(result.last_modified.endswith("GMT")) + self.assertEqual(result.__class__.__name__, "FileItem") + + def test_dav_get_returns_none_for_missing_attachment(self): + collection = self.make_collection(self.files_collection) + missing_href = f"{self._folder_href()}/missing.txt" + + self.assertIsNone(collection.get(missing_href)) + + def test_dav_delete_is_noop_for_files_collection(self): + collection = self.make_collection(self.files_collection) + + collection.delete(self._file_href()) + + self.assertTrue(self.attachment.exists()) + + def test_dav_upload_returns_none_for_files_collection(self): + collection = self.make_collection(self.files_collection) + + uploaded, previous = collection.upload(self._file_href(), object()) + + self.assertIsNone(uploaded) + self.assertTrue(previous) + + def test_dav_list_returns_empty_for_non_files_nested_path(self): + calendar_collection = self.create_collection( + name="Nested Calendar Guard", + dav_type="calendar", + model_xmlid="base.model_res_partner", + domain=f"[('id', '=', {self.partner.id})]", + ) + + result = calendar_collection.dav_list( + collection=self.make_collection(calendar_collection), + path_components=[ + self.env.user.login, + str(calendar_collection.id), + "nested", + ], + ) + + self.assertEqual(result, []) diff --git a/base_dav/tests/test_field_mapping.py b/base_dav/tests/test_field_mapping.py new file mode 100644 index 000000000..753199bea --- /dev/null +++ b/base_dav/tests/test_field_mapping.py @@ -0,0 +1,171 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import base64 +import datetime +from types import SimpleNamespace + +import vobject +from dateutil import tz + +from odoo.tests import tagged + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestDavCollectionFieldMapping(BaseDavTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.partner = cls.create_partner( + name="John Doe", + email="john@example.com", + ) + fixture = cls.create_partner_addressbook_fixture( + partner=cls.partner, + name="Address Book", + ) + cls.collection = fixture.collection + cls.map_name = fixture.mappings["name"] + cls.map_email = fixture.mappings["email"] + + cls.map_code = cls.add_mapping( + cls.collection, + name="X-CUSTOM", + field_xmlid="base.field_res_partner__name", + import_code="result = (item.value or '').upper()", + export_code="result = (record.name or '').lower()", + ) + cls.map_binary = cls.add_mapping( + cls.collection, + name="PHOTO", + field_xmlid="base.field_res_partner__image_1920", + ) + cls.map_n = cls.add_mapping( + cls.collection, + name="N", + field_xmlid="base.field_res_partner__name", + ) + + def test_from_vobject_simple_and_code(self): + card = vobject.vCard() + card.add("fn").value = "Jane" + card.add("email").value = "jane@example.com" + card.add("x-custom").value = "abc" + + self.assertEqual(self.map_name.from_vobject(card.fn), "Jane") + self.assertEqual(self.map_email.from_vobject(card.email), "jane@example.com") + self.assertEqual( + self.map_code.from_vobject(card.contents["x-custom"][0]), "ABC" + ) + + def test_to_vobject_simple_and_code(self): + self.assertEqual(self.map_name.to_vobject(self.partner), "John Doe") + self.assertEqual(self.map_email.to_vobject(self.partner), "john@example.com") + self.assertEqual(self.map_code.to_vobject(self.partner), "john doe") + + def test_to_vobject_false_returns_none(self): + partner = self.create_partner(name="No Email", email=False) + self.assertIsNone(self.map_email.to_vobject(partner)) + + def test_datetime_helpers(self): + aware_dt = datetime.datetime( + 2024, 1, 2, 12, 30, 0, tzinfo=tz.gettz("Europe/Kyiv") + ) + item_dt = SimpleNamespace(value=aware_dt) + item_date = SimpleNamespace(value=datetime.date(2024, 1, 2)) + item_other = SimpleNamespace(value="bad") + + dt_as_odoo = self.map_name._from_vobject_datetime(item_dt) + self.assertEqual(dt_as_odoo, "2024-01-02 10:30:00") + self.assertEqual( + self.map_name._from_vobject_datetime(item_date), + "2024-01-02 00:00:00", + ) + self.assertIsNone(self.map_name._from_vobject_datetime(item_other)) + + self.assertEqual(self.map_name._from_vobject_date(item_dt), "2024-01-02") + self.assertEqual(self.map_name._from_vobject_date(item_date), "2024-01-02") + self.assertIsNone(self.map_name._from_vobject_date(item_other)) + + def test_to_vobject_datetime_helpers(self): + naive_dt = datetime.datetime(2024, 1, 2, 12, 30, 0) + aware = self.map_name._to_vobject_datetime("2024-01-02 12:30:00") + self.assertEqual(aware.tzinfo, tz.UTC) + dt_value = datetime.datetime(2024, 1, 2, 12, 30, 0) + self.assertEqual( + self.map_name._to_vobject_datetime_rev(dt_value), + "20240102T123000Z", + ) + self.assertEqual( + self.map_name._to_vobject_date("2024-01-02"), + datetime.date(2024, 1, 2), + ) + + result = self.map_name.to_vobject(self.env["res.partner"].new({"name": "Tmp"})) + self.assertEqual(result, "Tmp") + + self.assertEqual( + self.map_name._to_vobject_datetime(naive_dt).tzinfo, + tz.UTC, + ) + + def test_binary_helpers(self): + raw_jpeg = b"\xff\xd8\xffabc" + encoded = self.map_binary._from_vobject_binary(SimpleNamespace(value=raw_jpeg)) + self.assertEqual(base64.b64decode(encoded), raw_jpeg) + + valid_b64 = base64.b64encode(b"hello-world").decode("ascii") + reencoded = self.map_binary._from_vobject_binary( + SimpleNamespace(value=valid_b64) + ) + self.assertEqual(base64.b64decode(reencoded), b"hello-world") + + invalid_b64 = self.map_binary._from_vobject_binary(SimpleNamespace(value="%%%")) + self.assertEqual(base64.b64decode(invalid_b64), b"%%%") + + self.assertIsNone( + self.map_binary._from_vobject_binary(SimpleNamespace(value="")) + ) + self.assertEqual( + self.map_binary._to_vobject_binary(base64.b64encode(b"abc")), + base64.b64encode(b"abc").decode("ascii"), + ) + + def test_name_helpers(self): + name_obj = vobject.vcard.Name(family="Doe") + self.assertEqual( + self.map_n._from_vobject_char_n(SimpleNamespace(value=name_obj)), + "Doe", + ) + self.assertEqual( + self.map_n._from_vobject_char_n(SimpleNamespace(value="Doe;John")), + "Doe", + ) + self.assertIsNone(self.map_n._from_vobject_char_n(SimpleNamespace(value=None))) + + converted = self.map_n._to_vobject_char_n("Doe") + self.assertEqual(converted.family, "Doe") + + def test_simple_fallback_and_bool_conversion_paths(self): + """Verify simple mapping fallback branches.""" + note_field = self.env["ir.model.fields"]._get("res.partner", "comment") + mapping = self.env["dav.collection.field_mapping"].create( + { + "collection_id": self.collection.id, + "name": "NOTE", + "mapping_type": "simple", + "field_id": note_field.id, + } + ) + + child = SimpleNamespace(value="Some note") + self.assertEqual(mapping.from_vobject(child), "Some note") + + partner = self.create_partner( + name="Bool Test", + email=False, + comment=False, + ) + self.assertIsNone(mapping.to_vobject(partner)) diff --git a/base_dav/tests/test_radicale_collection.py b/base_dav/tests/test_radicale_collection.py new file mode 100644 index 000000000..f49124772 --- /dev/null +++ b/base_dav/tests/test_radicale_collection.py @@ -0,0 +1,201 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from types import SimpleNamespace +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.base_dav.radicale.collection import ( + Collection, + Storage, + _abs_href, + _norm_path, + _rel_href, +) + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestDavRadicaleCollection(BaseDavTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + fixture = cls.create_partner_addressbook_fixture( + partner=cls.create_partner( + name="DAV Partner", + email="dav@example.com", + ), + name="Contacts", + ) + cls.partner = fixture.partner + cls.collection_record = fixture.collection + + def setUp(self): + super().setUp() + self.push_request_context() + + def test_path_helpers(self): + self.assertEqual(_norm_path("/demo//x/"), "demo/x") + self.assertEqual(_abs_href("user/7", "15"), "/user/7/15") + self.assertEqual(_abs_href("user/7", "/user/7/15"), "/user/7/15") + self.assertEqual(_rel_href("user/7", "/user/7/15"), "15") + self.assertEqual(_rel_href("", "/abc"), "abc") + + def test_root_and_principal_listing(self): + root = Collection("") + self.assertIn(self.env.user.login, list(root.list())) + + principal = Collection(self.env.user.login) + children = list(principal.list()) + self.assertIn(f"{self.env.user.login}/{self.collection_record.id}", children) + + def test_collection_list_get_and_get_multi(self): + collection = self.make_collection(self.collection_record) + href = str(self.partner.id) + + listed = list(collection.list()) + self.assertIn(href, listed) + + item = collection.get(href) + self.assertTrue(item) + self.assertEqual(item.href, href) + + multi = list(collection.get_multi([href, href])) + self.assertEqual(len(multi), 2) + self.assertEqual(multi[0][0], href) + self.assertEqual(multi[1][0], href) + self.assertTrue(multi[0][1]) + self.assertTrue(multi[1][1]) + + def test_collection_upload_delete_meta_and_last_modified(self): + collection = self.make_collection(self.collection_record) + href = str(self.partner.id) + + old_item = collection.get(href) + vobj = old_item.vobject_item + vobj.email.value = "updated@example.com" + + uploaded, previous = collection.upload(href, vobj) + self.assertTrue(uploaded) + self.assertTrue(previous) + self.partner.invalidate_recordset(["email"]) + self.assertEqual(self.partner.email, "updated@example.com") + + self.assertEqual( + collection.get_meta(), + { + "tag": "VADDRESSBOOK", + "D:displayname": self.collection_record.display_name, + }, + ) + self.assertEqual(collection.get_meta("tag"), "VADDRESSBOOK") + self.assertEqual( + collection.get_meta("D:displayname"), + self.collection_record.display_name, + ) + self.assertTrue(collection.last_modified) + + collection.delete(href) + self.assertFalse(self.partner.exists()) + + def test_collection_upload_delete_guards(self): + not_collection = Collection("unknown/path") + with self.assertRaises(ValueError): + not_collection.upload("1", SimpleNamespace()) + with self.assertRaises(ValueError): + not_collection.delete("1") + + real_collection = self.make_collection(self.collection_record) + with self.assertRaises(NotImplementedError): + real_collection.delete() + + def test_storage_discover_and_guard_methods(self): + storage = Storage(configuration=mock.Mock()) + collection_path = f"{self.env.user.login}/{self.collection_record.id}" + item_path = f"{collection_path}/{self.partner.id}" + + zero_depth = list(storage.discover(collection_path, depth="0")) + self.assertEqual(len(zero_depth), 1) + self.assertIsInstance(zero_depth[0], Collection) + + deep = list(storage.discover(collection_path, depth="1")) + self.assertTrue(deep) + self.assertEqual(deep[0].path, collection_path) + + item = list(storage.discover(item_path, depth="0")) + self.assertEqual(len(item), 1) + self.assertTrue(item[0]) + + self.assertEqual( + list(storage.discover(f"{self.env.user.login}/999999/1")), + [], + ) + + with self.assertRaises(NotImplementedError): + storage.move(mock.Mock(), mock.Mock(), "x") + with self.assertRaises(NotImplementedError): + storage.create_collection("x") + + with storage.acquire_lock("r", user=self.env.user.login): + pass + + self.assertTrue(Storage(configuration=mock.Mock()).verify()) + + def test_collection_get_all_skips_missing_items(self): + """Verify get_all skips empty items returned by get().""" + collection = self.make_collection(self.collection_record) + + with ( + mock.patch.object( + collection, + "list", + return_value=["1", "2", "3"], + ), + mock.patch.object( + collection, + "get", + side_effect=[object(), None, object()], + ), + ): + items = list(collection.get_all()) + + self.assertEqual(len(items), 2) + + def test_collection_get_meta_without_record(self): + """Verify get_meta for non-collection path.""" + collection = Collection("unknown/path") + + self.assertIsNone(collection.get_meta("tag")) + self.assertEqual(collection.get_meta(), {}) + self.assertEqual(collection.last_modified, "") + + def test_file_item_invalid_base64_is_handled(self): + """Verify FileItem tolerates invalid attachment base64.""" + from ..radicale.collection import FileItem + + attachment = SimpleNamespace(datas="%%%") + collection = self.make_collection(self.collection_record) + + item = FileItem( + collection=collection, + href="bad.txt", + attachment=attachment, + last_modified="", + ) + + self.assertEqual(item.href, "bad.txt") + self.assertEqual(item.last_modified, "") + + def test_storage_discover_root_and_principal_depth_one(self): + """Verify discover returns child collections for root/principal paths.""" + storage = Storage(configuration=mock.Mock()) + + root_items = list(storage.discover("", depth="1")) + self.assertTrue(root_items) + self.assertEqual(root_items[0].path, "") + + principal_items = list(storage.discover(self.env.user.login, depth="1")) + self.assertTrue(principal_items) + self.assertEqual(principal_items[0].path, self.env.user.login) diff --git a/base_dav/tests/test_vcard_false_values.py b/base_dav/tests/test_vcard_false_values.py new file mode 100644 index 000000000..63ae9dc9f --- /dev/null +++ b/base_dav/tests/test_vcard_false_values.py @@ -0,0 +1,44 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +import vobject + +from odoo.tests import tagged + +from .common import BaseDavTestCase + + +@tagged("post_install", "-at_install") +class TestVCardFalseValues(BaseDavTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.partner = cls.create_partner( + name="DAV Partner", + email=False, + ) + fixture = cls.create_partner_addressbook_fixture( + partner=cls.partner, + name="Test Addressbook", + with_name=False, + with_email=False, + ) + cls.collection = fixture.collection + cls.map_email = cls.add_mapping( + cls.collection, + name="EMAIL", + field_xmlid="base.field_res_partner__email", + ) + + def test_simple_mapping_false_must_not_break_vobject(self): + """Verify falsey Char export returns None and does not break vobject.""" + exported = self.map_email.to_vobject(self.partner) + card = vobject.vCard() + if exported is False: + card.add("email").value = exported + with self.assertRaises(AttributeError): + card.serialize() + else: + self.assertIsNone(exported) + card.add("fn").value = "DAV Partner" + card.serialize() # must not raise diff --git a/base_dav/views/dav_collection.xml b/base_dav/views/dav_collection.xml new file mode 100644 index 000000000..a36ba7074 --- /dev/null +++ b/base_dav/views/dav_collection.xml @@ -0,0 +1,82 @@ + + + dav.collection + + + + + + + + + + + dav.collection + +
+ + + + + + + + + + + + + + + + +
+
+
+ + + dav.collection.field_mapping + + + + + + + + + + + dav.collection.field_mapping + +
+ + + + + + + + + +
+
+
+ + + WebDAV collections + dav.collection + list,form + + + +
diff --git a/requirements.txt b/requirements.txt index b9eb3d222..0e3c1c5d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,6 @@ mysqlclient pymssql<=2.2.5 ; python_version <= '3.10' pymssql<=2.2.8 ; python_version < '3.12' pymssql<=2.3.7 ; python_version >= '3.12' +radicale>3.3.0 sqlalchemy vobject