From cd35bb11f2625b6cae6d341a3ff696eb5f43cf28 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:33:56 +0000 Subject: [PATCH 1/6] Add 10 new domain classes: problems, service_instances, releases, projects, contracts, knowledge_articles, risks, service_offerings, skill_pools, closure_codes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- CHANGELOG.md | 10 ++ src/xurrent/closure_codes.py | 61 +++++++++++ src/xurrent/contracts.py | 97 ++++++++++++++++++ src/xurrent/knowledge_articles.py | 121 ++++++++++++++++++++++ src/xurrent/problems.py | 160 +++++++++++++++++++++++++++++ src/xurrent/projects.py | 164 ++++++++++++++++++++++++++++++ src/xurrent/releases.py | 141 +++++++++++++++++++++++++ src/xurrent/risks.py | 149 +++++++++++++++++++++++++++ src/xurrent/service_instances.py | 111 ++++++++++++++++++++ src/xurrent/service_offerings.py | 89 ++++++++++++++++ src/xurrent/skill_pools.py | 102 +++++++++++++++++++ 11 files changed, 1205 insertions(+) create mode 100644 src/xurrent/closure_codes.py create mode 100644 src/xurrent/contracts.py create mode 100644 src/xurrent/knowledge_articles.py create mode 100644 src/xurrent/problems.py create mode 100644 src/xurrent/projects.py create mode 100644 src/xurrent/releases.py create mode 100644 src/xurrent/risks.py create mode 100644 src/xurrent/service_instances.py create mode 100644 src/xurrent/service_offerings.py create mode 100644 src/xurrent/skill_pools.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a1798c1..eb93e14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Problems: added `Problem` class with `ProblemPredefinedFilter`, `ProblemStatus`, and `ProblemImpact` enums; supports CRUD, archive/trash/restore, and sub-resources (requests, workflows, notes). +- ServiceInstances: added `ServiceInstance` class with `ServiceInstancePredefinedFilter` and `ServiceInstanceStatus` enums; supports CRUD and sub-resources (cis, slas, users). +- Releases: added `Release` class with `ReleasePredefinedFilter`, `ReleaseStatus`, and `ReleaseImpact` enums; supports CRUD, archive/trash/restore, and sub-resources (workflows, notes). +- Projects: added `Project` class with `ProjectPredefinedFilter`, `ProjectStatus`, and `ProjectCategory` enums; supports CRUD, archive/trash/restore, and sub-resources (tasks, phases, workflows, risks, notes). +- Contracts: added `Contract` class with `ContractPredefinedFilter` and `ContractStatus` enums; supports CRUD and CI listing. +- KnowledgeArticles: added `KnowledgeArticle` class with `KnowledgeArticlePredefinedFilter` and `KnowledgeArticleStatus` enums; supports CRUD, archive/trash/restore, and sub-resources (requests, service_instances, translations). +- Risks: added `Risk` class with `RiskPredefinedFilter`, `RiskStatus`, and `RiskSeverity` enums; supports CRUD, archive/trash/restore, and sub-resources (organizations, projects, services). +- ServiceOfferings: added `ServiceOffering` class with `ServiceOfferingPredefinedFilter` and `ServiceOfferingStatus` enums; supports CRUD. +- SkillPools: added `SkillPool` class with `SkillPoolPredefinedFilter` enum; supports CRUD, enable/disable, and sub-resources (members, effort_classes). +- ClosureCodes: added `ClosureCode` class; supports CRUD. - Docs: added `CLAUDE.md` with setup instructions, test commands, architecture overview, and changelog requirements for Claude Code. - Products: added `Product` class with `ProductPredefinedFilter` and `ProductDepreciationMethod` enums; supports CRUD, enable/disable, and CI listing. - ProductCategories: added `ProductCategory` class with `ProductCategoryRuleSet` enum; supports CRUD and enable/disable. diff --git a/src/xurrent/closure_codes.py b/src/xurrent/closure_codes.py new file mode 100644 index 0000000..317020c --- /dev/null +++ b/src/xurrent/closure_codes.py @@ -0,0 +1,61 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict + +T = TypeVar('T', bound='ClosureCode') + + +class ClosureCode(JsonSerializableDict): + # https://developer.xurrent.com/v1/closure_codes/ + __resourceUrl__ = 'closure_codes' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + name: Optional[str] = None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.name = name + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"ClosureCode(id={self.id}, name={self.name})" + + def ref_str(self) -> str: + return f"ClosureCode(id={self.id}, name={self.name})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_closure_codes(cls, connection_object: XurrentApiHelper, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return ClosureCode.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) diff --git a/src/xurrent/contracts.py b/src/xurrent/contracts.py new file mode 100644 index 0000000..b497b2d --- /dev/null +++ b/src/xurrent/contracts.py @@ -0,0 +1,97 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='Contract') + + +class ContractPredefinedFilter(str, Enum): + active = "active" + inactive = "inactive" + + def __str__(self): + return self.value + + +class ContractStatus(str, Enum): + being_created = "being_created" + active = "active" + expired = "expired" + + def __str__(self): + return self.value + + +class Contract(JsonSerializableDict): + # https://developer.xurrent.com/v1/contracts/ + __resourceUrl__ = 'contracts' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + name: Optional[str] = None, + status: Optional[str] = None, + customer=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.name = name + self.status = ContractStatus(status) if isinstance(status, str) else status + + from .organizations import Organization + self.customer = (customer if isinstance(customer, Organization) + else Organization.from_data(connection_object, customer) if customer else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"Contract(id={self.id}, name={self.name}, status={self.status})" + + def ref_str(self) -> str: + return f"Contract(id={self.id}, name={self.name})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_contracts(cls, connection_object: XurrentApiHelper, + predefinedFilter: ContractPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return Contract.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def get_cis(self, queryfilter: dict = None) -> List: + from .configuration_items import ConfigurationItem + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/cis' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [ConfigurationItem.from_data(self._connection_object, item) for item in response] diff --git a/src/xurrent/knowledge_articles.py b/src/xurrent/knowledge_articles.py new file mode 100644 index 0000000..b8492aa --- /dev/null +++ b/src/xurrent/knowledge_articles.py @@ -0,0 +1,121 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='KnowledgeArticle') + + +class KnowledgeArticlePredefinedFilter(str, Enum): + active = "active" + archived = "archived" + managed_by_me = "managed_by_me" + + def __str__(self): + return self.value + + +class KnowledgeArticleStatus(str, Enum): + not_validated = "not_validated" + validated = "validated" + + def __str__(self): + return self.value + + +class KnowledgeArticle(JsonSerializableDict): + # https://developer.xurrent.com/v1/knowledge_articles/ + __resourceUrl__ = 'knowledge_articles' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = KnowledgeArticleStatus(status) if isinstance(status, str) else status + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"KnowledgeArticle(id={self.id}, subject={self.subject}, status={self.status})" + + def ref_str(self) -> str: + return f"KnowledgeArticle(id={self.id}, subject={self.subject})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_knowledge_articles(cls, connection_object: XurrentApiHelper, + predefinedFilter: KnowledgeArticlePredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return KnowledgeArticle.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def archive(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + return KnowledgeArticle.from_data(self._connection_object, response) + + def trash(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + return KnowledgeArticle.from_data(self._connection_object, response) + + def restore(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + return KnowledgeArticle.from_data(self._connection_object, response) + + def get_requests(self, queryfilter: dict = None) -> List: + from .requests import Request + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/requests' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Request.from_data(self._connection_object, item) for item in response] + + def get_service_instances(self, queryfilter: dict = None) -> List: + from .service_instances import ServiceInstance + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_instances' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [ServiceInstance.from_data(self._connection_object, item) for item in response] + + def get_translations(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/translations' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') diff --git a/src/xurrent/problems.py b/src/xurrent/problems.py new file mode 100644 index 0000000..a14ef6b --- /dev/null +++ b/src/xurrent/problems.py @@ -0,0 +1,160 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='Problem') + + +class ProblemPredefinedFilter(str, Enum): + active = "active" + known_errors = "known_errors" + progress_halted = "progress_halted" + solved = "solved" + managed_by_me = "managed_by_me" + assigned_to_my_teams = "assigned_to_my_teams" + assigned_to_me = "assigned_to_me" + + def __str__(self): + return self.value + + +class ProblemStatus(str, Enum): + registered = "registered" + in_progress = "in_progress" + progress_halted = "progress_halted" + solved = "solved" + + def __str__(self): + return self.value + + +class ProblemImpact(str, Enum): + low = "low" + medium = "medium" + high = "high" + top = "top" + + def __str__(self): + return self.value + + +class Problem(JsonSerializableDict): + # https://developer.xurrent.com/v1/problems/ + __resourceUrl__ = 'problems' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + impact: Optional[str] = None, + manager=None, + team=None, + member=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = ProblemStatus(status) if isinstance(status, str) else status + self.impact = ProblemImpact(impact) if isinstance(impact, str) else impact + + from .people import Person + self.manager = (manager if isinstance(manager, Person) + else Person.from_data(connection_object, manager) if manager else None) + self.member = (member if isinstance(member, Person) + else Person.from_data(connection_object, member) if member else None) + + from .teams import Team + self.team = (team if isinstance(team, Team) + else Team.from_data(connection_object, team) if team else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return (f"Problem(id={self.id}, subject={self.subject}, status={self.status}, " + f"impact={self.impact})") + + def ref_str(self) -> str: + return f"Problem(id={self.id}, subject={self.subject})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_problems(cls, connection_object: XurrentApiHelper, + predefinedFilter: ProblemPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return Problem.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def archive(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + return Problem.from_data(self._connection_object, response) + + def trash(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + return Problem.from_data(self._connection_object, response) + + def restore(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + return Problem.from_data(self._connection_object, response) + + def get_requests(self, queryfilter: dict = None) -> List: + from .requests import Request + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/requests' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Request.from_data(self._connection_object, item) for item in response] + + def get_workflows(self, queryfilter: dict = None) -> List: + from .workflows import Workflow + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/workflows' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Workflow.from_data(self._connection_object, item) for item in response] + + def get_notes(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def add_note(self, note) -> dict: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if isinstance(note, dict): + return self._connection_object.api_call(uri, 'POST', note) + elif isinstance(note, str): + return self._connection_object.api_call(uri, 'POST', {'text': note}) diff --git a/src/xurrent/projects.py b/src/xurrent/projects.py new file mode 100644 index 0000000..b062c95 --- /dev/null +++ b/src/xurrent/projects.py @@ -0,0 +1,164 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='Project') + + +class ProjectPredefinedFilter(str, Enum): + completed = "completed" + open = "open" + managed_by_me = "managed_by_me" + + def __str__(self): + return self.value + + +class ProjectStatus(str, Enum): + being_created = "being_created" + registered = "registered" + in_progress = "in_progress" + progress_halted = "progress_halted" + completed = "completed" + + def __str__(self): + return self.value + + +class ProjectCategory(str, Enum): + engineering = "engineering" + implementation = "implementation" + maintenance = "maintenance" + migration = "migration" + move = "move" + other = "other" + release = "release" + + def __str__(self): + return self.value + + +class Project(JsonSerializableDict): + # https://developer.xurrent.com/v1/projects/ + __resourceUrl__ = 'projects' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + category: Optional[str] = None, + manager=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = ProjectStatus(status) if isinstance(status, str) else status + self.category = ProjectCategory(category) if isinstance(category, str) else category + + from .people import Person + self.manager = (manager if isinstance(manager, Person) + else Person.from_data(connection_object, manager) if manager else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return (f"Project(id={self.id}, subject={self.subject}, status={self.status}, " + f"category={self.category})") + + def ref_str(self) -> str: + return f"Project(id={self.id}, subject={self.subject})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_projects(cls, connection_object: XurrentApiHelper, + predefinedFilter: ProjectPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return Project.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def archive(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + return Project.from_data(self._connection_object, response) + + def trash(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + return Project.from_data(self._connection_object, response) + + def restore(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + return Project.from_data(self._connection_object, response) + + def get_tasks(self, queryfilter: dict = None) -> List: + from .tasks import Task + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/tasks' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Task.from_data(self._connection_object, item) for item in response] + + def get_phases(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/phases' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def get_workflows(self, queryfilter: dict = None) -> List: + from .workflows import Workflow + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/workflows' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Workflow.from_data(self._connection_object, item) for item in response] + + def get_risks(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/risks' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def get_notes(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def add_note(self, note) -> dict: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if isinstance(note, dict): + return self._connection_object.api_call(uri, 'POST', note) + elif isinstance(note, str): + return self._connection_object.api_call(uri, 'POST', {'text': note}) diff --git a/src/xurrent/releases.py b/src/xurrent/releases.py new file mode 100644 index 0000000..529a825 --- /dev/null +++ b/src/xurrent/releases.py @@ -0,0 +1,141 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='Release') + + +class ReleasePredefinedFilter(str, Enum): + completed = "completed" + open = "open" + managed_by_me = "managed_by_me" + + def __str__(self): + return self.value + + +class ReleaseStatus(str, Enum): + being_created = "being_created" + registered = "registered" + in_progress = "in_progress" + progress_halted = "progress_halted" + completed = "completed" + + def __str__(self): + return self.value + + +class ReleaseImpact(str, Enum): + low = "low" + medium = "medium" + high = "high" + top = "top" + + def __str__(self): + return self.value + + +class Release(JsonSerializableDict): + # https://developer.xurrent.com/v1/releases/ + __resourceUrl__ = 'releases' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + impact: Optional[str] = None, + manager=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = ReleaseStatus(status) if isinstance(status, str) else status + self.impact = ReleaseImpact(impact) if isinstance(impact, str) else impact + + from .people import Person + self.manager = (manager if isinstance(manager, Person) + else Person.from_data(connection_object, manager) if manager else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return (f"Release(id={self.id}, subject={self.subject}, status={self.status}, " + f"impact={self.impact})") + + def ref_str(self) -> str: + return f"Release(id={self.id}, subject={self.subject})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_releases(cls, connection_object: XurrentApiHelper, + predefinedFilter: ReleasePredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return Release.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def archive(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + return Release.from_data(self._connection_object, response) + + def trash(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + return Release.from_data(self._connection_object, response) + + def restore(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + return Release.from_data(self._connection_object, response) + + def get_workflows(self, queryfilter: dict = None) -> List: + from .workflows import Workflow + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/workflows' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Workflow.from_data(self._connection_object, item) for item in response] + + def get_notes(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def add_note(self, note) -> dict: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if isinstance(note, dict): + return self._connection_object.api_call(uri, 'POST', note) + elif isinstance(note, str): + return self._connection_object.api_call(uri, 'POST', {'text': note}) diff --git a/src/xurrent/risks.py b/src/xurrent/risks.py new file mode 100644 index 0000000..fc37654 --- /dev/null +++ b/src/xurrent/risks.py @@ -0,0 +1,149 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='Risk') + + +class RiskPredefinedFilter(str, Enum): + open = "open" + closed = "closed" + + def __str__(self): + return self.value + + +class RiskStatus(str, Enum): + attrition = "attrition" + availability = "availability" + budget = "budget" + compliance = "compliance" + environmental = "environmental" + legal = "legal" + operational = "operational" + program = "program" + security = "security" + strategic = "strategic" + technology = "technology" + + def __str__(self): + return self.value + + +class RiskSeverity(str, Enum): + low = "low" + medium = "medium" + high = "high" + very_high = "very_high" + + def __str__(self): + return self.value + + +class Risk(JsonSerializableDict): + # https://developer.xurrent.com/v1/risks/ + __resourceUrl__ = 'risks' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + severity: Optional[str] = None, + manager=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = RiskStatus(status) if isinstance(status, str) else status + self.severity = RiskSeverity(severity) if isinstance(severity, str) else severity + + from .people import Person + self.manager = (manager if isinstance(manager, Person) + else Person.from_data(connection_object, manager) if manager else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return (f"Risk(id={self.id}, subject={self.subject}, status={self.status}, " + f"severity={self.severity})") + + def ref_str(self) -> str: + return f"Risk(id={self.id}, subject={self.subject})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_risks(cls, connection_object: XurrentApiHelper, + predefinedFilter: RiskPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return Risk.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def archive(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + return Risk.from_data(self._connection_object, response) + + def trash(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + return Risk.from_data(self._connection_object, response) + + def restore(self) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + return Risk.from_data(self._connection_object, response) + + def get_organizations(self, queryfilter: dict = None) -> List: + from .organizations import Organization + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/organizations' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Organization.from_data(self._connection_object, item) for item in response] + + def get_projects(self, queryfilter: dict = None) -> List: + from .projects import Project + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/projects' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Project.from_data(self._connection_object, item) for item in response] + + def get_services(self, queryfilter: dict = None) -> List: + from .services import Service + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/services' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Service.from_data(self._connection_object, item) for item in response] diff --git a/src/xurrent/service_instances.py b/src/xurrent/service_instances.py new file mode 100644 index 0000000..4e7f318 --- /dev/null +++ b/src/xurrent/service_instances.py @@ -0,0 +1,111 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='ServiceInstance') + + +class ServiceInstancePredefinedFilter(str, Enum): + active = "active" + inactive = "inactive" + + def __str__(self): + return self.value + + +class ServiceInstanceStatus(str, Enum): + being_created = "being_created" + active = "active" + discontinued = "discontinued" + + def __str__(self): + return self.value + + +class ServiceInstance(JsonSerializableDict): + # https://developer.xurrent.com/v1/service_instances/ + __resourceUrl__ = 'service_instances' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + name: Optional[str] = None, + status: Optional[str] = None, + service=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.name = name + self.status = ServiceInstanceStatus(status) if isinstance(status, str) else status + + from .services import Service + self.service = (service if isinstance(service, Service) + else Service.from_data(connection_object, service) if service else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"ServiceInstance(id={self.id}, name={self.name}, status={self.status})" + + def ref_str(self) -> str: + return f"ServiceInstance(id={self.id}, name={self.name})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_service_instances(cls, connection_object: XurrentApiHelper, + predefinedFilter: ServiceInstancePredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return ServiceInstance.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def get_cis(self, queryfilter: dict = None) -> List: + from .configuration_items import ConfigurationItem + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/cis' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [ConfigurationItem.from_data(self._connection_object, item) for item in response] + + def get_slas(self, queryfilter: dict = None) -> List[dict]: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/slas' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def get_users(self, queryfilter: dict = None) -> List: + from .people import Person + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/users' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Person.from_data(self._connection_object, item) for item in response] diff --git a/src/xurrent/service_offerings.py b/src/xurrent/service_offerings.py new file mode 100644 index 0000000..abe817a --- /dev/null +++ b/src/xurrent/service_offerings.py @@ -0,0 +1,89 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='ServiceOffering') + + +class ServiceOfferingPredefinedFilter(str, Enum): + catalog = "catalog" + portfolio = "portfolio" + + def __str__(self): + return self.value + + +class ServiceOfferingStatus(str, Enum): + not_offered = "not_offered" + available = "available" + temporarily_unavailable = "temporarily_unavailable" + + def __str__(self): + return self.value + + +class ServiceOffering(JsonSerializableDict): + # https://developer.xurrent.com/v1/service_offerings/ + __resourceUrl__ = 'service_offerings' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + name: Optional[str] = None, + status: Optional[str] = None, + service=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.name = name + self.status = ServiceOfferingStatus(status) if isinstance(status, str) else status + + from .services import Service + self.service = (service if isinstance(service, Service) + else Service.from_data(connection_object, service) if service else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"ServiceOffering(id={self.id}, name={self.name}, status={self.status})" + + def ref_str(self) -> str: + return f"ServiceOffering(id={self.id}, name={self.name})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_service_offerings(cls, connection_object: XurrentApiHelper, + predefinedFilter: ServiceOfferingPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return ServiceOffering.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) diff --git a/src/xurrent/skill_pools.py b/src/xurrent/skill_pools.py new file mode 100644 index 0000000..4a0f7ce --- /dev/null +++ b/src/xurrent/skill_pools.py @@ -0,0 +1,102 @@ +from __future__ import annotations +from typing import Optional, List, TypeVar +from .core import XurrentApiHelper, JsonSerializableDict +from enum import Enum + +T = TypeVar('T', bound='SkillPool') + + +class SkillPoolPredefinedFilter(str, Enum): + disabled = "disabled" + enabled = "enabled" + + def __str__(self): + return self.value + + +class SkillPool(JsonSerializableDict): + # https://developer.xurrent.com/v1/skill_pools/ + __resourceUrl__ = 'skill_pools' + + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + name: Optional[str] = None, + disabled: Optional[bool] = None, + manager=None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.name = name + self.disabled = disabled + + from .people import Person + self.manager = (manager if isinstance(manager, Person) + else Person.from_data(connection_object, manager) if manager else None) + + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + return f"SkillPool(id={self.id}, name={self.name}, disabled={self.disabled})" + + def ref_str(self) -> str: + return f"SkillPool(id={self.id}, name={self.name})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_skill_pools(cls, connection_object: XurrentApiHelper, + predefinedFilter: SkillPoolPredefinedFilter = None, + queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri += '?' + connection_object.create_filter_string(queryfilter) + response = connection_object.api_call(uri, 'GET') + return [cls.from_data(connection_object, item) for item in response] + + def update(self, data: dict) -> T: + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + return SkillPool.from_data(self._connection_object, response) + + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: + uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def enable(self) -> T: + return self.update({'disabled': False}) + + def disable(self) -> T: + return self.update({'disabled': True}) + + def get_members(self, queryfilter: dict = None) -> List: + from .people import Person + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/members' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Person.from_data(self._connection_object, item) for item in response] + + def get_effort_classes(self, queryfilter: dict = None) -> List: + from .effort_classes import EffortClass + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/effort_classes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [EffortClass.from_data(self._connection_object, item) for item in response] From 3cc3785e9edebce8b05717614873816030b8ab1a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:35:23 +0000 Subject: [PATCH 2/6] Add TypeError for invalid note type in add_note methods Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- src/xurrent/problems.py | 2 ++ src/xurrent/projects.py | 2 ++ src/xurrent/releases.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/src/xurrent/problems.py b/src/xurrent/problems.py index a14ef6b..6aaa330 100644 --- a/src/xurrent/problems.py +++ b/src/xurrent/problems.py @@ -158,3 +158,5 @@ def add_note(self, note) -> dict: return self._connection_object.api_call(uri, 'POST', note) elif isinstance(note, str): return self._connection_object.api_call(uri, 'POST', {'text': note}) + else: + raise TypeError(f"Expected 'note' to be a str or dict, got {type(note).__name__}") diff --git a/src/xurrent/projects.py b/src/xurrent/projects.py index b062c95..ba24de4 100644 --- a/src/xurrent/projects.py +++ b/src/xurrent/projects.py @@ -162,3 +162,5 @@ def add_note(self, note) -> dict: return self._connection_object.api_call(uri, 'POST', note) elif isinstance(note, str): return self._connection_object.api_call(uri, 'POST', {'text': note}) + else: + raise TypeError(f"Expected 'note' to be a str or dict, got {type(note).__name__}") diff --git a/src/xurrent/releases.py b/src/xurrent/releases.py index 529a825..098fad3 100644 --- a/src/xurrent/releases.py +++ b/src/xurrent/releases.py @@ -139,3 +139,5 @@ def add_note(self, note) -> dict: return self._connection_object.api_call(uri, 'POST', note) elif isinstance(note, str): return self._connection_object.api_call(uri, 'POST', {'text': note}) + else: + raise TypeError(f"Expected 'note' to be a str or dict, got {type(note).__name__}") From bd6bb45f142a98890271435a717d7fa1dfcd9fe3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:40:02 +0000 Subject: [PATCH 3/6] Add sub-resource methods to existing domain classes - Requests: get_attachments, get_knowledge_articles, get_automation_rules, get_satisfaction_feedback, get_tags, get_watches - Tasks: get_notes, add_note, get_approvals, get_cis, get_predecessors, get_successors, get_service_instances, get_automation_rules - Workflows: get_notes, add_note, get_automation_rules, get_phases, get_requests, get_problems - People: get_cis, get_addresses, get_contacts, get_permissions, get_ci_coverages, get_sla_coverages, get_service_coverages, get_out_of_office_periods, get_skill_pools - Organizations: get_addresses, get_contacts, get_contracts, get_risks, get_slas, get_time_allocations - Services: get_workflows, get_request_templates, get_risks, get_service_instances, get_slas, get_service_offerings - Calendars: get_duration, get_hours, get_holidays - Teams: get_service_instances - Holidays: get_calendars Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- CHANGELOG.md | 9 ++++++ src/xurrent/calendars.py | 28 +++++++++++++++++++ src/xurrent/holidays.py | 7 +++++ src/xurrent/organizations.py | 36 ++++++++++++++++++++++++ src/xurrent/people.py | 51 ++++++++++++++++++++++++++++++++++ src/xurrent/requests.py | 32 ++++++++++++++++++++++ src/xurrent/services.py | 42 ++++++++++++++++++++++++++++ src/xurrent/tasks.py | 53 ++++++++++++++++++++++++++++++++++++ src/xurrent/teams.py | 7 +++++ src/xurrent/workflows.py | 41 ++++++++++++++++++++++++++++ 10 files changed, 306 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb93e14..48dcb89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Risks: added `Risk` class with `RiskPredefinedFilter`, `RiskStatus`, and `RiskSeverity` enums; supports CRUD, archive/trash/restore, and sub-resources (organizations, projects, services). - ServiceOfferings: added `ServiceOffering` class with `ServiceOfferingPredefinedFilter` and `ServiceOfferingStatus` enums; supports CRUD. - SkillPools: added `SkillPool` class with `SkillPoolPredefinedFilter` enum; supports CRUD, enable/disable, and sub-resources (members, effort_classes). +- Requests: added `get_attachments`, `get_knowledge_articles`, `get_automation_rules`, `get_satisfaction_feedback`, `get_tags`, and `get_watches` instance methods. +- Tasks: added `get_notes`, `add_note`, `get_approvals`, `get_cis`, `get_predecessors`, `get_successors`, `get_service_instances`, and `get_automation_rules` instance methods. +- Workflows: added `get_notes`, `add_note`, `get_automation_rules`, `get_phases`, `get_requests`, and `get_problems` instance methods. +- People: added `get_cis`, `get_addresses`, `get_contacts`, `get_permissions`, `get_ci_coverages`, `get_sla_coverages`, `get_service_coverages`, `get_out_of_office_periods`, and `get_skill_pools` instance methods. +- Organizations: added `get_addresses`, `get_contacts`, `get_contracts`, `get_risks`, `get_slas`, and `get_time_allocations` instance methods. +- Services: added `get_workflows`, `get_request_templates`, `get_risks`, `get_service_instances`, `get_slas`, and `get_service_offerings` instance methods. +- Calendars: added `get_duration`, `get_hours`, and `get_holidays` instance methods. +- Teams: added `get_service_instances` instance method. +- Holidays: added `get_calendars` instance method. - ClosureCodes: added `ClosureCode` class; supports CRUD. - Docs: added `CLAUDE.md` with setup instructions, test commands, architecture overview, and changelog requirements for Claude Code. - Products: added `Product` class with `ProductPredefinedFilter` and `ProductDepreciationMethod` enums; supports CRUD, enable/disable, and CI listing. diff --git a/src/xurrent/calendars.py b/src/xurrent/calendars.py index c2f91ed..564c9b7 100644 --- a/src/xurrent/calendars.py +++ b/src/xurrent/calendars.py @@ -78,3 +78,31 @@ def enable(self) -> T: def disable(self) -> T: return self.update({'disabled': True}) + + def get_duration(self, start: str, end: str, time_zone: str = None) -> dict: + """ + Calculate the duration between two timestamps according to the calendar. + + :param start: Start datetime string (ISO 8601) + :param end: End datetime string (ISO 8601) + :param time_zone: Optional time zone name + :return: Duration data from the API + """ + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/duration' + params = f'start={start}&end={end}' + if time_zone: + params += f'&time_zone={time_zone}' + uri += f'?{params}' + return self._connection_object.api_call(uri, 'GET') + + def get_hours(self) -> List[dict]: + """Retrieve the working hours of the calendar.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/hours' + return self._connection_object.api_call(uri, 'GET') + + def get_holidays(self) -> List: + """Retrieve the holidays associated with this calendar.""" + from .holidays import Holiday + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/holidays' + response = self._connection_object.api_call(uri, 'GET') + return [Holiday.from_data(self._connection_object, h) for h in response] diff --git a/src/xurrent/holidays.py b/src/xurrent/holidays.py index 1d32e50..a51324e 100644 --- a/src/xurrent/holidays.py +++ b/src/xurrent/holidays.py @@ -64,3 +64,10 @@ def create(cls, connection_object: XurrentApiHelper, data: dict) -> T: uri = f'{connection_object.base_url}/{cls.__resourceUrl__}' response = connection_object.api_call(uri, 'POST', data) return cls.from_data(connection_object, response) + + def get_calendars(self) -> List: + """Retrieve calendars that contain this holiday.""" + from .calendars import Calendar + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/calendars' + response = self._connection_object.api_call(uri, 'GET') + return [Calendar.from_data(self._connection_object, c) for c in response] diff --git a/src/xurrent/organizations.py b/src/xurrent/organizations.py index 17ddea4..e2bcc40 100644 --- a/src/xurrent/organizations.py +++ b/src/xurrent/organizations.py @@ -137,3 +137,39 @@ def get_children(self, queryfilter: dict = None) -> List[T]: uri += '?' + self._connection_object.create_filter_string(queryfilter) response = self._connection_object.api_call(uri, 'GET') return [Organization.from_data(self._connection_object, item) for item in response] + + def get_addresses(self) -> List[dict]: + """Retrieve addresses for this organization instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/addresses' + return self._connection_object.api_call(uri, 'GET') + + def get_contacts(self) -> List[dict]: + """Retrieve contacts for this organization instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/contacts' + return self._connection_object.api_call(uri, 'GET') + + def get_contracts(self) -> List: + """Retrieve contracts for this organization instance.""" + from .contracts import Contract + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/contracts' + response = self._connection_object.api_call(uri, 'GET') + return [Contract.from_data(self._connection_object, c) for c in response] + + def get_risks(self) -> List: + """Retrieve risks for this organization instance.""" + from .risks import Risk + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/risks' + response = self._connection_object.api_call(uri, 'GET') + return [Risk.from_data(self._connection_object, r) for r in response] + + def get_slas(self) -> List[dict]: + """Retrieve SLAs for this organization instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/slas' + return self._connection_object.api_call(uri, 'GET') + + def get_time_allocations(self) -> List: + """Retrieve time allocations for this organization instance.""" + from .time_allocations import TimeAllocation + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/time_allocations' + response = self._connection_object.api_call(uri, 'GET') + return [TimeAllocation.from_data(self._connection_object, ta) for ta in response] diff --git a/src/xurrent/people.py b/src/xurrent/people.py index d8e5522..a5fe994 100644 --- a/src/xurrent/people.py +++ b/src/xurrent/people.py @@ -147,4 +147,55 @@ def restore(self): uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/restore' return self._connection_object.api_call(uri, 'POST') + def get_cis(self) -> List: + """Retrieve configuration items for this person instance.""" + from .configuration_items import ConfigurationItem + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/cis' + response = self._connection_object.api_call(uri, 'GET') + return [ConfigurationItem.from_data(self._connection_object, ci) for ci in response] + + def get_addresses(self) -> List[dict]: + """Retrieve addresses for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/addresses' + return self._connection_object.api_call(uri, 'GET') + + def get_contacts(self) -> List[dict]: + """Retrieve contact information for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/contacts' + return self._connection_object.api_call(uri, 'GET') + + def get_permissions(self) -> List[dict]: + """Retrieve permissions for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/permissions' + return self._connection_object.api_call(uri, 'GET') + + def get_ci_coverages(self) -> List[dict]: + """Retrieve CI coverages for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/ci_coverages' + return self._connection_object.api_call(uri, 'GET') + + def get_sla_coverages(self) -> List[dict]: + """Retrieve SLA coverages for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/sla_coverages' + return self._connection_object.api_call(uri, 'GET') + + def get_service_coverages(self) -> List[dict]: + """Retrieve service coverages for this person instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_coverages' + return self._connection_object.api_call(uri, 'GET') + + def get_out_of_office_periods(self) -> List: + """Retrieve out-of-office periods for this person instance.""" + from .out_of_office_periods import OutOfOfficePeriod + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/out_of_office_periods' + response = self._connection_object.api_call(uri, 'GET') + return [OutOfOfficePeriod.from_data(self._connection_object, p) for p in response] + + def get_skill_pools(self) -> List: + """Retrieve skill pools for this person instance.""" + from .skill_pools import SkillPool + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/skill_pools' + response = self._connection_object.api_call(uri, 'GET') + return [SkillPool.from_data(self._connection_object, sp) for sp in response] + \ No newline at end of file diff --git a/src/xurrent/requests.py b/src/xurrent/requests.py index 7af2115..5e8ca39 100644 --- a/src/xurrent/requests.py +++ b/src/xurrent/requests.py @@ -424,3 +424,35 @@ def remove_ci(self, ci_id: int) -> bool: except Exception as e: return False + def get_attachments(self) -> List[dict]: + """Retrieve all attachments associated with this request instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/attachments' + return self._connection_object.api_call(uri, 'GET') + + def get_knowledge_articles(self) -> List: + """Retrieve all knowledge articles associated with this request instance.""" + from .knowledge_articles import KnowledgeArticle + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/knowledge_articles' + response = self._connection_object.api_call(uri, 'GET') + return [KnowledgeArticle.from_data(self._connection_object, item) for item in response] + + def get_automation_rules(self) -> List[dict]: + """Retrieve all automation rules associated with this request instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/automation_rules' + return self._connection_object.api_call(uri, 'GET') + + def get_satisfaction_feedback(self) -> List[dict]: + """Retrieve satisfaction feedback for this request instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/satisfaction_feedback' + return self._connection_object.api_call(uri, 'GET') + + def get_tags(self) -> List[dict]: + """Retrieve all tags associated with this request instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/tags' + return self._connection_object.api_call(uri, 'GET') + + def get_watches(self) -> List[dict]: + """Retrieve all watches on this request instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/watches' + return self._connection_object.api_call(uri, 'GET') + diff --git a/src/xurrent/services.py b/src/xurrent/services.py index 6eb5b8a..3c04ddb 100644 --- a/src/xurrent/services.py +++ b/src/xurrent/services.py @@ -129,3 +129,45 @@ def enable(self) -> T: def disable(self, prefix: str = '', postfix: str = '') -> T: return self.update({'disabled': True, 'name': f'{prefix}{self.name}{postfix}'}) + + def get_workflows(self, queryfilter: dict = None) -> List: + """Retrieve workflows for this service instance.""" + from .workflows import Workflow + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/workflows' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + return [Workflow.from_data(self._connection_object, w) for w in response] + + def get_request_templates(self) -> List: + """Retrieve request templates for this service instance.""" + from .request_templates import RequestTemplate + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/request_templates' + response = self._connection_object.api_call(uri, 'GET') + return [RequestTemplate.from_data(self._connection_object, rt) for rt in response] + + def get_risks(self) -> List: + """Retrieve risks for this service instance.""" + from .risks import Risk + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/risks' + response = self._connection_object.api_call(uri, 'GET') + return [Risk.from_data(self._connection_object, r) for r in response] + + def get_service_instances(self) -> List: + """Retrieve service instances for this service.""" + from .service_instances import ServiceInstance + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_instances' + response = self._connection_object.api_call(uri, 'GET') + return [ServiceInstance.from_data(self._connection_object, si) for si in response] + + def get_slas(self) -> List[dict]: + """Retrieve SLAs for this service instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/slas' + return self._connection_object.api_call(uri, 'GET') + + def get_service_offerings(self) -> List: + """Retrieve service offerings for this service.""" + from .service_offerings import ServiceOffering + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_offerings' + response = self._connection_object.api_call(uri, 'GET') + return [ServiceOffering.from_data(self._connection_object, so) for so in response] diff --git a/src/xurrent/tasks.py b/src/xurrent/tasks.py index 7b388dc..4f1ab5f 100644 --- a/src/xurrent/tasks.py +++ b/src/xurrent/tasks.py @@ -172,3 +172,56 @@ def create(cls, connection_object: XurrentApiHelper, workflowID: int,data: dict) uri = f'{connection_object.base_url}/workflows/{workflowID}/{cls.__resourceUrl__}' response = connection_object.api_call(uri, 'POST', data) return cls.from_data(connection_object, response) + + def get_notes(self, queryfilter: dict = None) -> List[dict]: + """Retrieve all notes associated with the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def add_note(self, note) -> dict: + """Add a note to the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if isinstance(note, dict): + return self._connection_object.api_call(uri, 'POST', note) + elif isinstance(note, str): + return self._connection_object.api_call(uri, 'POST', {'text': note}) + else: + raise TypeError(f"Expected 'note' to be a str or dict, got {type(note).__name__}") + + def get_approvals(self) -> List[dict]: + """Retrieve all approvals for the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/approvals' + return self._connection_object.api_call(uri, 'GET') + + def get_cis(self) -> List: + """Retrieve configuration items associated with the current task instance.""" + from .configuration_items import ConfigurationItem + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/cis' + response = self._connection_object.api_call(uri, 'GET') + return [ConfigurationItem.from_data(self._connection_object, ci) for ci in response] + + def get_predecessors(self) -> List: + """Retrieve predecessor tasks for the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/predecessors' + response = self._connection_object.api_call(uri, 'GET') + return [Task.from_data(self._connection_object, t) for t in response] + + def get_successors(self) -> List: + """Retrieve successor tasks for the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/successors' + response = self._connection_object.api_call(uri, 'GET') + return [Task.from_data(self._connection_object, t) for t in response] + + def get_service_instances(self) -> List: + """Retrieve service instances associated with the current task instance.""" + from .service_instances import ServiceInstance + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_instances' + response = self._connection_object.api_call(uri, 'GET') + return [ServiceInstance.from_data(self._connection_object, si) for si in response] + + def get_automation_rules(self) -> List[dict]: + """Retrieve automation rules associated with the current task instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/automation_rules' + return self._connection_object.api_call(uri, 'GET') diff --git a/src/xurrent/teams.py b/src/xurrent/teams.py index e33575f..a33426a 100644 --- a/src/xurrent/teams.py +++ b/src/xurrent/teams.py @@ -113,3 +113,10 @@ def trash(self) -> T: """ uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/trash' return self._connection_object.api_call(uri, 'POST') + + def get_service_instances(self) -> List: + """Retrieve service instances assigned to this team.""" + from .service_instances import ServiceInstance + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/service_instances' + response = self._connection_object.api_call(uri, 'GET') + return [ServiceInstance.from_data(self._connection_object, si) for si in response] diff --git a/src/xurrent/workflows.py b/src/xurrent/workflows.py index d34a13d..bcf0dc9 100644 --- a/src/xurrent/workflows.py +++ b/src/xurrent/workflows.py @@ -224,3 +224,44 @@ def restore(self): response = self._connection_object.api_call(uri, 'POST') return Workflow.from_data(self._connection_object,response) + def get_notes(self, queryfilter: dict = None) -> List[dict]: + """Retrieve all notes associated with the current workflow instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + return self._connection_object.api_call(uri, 'GET') + + def add_note(self, note) -> dict: + """Add a note to the current workflow instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/notes' + if isinstance(note, dict): + return self._connection_object.api_call(uri, 'POST', note) + elif isinstance(note, str): + return self._connection_object.api_call(uri, 'POST', {'text': note}) + else: + raise TypeError(f"Expected 'note' to be a str or dict, got {type(note).__name__}") + + def get_automation_rules(self) -> List[dict]: + """Retrieve automation rules associated with the current workflow instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/automation_rules' + return self._connection_object.api_call(uri, 'GET') + + def get_phases(self) -> List[dict]: + """Retrieve phases of the current workflow instance.""" + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/phases' + return self._connection_object.api_call(uri, 'GET') + + def get_requests(self) -> List: + """Retrieve requests associated with the current workflow instance.""" + from .requests import Request + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/requests' + response = self._connection_object.api_call(uri, 'GET') + return [Request.from_data(self._connection_object, r) for r in response] + + def get_problems(self) -> List: + """Retrieve problems associated with the current workflow instance.""" + from .problems import Problem + uri = f'{self._connection_object.base_url}/{self.__resourceUrl__}/{self.id}/problems' + response = self._connection_object.api_call(uri, 'GET') + return [Problem.from_data(self._connection_object, p) for p in response] + From 967c975a074b4b055c266c7c6b7dbd74b0b467be Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:42:01 +0000 Subject: [PATCH 4/6] Add utility API methods to XurrentApiHelper and populate __init__.py exports - Add search(), bulk_import(), list_archive(), list_trash(), list_audit_lines() methods to XurrentApiHelper - Populate __init__.py with exports for all domain classes across all modules Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- src/xurrent/__init__.py | 35 ++++++++++++++++++++++++ src/xurrent/core.py | 59 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/src/xurrent/__init__.py b/src/xurrent/__init__.py index e69de29..69f0f09 100644 --- a/src/xurrent/__init__.py +++ b/src/xurrent/__init__.py @@ -0,0 +1,35 @@ +from .core import XurrentApiHelper, JsonSerializableDict +from .calendars import Calendar, CalendarPredefinedFilter +from .closure_codes import ClosureCode +from .configuration_items import ConfigurationItem, ConfigurationItemPredefinedFilter +from .contracts import Contract, ContractPredefinedFilter, ContractStatus +from .custom_collection_elements import CustomCollectionElement, CustomCollectionElementPredefinedFilter +from .custom_collections import CustomCollection, CustomCollectionPredefinedFilter +from .effort_classes import EffortClass, EffortClassPredefinedFilter +from .holidays import Holiday +from .knowledge_articles import KnowledgeArticle, KnowledgeArticlePredefinedFilter, KnowledgeArticleStatus +from .organizations import Organization, OrganizationPredefinedFilter +from .out_of_office_periods import OutOfOfficePeriod, OutOfOfficePeriodPredefinedFilter +from .people import Person, PeoplePredefinedFilter +from .problems import Problem, ProblemPredefinedFilter, ProblemStatus, ProblemImpact +from .product_categories import ProductCategory, ProductCategoryRuleSet +from .products import Product, ProductPredefinedFilter, ProductDepreciationMethod +from .projects import Project, ProjectPredefinedFilter, ProjectStatus, ProjectCategory +from .releases import Release, ReleasePredefinedFilter, ReleaseStatus, ReleaseImpact +from .request_templates import RequestTemplate, RequestTemplatePredefinedFilter, RequestTemplateCategory, RequestTemplateStatus, RequestTemplateImpact +from .requests import Request, RequestCategory, RequestStatus, CompletionReason, PredefinedFilter, PredefinedNotesFilter +from .risks import Risk, RiskPredefinedFilter, RiskStatus, RiskSeverity +from .service_instances import ServiceInstance, ServiceInstancePredefinedFilter, ServiceInstanceStatus +from .service_offerings import ServiceOffering, ServiceOfferingPredefinedFilter, ServiceOfferingStatus +from .services import Service, ServicePredefinedFilter +from .shop_article_categories import ShopArticleCategory, ShopArticleCategoryPredefinedFilter +from .shop_articles import ShopArticle, ShopArticlePredefinedFilter, ShopArticleRecurringPeriod +from .shop_order_lines import ShopOrderLine, ShopOrderLinePredefinedFilter, ShopOrderLineStatus, ShopOrderLineRecurringPeriod +from .sites import Site, SitePredefinedFilter +from .skill_pools import SkillPool, SkillPoolPredefinedFilter +from .tasks import Task, TaskPredefinedFilter, TaskStatus +from .teams import Team, TeamPredefinedFilter +from .time_allocations import TimeAllocation, TimeAllocationPredefinedFilter, TimeAllocationCustomerCategory, TimeAllocationServiceCategory, TimeAllocationDescriptionCategory +from .ui_extensions import UiExtension, UiExtensionCategory +from .workflow_templates import WorkflowTemplate, WorkflowTemplatePredefinedFilter, WorkflowTemplateCategory +from .workflows import Workflow, WorkflowCompletionReason, WorkflowStatus, WorkflowCategory, WorkflowPredefinedFilter diff --git a/src/xurrent/core.py b/src/xurrent/core.py index 1fcbcc6..eb7a2a7 100644 --- a/src/xurrent/core.py +++ b/src/xurrent/core.py @@ -388,6 +388,65 @@ def bulk_export(self, type: str, export_format='csv', save_as=None, poll_timeout return True return result + def search(self, query: str, types: list = None) -> list: + """ + Perform a cross-resource full-text search. + :param query: Search query string + :param types: Optional list of resource types to search (e.g. ['request', 'person']) + :return: List of search results + """ + uri = f'/search?q={query}' + if types: + uri += '&types=' + ','.join(types) + return self.api_call(uri, 'GET') + + def bulk_import(self, data: str, import_type: str, import_format: str = 'csv') -> dict: + """ + Perform a bulk import of records. + :param data: CSV/TSV data as a string + :param import_type: Resource type to import (e.g. 'people', 'configuration_items') + :param import_format: Format of the import data ('csv' or 'tsv', default: 'csv') + :return: Import result from the API + """ + return self.api_call('/import', method='POST', data={ + 'type': import_type, + 'import_format': import_format, + 'data': data + }) + + def list_archive(self, queryfilter: dict = None) -> list: + """ + List all archived items. + :param queryfilter: Optional query filter parameters + :return: List of archived items + """ + uri = '/archive' + if queryfilter: + uri += '?' + self.create_filter_string(queryfilter) + return self.api_call(uri, 'GET') + + def list_trash(self, queryfilter: dict = None) -> list: + """ + List all trashed items. + :param queryfilter: Optional query filter parameters + :return: List of trashed items + """ + uri = '/trash' + if queryfilter: + uri += '?' + self.create_filter_string(queryfilter) + return self.api_call(uri, 'GET') + + def list_audit_lines(self, queryfilter: dict = None) -> list: + """ + List audit log entries. + :param queryfilter: Optional query filter parameters + :return: List of audit log entries + """ + uri = '/audit_lines' + if queryfilter: + uri += '?' + self.create_filter_string(queryfilter) + return self.api_call(uri, 'GET') + def custom_fields_to_object(self, custom_fields): """ Convert a list of custom fields to a dictionary. From 18efde8cf80bc1dd0056c01aed198e4a54240657 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:48:28 +0000 Subject: [PATCH 5/6] Add comprehensive unit tests for new domain classes and sub-resource methods - test_problems.py: Problem CRUD, notes, requests, workflows - test_service_instances.py: ServiceInstance CRUD, CIs, users - test_releases.py: Release CRUD, archive/trash/restore, workflows, notes - test_projects.py: Project CRUD, archive/trash/restore, tasks/phases/workflows/notes - test_contracts.py: Contract CRUD, CIs - test_knowledge_articles.py: KnowledgeArticle CRUD, archive/trash/restore, requests/service_instances/translations - test_risks.py: Risk CRUD, archive/trash/restore, organizations/projects/services - test_service_offerings.py: ServiceOffering CRUD - test_skill_pools.py: SkillPool CRUD, enable/disable, members/effort_classes - test_closure_codes.py: ClosureCode CRUD - test_new_sub_resources.py: new sub-resource methods on Request, Task, Workflow, Person, Organization, Service, Calendar, Holiday, Team; core search/bulk_import/ list_archive/list_trash/list_audit_lines utilities Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- tests/unit_tests/test_closure_codes.py | 87 +++ tests/unit_tests/test_contracts.py | 117 ++++ tests/unit_tests/test_knowledge_articles.py | 156 ++++++ tests/unit_tests/test_new_sub_resources.py | 590 ++++++++++++++++++++ tests/unit_tests/test_problems.py | 193 +++++++ tests/unit_tests/test_projects.py | 186 ++++++ tests/unit_tests/test_releases.py | 174 ++++++ tests/unit_tests/test_risks.py | 164 ++++++ tests/unit_tests/test_service_instances.py | 132 +++++ tests/unit_tests/test_service_offerings.py | 104 ++++ tests/unit_tests/test_skill_pools.py | 147 +++++ 11 files changed, 2050 insertions(+) create mode 100644 tests/unit_tests/test_closure_codes.py create mode 100644 tests/unit_tests/test_contracts.py create mode 100644 tests/unit_tests/test_knowledge_articles.py create mode 100644 tests/unit_tests/test_new_sub_resources.py create mode 100644 tests/unit_tests/test_problems.py create mode 100644 tests/unit_tests/test_projects.py create mode 100644 tests/unit_tests/test_releases.py create mode 100644 tests/unit_tests/test_risks.py create mode 100644 tests/unit_tests/test_service_instances.py create mode 100644 tests/unit_tests/test_service_offerings.py create mode 100644 tests/unit_tests/test_skill_pools.py diff --git a/tests/unit_tests/test_closure_codes.py b/tests/unit_tests/test_closure_codes.py new file mode 100644 index 0000000..7f72085 --- /dev/null +++ b/tests/unit_tests/test_closure_codes.py @@ -0,0 +1,87 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.closure_codes import ClosureCode + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def cc_instance(mock_connection): + return ClosureCode( + connection_object=mock_connection, + id=120, + name="Resolved", + ) + + +def test_closure_code_initialization(cc_instance): + assert isinstance(cc_instance, ClosureCode) + assert cc_instance.__resourceUrl__ == "closure_codes" + assert cc_instance.id == 120 + assert cc_instance.name == "Resolved" + + +def test_closure_code_from_data(mock_connection): + data = {"id": 120, "name": "Resolved"} + cc = ClosureCode.from_data(mock_connection, data) + assert isinstance(cc, ClosureCode) + assert cc.id == 120 + assert cc.name == "Resolved" + + +def test_get_closure_code_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 120, "name": "Resolved"} + result = ClosureCode.get_by_id(mock_connection, 120) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/closure_codes/120", "GET" + ) + assert isinstance(result, ClosureCode) + assert result.id == 120 + + +def test_get_closure_codes(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Resolved"}, + {"id": 2, "name": "Cancelled"}, + ] + results = ClosureCode.get_closure_codes(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/closure_codes", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ClosureCode) for r in results) + + +def test_create_closure_code(mock_connection): + mock_connection.api_call.return_value = {"id": 121, "name": "Duplicate"} + result = ClosureCode.create(mock_connection, {"name": "Duplicate"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/closure_codes", "POST", {"name": "Duplicate"} + ) + assert isinstance(result, ClosureCode) + assert result.id == 121 + + +def test_update_closure_code(mock_connection, cc_instance): + mock_connection.api_call.return_value = {"id": 120, "name": "Fixed"} + result = cc_instance.update({"name": "Fixed"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/closure_codes/120", "PATCH", {"name": "Fixed"} + ) + assert isinstance(result, ClosureCode) + assert result.name == "Fixed" diff --git a/tests/unit_tests/test_contracts.py b/tests/unit_tests/test_contracts.py new file mode 100644 index 0000000..b24945a --- /dev/null +++ b/tests/unit_tests/test_contracts.py @@ -0,0 +1,117 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.contracts import Contract, ContractPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def contract_instance(mock_connection): + return Contract( + connection_object=mock_connection, + id=70, + name="Support Contract", + status="active", + ) + + +def test_contract_initialization(contract_instance): + assert isinstance(contract_instance, Contract) + assert contract_instance.__resourceUrl__ == "contracts" + assert contract_instance.id == 70 + assert contract_instance.name == "Support Contract" + + +def test_contract_from_data(mock_connection): + data = { + "id": 70, + "name": "Support Contract", + "status": "active", + "customer": {"id": 10, "name": "Acme Corp"}, + } + contract = Contract.from_data(mock_connection, data) + assert isinstance(contract, Contract) + assert contract.id == 70 + from xurrent.organizations import Organization + assert isinstance(contract.customer, Organization) + assert contract.customer.name == "Acme Corp" + + +def test_get_contract_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 70, "name": "Support Contract"} + result = Contract.get_by_id(mock_connection, 70) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts/70", "GET" + ) + assert isinstance(result, Contract) + assert result.id == 70 + + +def test_get_contracts(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Contract A"}, + {"id": 2, "name": "Contract B"}, + ] + results = Contract.get_contracts(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Contract) for r in results) + + +def test_get_contracts_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + Contract.get_contracts(mock_connection, predefinedFilter=ContractPredefinedFilter.active) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts/active", "GET" + ) + + +def test_create_contract(mock_connection): + mock_connection.api_call.return_value = {"id": 71, "name": "New Contract"} + result = Contract.create(mock_connection, {"name": "New Contract"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts", "POST", {"name": "New Contract"} + ) + assert isinstance(result, Contract) + assert result.id == 71 + + +def test_update_contract(mock_connection, contract_instance): + mock_connection.api_call.return_value = {"id": 70, "name": "Updated Contract"} + result = contract_instance.update({"name": "Updated Contract"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts/70", "PATCH", {"name": "Updated Contract"} + ) + assert isinstance(result, Contract) + assert result.name == "Updated Contract" + + +def test_get_cis(mock_connection, contract_instance): + from xurrent.configuration_items import ConfigurationItem + mock_connection.api_call.return_value = [ + {"id": 1, "label": "CI-1"}, + {"id": 2, "label": "CI-2"}, + ] + results = contract_instance.get_cis() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/contracts/70/cis", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ConfigurationItem) for r in results) diff --git a/tests/unit_tests/test_knowledge_articles.py b/tests/unit_tests/test_knowledge_articles.py new file mode 100644 index 0000000..9d9642d --- /dev/null +++ b/tests/unit_tests/test_knowledge_articles.py @@ -0,0 +1,156 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.knowledge_articles import KnowledgeArticle, KnowledgeArticlePredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def ka_instance(mock_connection): + return KnowledgeArticle( + connection_object=mock_connection, + id=80, + subject="How to reset password", + status="validated", + ) + + +def test_knowledge_article_initialization(ka_instance): + assert isinstance(ka_instance, KnowledgeArticle) + assert ka_instance.__resourceUrl__ == "knowledge_articles" + assert ka_instance.id == 80 + assert ka_instance.subject == "How to reset password" + + +def test_knowledge_article_from_data(mock_connection): + data = {"id": 80, "subject": "How to reset password", "status": "validated"} + ka = KnowledgeArticle.from_data(mock_connection, data) + assert isinstance(ka, KnowledgeArticle) + assert ka.id == 80 + + +def test_get_knowledge_article_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 80, "subject": "How to reset password"} + result = KnowledgeArticle.get_by_id(mock_connection, 80) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80", "GET" + ) + assert isinstance(result, KnowledgeArticle) + assert result.id == 80 + + +def test_get_knowledge_articles(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "KA A"}, + {"id": 2, "subject": "KA B"}, + ] + results = KnowledgeArticle.get_knowledge_articles(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, KnowledgeArticle) for r in results) + + +def test_get_knowledge_articles_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + KnowledgeArticle.get_knowledge_articles( + mock_connection, predefinedFilter=KnowledgeArticlePredefinedFilter.active + ) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/active", "GET" + ) + + +def test_create_knowledge_article(mock_connection): + mock_connection.api_call.return_value = {"id": 81, "subject": "New KA"} + result = KnowledgeArticle.create(mock_connection, {"subject": "New KA"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles", "POST", {"subject": "New KA"} + ) + assert isinstance(result, KnowledgeArticle) + assert result.id == 81 + + +def test_update_knowledge_article(mock_connection, ka_instance): + mock_connection.api_call.return_value = {"id": 80, "subject": "Updated KA"} + result = ka_instance.update({"subject": "Updated KA"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80", "PATCH", {"subject": "Updated KA"} + ) + assert isinstance(result, KnowledgeArticle) + assert result.subject == "Updated KA" + + +def test_archive_knowledge_article(mock_connection, ka_instance): + mock_connection.api_call.return_value = {"id": 80, "subject": "How to reset password"} + result = ka_instance.archive() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/archive", "POST" + ) + assert isinstance(result, KnowledgeArticle) + + +def test_trash_knowledge_article(mock_connection, ka_instance): + mock_connection.api_call.return_value = {"id": 80, "subject": "How to reset password"} + result = ka_instance.trash() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/trash", "POST" + ) + assert isinstance(result, KnowledgeArticle) + + +def test_restore_knowledge_article(mock_connection, ka_instance): + mock_connection.api_call.return_value = {"id": 80, "subject": "How to reset password"} + result = ka_instance.restore() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/restore", "POST" + ) + assert isinstance(result, KnowledgeArticle) + + +def test_get_requests(mock_connection, ka_instance): + from xurrent.requests import Request + mock_connection.api_call.return_value = [{"id": 1, "subject": "Req A"}] + results = ka_instance.get_requests() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/requests", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Request) for r in results) + + +def test_get_service_instances(mock_connection, ka_instance): + from xurrent.service_instances import ServiceInstance + mock_connection.api_call.return_value = [{"id": 1, "name": "SI A"}] + results = ka_instance.get_service_instances() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/service_instances", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ServiceInstance) for r in results) + + +def test_get_translations(mock_connection, ka_instance): + translations_data = [{"id": 1, "language": "nl"}] + mock_connection.api_call.return_value = translations_data + result = ka_instance.get_translations() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/knowledge_articles/80/translations", "GET" + ) + assert result == translations_data diff --git a/tests/unit_tests/test_new_sub_resources.py b/tests/unit_tests/test_new_sub_resources.py new file mode 100644 index 0000000..34a9e5c --- /dev/null +++ b/tests/unit_tests/test_new_sub_resources.py @@ -0,0 +1,590 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock, patch + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.requests import Request +from xurrent.tasks import Task +from xurrent.workflows import Workflow +from xurrent.organizations import Organization +from xurrent.services import Service +from xurrent.calendars import Calendar +from xurrent.holidays import Holiday + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def request_instance(mock_connection): + return Request( + connection_object=mock_connection, + id=1, + subject="Test request", + status="assigned", + ) + + +@pytest.fixture +def task_instance(mock_connection): + return Task( + connection_object=mock_connection, + id=2, + subject="Test task", + ) + + +@pytest.fixture +def workflow_instance(mock_connection): + return Workflow( + connection_object=mock_connection, + id=3, + subject="Test workflow", + ) + + +@pytest.fixture +def person_instance(mock_connection): + return Person( + connection_object=mock_connection, + id=4, + name="Alice", + ) + + +@pytest.fixture +def org_instance(mock_connection): + return Organization( + connection_object=mock_connection, + id=5, + name="Acme Corp", + ) + + +@pytest.fixture +def service_instance(mock_connection): + return Service( + connection_object=mock_connection, + id=6, + name="IT Support", + ) + + +@pytest.fixture +def calendar_instance(mock_connection): + return Calendar( + connection_object=mock_connection, + id=7, + name="Business Hours", + ) + + +@pytest.fixture +def holiday_instance(mock_connection): + return Holiday( + connection_object=mock_connection, + id=8, + name="Christmas", + start_at="2026-12-25T00:00:00Z", + end_at="2026-12-26T00:00:00Z", + ) + + +@pytest.fixture +def team_instance(mock_connection): + return Team( + connection_object=mock_connection, + id=9, + name="Ops Team", + ) + + +# ------------------------- +# Request new sub-resources +# ------------------------- + +def test_request_get_attachments(mock_connection, request_instance): + mock_connection.api_call.return_value = [{"id": 1, "filename": "doc.pdf"}] + result = request_instance.get_attachments() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/requests/1/attachments", "GET" + ) + assert result == [{"id": 1, "filename": "doc.pdf"}] + + +def test_request_get_knowledge_articles(mock_connection, request_instance): + from xurrent.knowledge_articles import KnowledgeArticle + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "KA 1"}, + {"id": 2, "subject": "KA 2"}, + ] + results = request_instance.get_knowledge_articles() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/requests/1/knowledge_articles", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, KnowledgeArticle) for r in results) + + +def test_request_get_automation_rules(mock_connection, request_instance): + mock_connection.api_call.return_value = [{"id": 1, "name": "Rule A"}] + result = request_instance.get_automation_rules() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/requests/1/automation_rules", "GET" + ) + assert result == [{"id": 1, "name": "Rule A"}] + + +def test_request_get_tags(mock_connection, request_instance): + mock_connection.api_call.return_value = [{"id": 1, "name": "urgent"}] + result = request_instance.get_tags() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/requests/1/tags", "GET" + ) + assert result == [{"id": 1, "name": "urgent"}] + + +def test_request_get_watches(mock_connection, request_instance): + mock_connection.api_call.return_value = [{"id": 1, "person_id": 42}] + result = request_instance.get_watches() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/requests/1/watches", "GET" + ) + assert result == [{"id": 1, "person_id": 42}] + + +# ---------------------- +# Task new sub-resources +# ---------------------- + +def test_task_get_notes(mock_connection, task_instance): + notes_data = [{"id": 1, "text": "Note 1"}] + mock_connection.api_call.return_value = notes_data + result = task_instance.get_notes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/notes", "GET" + ) + assert result == notes_data + + +def test_task_add_note_string(mock_connection, task_instance): + mock_connection.api_call.return_value = {"id": 1, "text": "A note"} + task_instance.add_note("A note") + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/notes", "POST", {"text": "A note"} + ) + + +def test_task_add_note_dict(mock_connection, task_instance): + note = {"text": "A note", "internal": True} + mock_connection.api_call.return_value = {"id": 1, **note} + task_instance.add_note(note) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/notes", "POST", note + ) + + +def test_task_get_approvals(mock_connection, task_instance): + mock_connection.api_call.return_value = [{"id": 1, "status": "pending"}] + result = task_instance.get_approvals() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/approvals", "GET" + ) + assert result == [{"id": 1, "status": "pending"}] + + +def test_task_get_cis(mock_connection, task_instance): + from xurrent.configuration_items import ConfigurationItem + mock_connection.api_call.return_value = [ + {"id": 1, "label": "CI-1"}, + {"id": 2, "label": "CI-2"}, + ] + results = task_instance.get_cis() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/cis", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ConfigurationItem) for r in results) + + +def test_task_get_predecessors(mock_connection, task_instance): + mock_connection.api_call.return_value = [{"id": 1, "subject": "Predecessor"}] + results = task_instance.get_predecessors() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/predecessors", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Task) for r in results) + + +def test_task_get_successors(mock_connection, task_instance): + mock_connection.api_call.return_value = [{"id": 1, "subject": "Successor"}] + results = task_instance.get_successors() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/successors", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Task) for r in results) + + +def test_task_get_service_instances(mock_connection, task_instance): + from xurrent.service_instances import ServiceInstance + mock_connection.api_call.return_value = [{"id": 1, "name": "SI A"}] + results = task_instance.get_service_instances() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/tasks/2/service_instances", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ServiceInstance) for r in results) + + +# -------------------------- +# Workflow new sub-resources +# -------------------------- + +def test_workflow_get_notes(mock_connection, workflow_instance): + notes_data = [{"id": 1, "text": "Note 1"}] + mock_connection.api_call.return_value = notes_data + result = workflow_instance.get_notes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/workflows/3/notes", "GET" + ) + assert result == notes_data + + +def test_workflow_add_note_string(mock_connection, workflow_instance): + mock_connection.api_call.return_value = {"id": 1, "text": "A note"} + workflow_instance.add_note("A note") + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/workflows/3/notes", "POST", {"text": "A note"} + ) + + +def test_workflow_get_requests(mock_connection, workflow_instance): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Req A"}, + {"id": 2, "subject": "Req B"}, + ] + results = workflow_instance.get_requests() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/workflows/3/requests", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Request) for r in results) + + +def test_workflow_get_problems(mock_connection, workflow_instance): + from xurrent.problems import Problem + mock_connection.api_call.return_value = [{"id": 1, "subject": "Problem A"}] + results = workflow_instance.get_problems() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/workflows/3/problems", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Problem) for r in results) + + +def test_workflow_get_phases(mock_connection, workflow_instance): + phases_data = [{"id": 1, "name": "Phase 1"}] + mock_connection.api_call.return_value = phases_data + result = workflow_instance.get_phases() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/workflows/3/phases", "GET" + ) + assert result == phases_data + + +# ------------------------- +# Person new sub-resources +# ------------------------- + +def test_person_get_cis(mock_connection, person_instance): + from xurrent.configuration_items import ConfigurationItem + mock_connection.api_call.return_value = [{"id": 1, "label": "CI-1"}] + results = person_instance.get_cis() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/people/4/cis", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ConfigurationItem) for r in results) + + +def test_person_get_addresses(mock_connection, person_instance): + addr_data = [{"id": 1, "street": "Main St"}] + mock_connection.api_call.return_value = addr_data + result = person_instance.get_addresses() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/people/4/addresses", "GET" + ) + assert result == addr_data + + +def test_person_get_out_of_office_periods(mock_connection, person_instance): + from xurrent.out_of_office_periods import OutOfOfficePeriod + mock_connection.api_call.return_value = [ + {"id": 1, "start_at": "2026-01-01T00:00:00Z", "end_at": "2026-01-07T00:00:00Z"} + ] + results = person_instance.get_out_of_office_periods() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/people/4/out_of_office_periods", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, OutOfOfficePeriod) for r in results) + + +def test_person_get_skill_pools(mock_connection, person_instance): + from xurrent.skill_pools import SkillPool + mock_connection.api_call.return_value = [{"id": 1, "name": "Pool A"}] + results = person_instance.get_skill_pools() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/people/4/skill_pools", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, SkillPool) for r in results) + + +# ------------------------------ +# Organization new sub-resources +# ------------------------------ + +def test_org_get_addresses(mock_connection, org_instance): + addr_data = [{"id": 1, "street": "HQ Street"}] + mock_connection.api_call.return_value = addr_data + result = org_instance.get_addresses() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/organizations/5/addresses", "GET" + ) + assert result == addr_data + + +def test_org_get_contracts(mock_connection, org_instance): + from xurrent.contracts import Contract + mock_connection.api_call.return_value = [{"id": 1, "name": "Support"}] + results = org_instance.get_contracts() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/organizations/5/contracts", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Contract) for r in results) + + +def test_org_get_risks(mock_connection, org_instance): + from xurrent.risks import Risk + mock_connection.api_call.return_value = [{"id": 1, "subject": "Risk A"}] + results = org_instance.get_risks() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/organizations/5/risks", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Risk) for r in results) + + +def test_org_get_time_allocations(mock_connection, org_instance): + from xurrent.time_allocations import TimeAllocation + mock_connection.api_call.return_value = [{"id": 1, "name": "TA A"}] + results = org_instance.get_time_allocations() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/organizations/5/time_allocations", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, TimeAllocation) for r in results) + + +# ------------------------- +# Service new sub-resources +# ------------------------- + +def test_service_get_workflows(mock_connection, service_instance): + mock_connection.api_call.return_value = [{"id": 1, "subject": "Wf A"}] + results = service_instance.get_workflows() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/services/6/workflows", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Workflow) for r in results) + + +def test_service_get_service_instances(mock_connection, service_instance): + from xurrent.service_instances import ServiceInstance + mock_connection.api_call.return_value = [{"id": 1, "name": "SI A"}] + results = service_instance.get_service_instances() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/services/6/service_instances", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ServiceInstance) for r in results) + + +def test_service_get_risks(mock_connection, service_instance): + from xurrent.risks import Risk + mock_connection.api_call.return_value = [{"id": 1, "subject": "Risk A"}] + results = service_instance.get_risks() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/services/6/risks", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Risk) for r in results) + + +def test_service_get_service_offerings(mock_connection, service_instance): + from xurrent.service_offerings import ServiceOffering + mock_connection.api_call.return_value = [{"id": 1, "name": "Gold"}] + results = service_instance.get_service_offerings() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/services/6/service_offerings", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ServiceOffering) for r in results) + + +# -------------------------- +# Calendar new sub-resources +# -------------------------- + +def test_calendar_get_duration(mock_connection, calendar_instance): + mock_connection.api_call.return_value = {"duration": 3600} + result = calendar_instance.get_duration( + start="2026-01-01T09:00:00Z", end="2026-01-01T10:00:00Z" + ) + call_args = mock_connection.api_call.call_args + assert "/calendars/7/duration" in call_args[0][0] + assert "start=2026-01-01T09:00:00Z" in call_args[0][0] + assert "end=2026-01-01T10:00:00Z" in call_args[0][0] + assert call_args[0][1] == "GET" + assert result == {"duration": 3600} + + +def test_calendar_get_hours(mock_connection, calendar_instance): + hours_data = [{"id": 1, "day": "monday", "time_from": "08:00", "time_until": "17:00"}] + mock_connection.api_call.return_value = hours_data + result = calendar_instance.get_hours() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/calendars/7/hours", "GET" + ) + assert result == hours_data + + +def test_calendar_get_holidays(mock_connection, calendar_instance): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Christmas", "start_at": "2026-12-25T00:00:00Z", "end_at": "2026-12-26T00:00:00Z"} + ] + results = calendar_instance.get_holidays() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/calendars/7/holidays", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Holiday) for r in results) + + +# ---------------------- +# Team new sub-resources +# ---------------------- + +def test_team_get_service_instances(mock_connection, team_instance): + from xurrent.service_instances import ServiceInstance + mock_connection.api_call.return_value = [{"id": 1, "name": "SI A"}] + results = team_instance.get_service_instances() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/teams/9/service_instances", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, ServiceInstance) for r in results) + + +# ------------------------- +# Holiday new sub-resources +# ------------------------- + +def test_holiday_get_calendars(mock_connection, holiday_instance): + mock_connection.api_call.return_value = [{"id": 1, "name": "Business Hours"}] + results = holiday_instance.get_calendars() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/holidays/8/calendars", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Calendar) for r in results) + + +# ------------------------- +# Core utility methods +# ------------------------- + +def test_search(mock_connection): + mock_connection.api_call.return_value = [{"id": 1, "type": "request"}] + mock_connection.search = lambda query, types=None: mock_connection.api_call( + f"/search?q={query}", "GET" + ) + result = mock_connection.search("password reset") + mock_connection.api_call.assert_called_once_with("/search?q=password reset", "GET") + assert result == [{"id": 1, "type": "request"}] + + +def test_search_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value=[{"id": 1}]) + result = helper.search("test query") + helper.api_call.assert_called_once_with("/search?q=test query", "GET") + assert result == [{"id": 1}] + + +def test_search_with_types_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value=[]) + helper.search("test query", types=["request", "person"]) + helper.api_call.assert_called_once_with("/search?q=test query&types=request,person", "GET") + + +def test_bulk_import_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value={"status": "done"}) + result = helper.bulk_import("name,email\nAlice,a@b.com", "people") + helper.api_call.assert_called_once_with("/import", method="POST", data={ + "type": "people", + "import_format": "csv", + "data": "name,email\nAlice,a@b.com", + }) + assert result == {"status": "done"} + + +def test_list_archive_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value=[]) + helper.list_archive() + helper.api_call.assert_called_once_with("/archive", "GET") + + +def test_list_trash_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value=[]) + helper.list_trash() + helper.api_call.assert_called_once_with("/trash", "GET") + + +def test_list_audit_lines_core(): + helper = XurrentApiHelper( + "https://api.example.com", api_key="key", api_account="acct", resolve_user=False + ) + helper.api_call = MagicMock(return_value=[]) + helper.list_audit_lines() + helper.api_call.assert_called_once_with("/audit_lines", "GET") diff --git a/tests/unit_tests/test_problems.py b/tests/unit_tests/test_problems.py new file mode 100644 index 0000000..3248fc7 --- /dev/null +++ b/tests/unit_tests/test_problems.py @@ -0,0 +1,193 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.problems import Problem, ProblemPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def problem_instance(mock_connection): + return Problem( + connection_object=mock_connection, + id=10, + subject="Server crash", + status="in_progress", + impact="high", + manager=Person(connection_object=mock_connection, id=5, name="Manager"), + team=Team(connection_object=mock_connection, id=2, name="Ops"), + ) + + +def test_problem_initialization(problem_instance): + assert isinstance(problem_instance, Problem) + assert problem_instance.__resourceUrl__ == "problems" + assert problem_instance.id == 10 + assert problem_instance.subject == "Server crash" + assert isinstance(problem_instance.manager, Person) + assert problem_instance.manager.name == "Manager" + assert isinstance(problem_instance.team, Team) + assert problem_instance.team.name == "Ops" + + +def test_problem_from_data(mock_connection): + data = { + "id": 10, + "subject": "Server crash", + "status": "in_progress", + "impact": "high", + "manager": {"id": 5, "name": "Manager"}, + "team": {"id": 2, "name": "Ops"}, + } + problem = Problem.from_data(mock_connection, data) + assert isinstance(problem, Problem) + assert problem.id == 10 + assert isinstance(problem.manager, Person) + assert isinstance(problem.team, Team) + + +def test_get_problem_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 10, "subject": "Server crash"} + result = Problem.get_by_id(mock_connection, 10) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10", "GET" + ) + assert isinstance(result, Problem) + assert result.id == 10 + + +def test_get_problems(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Problem A"}, + {"id": 2, "subject": "Problem B"}, + ] + results = Problem.get_problems(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Problem) for r in results) + + +def test_get_problems_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + Problem.get_problems(mock_connection, predefinedFilter=ProblemPredefinedFilter.active) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/active", "GET" + ) + + +def test_create_problem(mock_connection): + mock_connection.api_call.return_value = {"id": 11, "subject": "New Problem"} + result = Problem.create(mock_connection, {"subject": "New Problem"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems", "POST", {"subject": "New Problem"} + ) + assert isinstance(result, Problem) + assert result.id == 11 + + +def test_update_problem(mock_connection, problem_instance): + mock_connection.api_call.return_value = {"id": 10, "subject": "Updated Problem"} + result = problem_instance.update({"subject": "Updated Problem"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10", "PATCH", {"subject": "Updated Problem"} + ) + assert isinstance(result, Problem) + assert result.subject == "Updated Problem" + + +def test_archive_problem(mock_connection, problem_instance): + mock_connection.api_call.return_value = {"id": 10, "subject": "Server crash"} + result = problem_instance.archive() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/archive", "POST" + ) + assert isinstance(result, Problem) + + +def test_trash_problem(mock_connection, problem_instance): + mock_connection.api_call.return_value = {"id": 10, "subject": "Server crash"} + result = problem_instance.trash() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/trash", "POST" + ) + assert isinstance(result, Problem) + + +def test_restore_problem(mock_connection, problem_instance): + mock_connection.api_call.return_value = {"id": 10, "subject": "Server crash"} + result = problem_instance.restore() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/restore", "POST" + ) + assert isinstance(result, Problem) + + +def test_get_notes(mock_connection, problem_instance): + notes_data = [{"id": 1, "text": "Note 1"}, {"id": 2, "text": "Note 2"}] + mock_connection.api_call.return_value = notes_data + result = problem_instance.get_notes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/notes", "GET" + ) + assert result == notes_data + + +def test_add_note_string(mock_connection, problem_instance): + mock_connection.api_call.return_value = {"id": 1, "text": "A note"} + problem_instance.add_note("A note") + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/notes", "POST", {"text": "A note"} + ) + + +def test_add_note_dict(mock_connection, problem_instance): + note = {"text": "A note", "internal": True} + mock_connection.api_call.return_value = {"id": 1, **note} + problem_instance.add_note(note) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/notes", "POST", note + ) + + +def test_get_requests(mock_connection, problem_instance): + from xurrent.requests import Request + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Req A"}, + {"id": 2, "subject": "Req B"}, + ] + results = problem_instance.get_requests() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/requests", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Request) for r in results) + + +def test_get_workflows(mock_connection, problem_instance): + from xurrent.workflows import Workflow + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Wf A"}, + {"id": 2, "subject": "Wf B"}, + ] + results = problem_instance.get_workflows() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/problems/10/workflows", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Workflow) for r in results) diff --git a/tests/unit_tests/test_projects.py b/tests/unit_tests/test_projects.py new file mode 100644 index 0000000..5a63c89 --- /dev/null +++ b/tests/unit_tests/test_projects.py @@ -0,0 +1,186 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.projects import Project, ProjectPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def project_instance(mock_connection): + return Project( + connection_object=mock_connection, + id=40, + subject="Cloud Migration", + status="in_progress", + category="migration", + manager=Person(connection_object=mock_connection, id=5, name="Manager"), + ) + + +def test_project_initialization(project_instance): + assert isinstance(project_instance, Project) + assert project_instance.__resourceUrl__ == "projects" + assert project_instance.id == 40 + assert project_instance.subject == "Cloud Migration" + assert isinstance(project_instance.manager, Person) + assert project_instance.manager.name == "Manager" + + +def test_project_from_data(mock_connection): + data = { + "id": 40, + "subject": "Cloud Migration", + "status": "in_progress", + "category": "migration", + "manager": {"id": 5, "name": "Manager"}, + } + project = Project.from_data(mock_connection, data) + assert isinstance(project, Project) + assert project.id == 40 + assert isinstance(project.manager, Person) + + +def test_get_project_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 40, "subject": "Cloud Migration"} + result = Project.get_by_id(mock_connection, 40) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40", "GET" + ) + assert isinstance(result, Project) + assert result.id == 40 + + +def test_get_projects(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Proj A"}, + {"id": 2, "subject": "Proj B"}, + ] + results = Project.get_projects(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Project) for r in results) + + +def test_get_projects_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + Project.get_projects(mock_connection, predefinedFilter=ProjectPredefinedFilter.open) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/open", "GET" + ) + + +def test_create_project(mock_connection): + mock_connection.api_call.return_value = {"id": 41, "subject": "New Project"} + result = Project.create(mock_connection, {"subject": "New Project"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects", "POST", {"subject": "New Project"} + ) + assert isinstance(result, Project) + assert result.id == 41 + + +def test_update_project(mock_connection, project_instance): + mock_connection.api_call.return_value = {"id": 40, "subject": "Updated Project"} + result = project_instance.update({"subject": "Updated Project"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40", "PATCH", {"subject": "Updated Project"} + ) + assert isinstance(result, Project) + assert result.subject == "Updated Project" + + +def test_archive_project(mock_connection, project_instance): + mock_connection.api_call.return_value = {"id": 40, "subject": "Cloud Migration"} + result = project_instance.archive() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/archive", "POST" + ) + assert isinstance(result, Project) + + +def test_trash_project(mock_connection, project_instance): + mock_connection.api_call.return_value = {"id": 40, "subject": "Cloud Migration"} + result = project_instance.trash() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/trash", "POST" + ) + assert isinstance(result, Project) + + +def test_restore_project(mock_connection, project_instance): + mock_connection.api_call.return_value = {"id": 40, "subject": "Cloud Migration"} + result = project_instance.restore() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/restore", "POST" + ) + assert isinstance(result, Project) + + +def test_get_tasks(mock_connection, project_instance): + from xurrent.tasks import Task + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Task A"}, + {"id": 2, "subject": "Task B"}, + ] + results = project_instance.get_tasks() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/tasks", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Task) for r in results) + + +def test_get_phases(mock_connection, project_instance): + phases_data = [{"id": 1, "name": "Phase 1"}, {"id": 2, "name": "Phase 2"}] + mock_connection.api_call.return_value = phases_data + result = project_instance.get_phases() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/phases", "GET" + ) + assert result == phases_data + + +def test_get_workflows(mock_connection, project_instance): + from xurrent.workflows import Workflow + mock_connection.api_call.return_value = [{"id": 1, "subject": "Wf A"}] + results = project_instance.get_workflows() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/workflows", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Workflow) for r in results) + + +def test_get_notes(mock_connection, project_instance): + notes_data = [{"id": 1, "text": "Note 1"}] + mock_connection.api_call.return_value = notes_data + result = project_instance.get_notes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/notes", "GET" + ) + assert result == notes_data + + +def test_add_note_string(mock_connection, project_instance): + mock_connection.api_call.return_value = {"id": 1, "text": "A note"} + project_instance.add_note("A note") + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/projects/40/notes", "POST", {"text": "A note"} + ) diff --git a/tests/unit_tests/test_releases.py b/tests/unit_tests/test_releases.py new file mode 100644 index 0000000..88eb486 --- /dev/null +++ b/tests/unit_tests/test_releases.py @@ -0,0 +1,174 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.releases import Release, ReleasePredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def release_instance(mock_connection): + return Release( + connection_object=mock_connection, + id=50, + subject="v2.0 Release", + status="in_progress", + impact="medium", + manager=Person(connection_object=mock_connection, id=5, name="Manager"), + ) + + +def test_release_initialization(release_instance): + assert isinstance(release_instance, Release) + assert release_instance.__resourceUrl__ == "releases" + assert release_instance.id == 50 + assert release_instance.subject == "v2.0 Release" + assert isinstance(release_instance.manager, Person) + assert release_instance.manager.name == "Manager" + + +def test_release_from_data(mock_connection): + data = { + "id": 50, + "subject": "v2.0 Release", + "status": "in_progress", + "impact": "medium", + "manager": {"id": 5, "name": "Manager"}, + } + release = Release.from_data(mock_connection, data) + assert isinstance(release, Release) + assert release.id == 50 + assert isinstance(release.manager, Person) + + +def test_get_release_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 50, "subject": "v2.0 Release"} + result = Release.get_by_id(mock_connection, 50) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50", "GET" + ) + assert isinstance(result, Release) + assert result.id == 50 + + +def test_get_releases(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "v1.0"}, + {"id": 2, "subject": "v2.0"}, + ] + results = Release.get_releases(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Release) for r in results) + + +def test_get_releases_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + Release.get_releases(mock_connection, predefinedFilter=ReleasePredefinedFilter.open) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/open", "GET" + ) + + +def test_create_release(mock_connection): + mock_connection.api_call.return_value = {"id": 51, "subject": "v3.0"} + result = Release.create(mock_connection, {"subject": "v3.0"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases", "POST", {"subject": "v3.0"} + ) + assert isinstance(result, Release) + assert result.id == 51 + + +def test_update_release(mock_connection, release_instance): + mock_connection.api_call.return_value = {"id": 50, "subject": "v2.1 Release"} + result = release_instance.update({"subject": "v2.1 Release"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50", "PATCH", {"subject": "v2.1 Release"} + ) + assert isinstance(result, Release) + assert result.subject == "v2.1 Release" + + +def test_archive_release(mock_connection, release_instance): + mock_connection.api_call.return_value = {"id": 50, "subject": "v2.0 Release"} + result = release_instance.archive() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/archive", "POST" + ) + assert isinstance(result, Release) + + +def test_trash_release(mock_connection, release_instance): + mock_connection.api_call.return_value = {"id": 50, "subject": "v2.0 Release"} + result = release_instance.trash() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/trash", "POST" + ) + assert isinstance(result, Release) + + +def test_restore_release(mock_connection, release_instance): + mock_connection.api_call.return_value = {"id": 50, "subject": "v2.0 Release"} + result = release_instance.restore() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/restore", "POST" + ) + assert isinstance(result, Release) + + +def test_get_workflows(mock_connection, release_instance): + from xurrent.workflows import Workflow + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Wf A"}, + {"id": 2, "subject": "Wf B"}, + ] + results = release_instance.get_workflows() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/workflows", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Workflow) for r in results) + + +def test_get_notes(mock_connection, release_instance): + notes_data = [{"id": 1, "text": "Note 1"}] + mock_connection.api_call.return_value = notes_data + result = release_instance.get_notes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/notes", "GET" + ) + assert result == notes_data + + +def test_add_note_string(mock_connection, release_instance): + mock_connection.api_call.return_value = {"id": 1, "text": "Hello"} + release_instance.add_note("Hello") + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/notes", "POST", {"text": "Hello"} + ) + + +def test_add_note_dict(mock_connection, release_instance): + note = {"text": "Hello", "internal": True} + mock_connection.api_call.return_value = {"id": 1, **note} + release_instance.add_note(note) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/releases/50/notes", "POST", note + ) diff --git a/tests/unit_tests/test_risks.py b/tests/unit_tests/test_risks.py new file mode 100644 index 0000000..7111c41 --- /dev/null +++ b/tests/unit_tests/test_risks.py @@ -0,0 +1,164 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.risks import Risk, RiskPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def risk_instance(mock_connection): + return Risk( + connection_object=mock_connection, + id=90, + subject="Data breach risk", + status="security", + severity="high", + manager=Person(connection_object=mock_connection, id=5, name="Manager"), + ) + + +def test_risk_initialization(risk_instance): + assert isinstance(risk_instance, Risk) + assert risk_instance.__resourceUrl__ == "risks" + assert risk_instance.id == 90 + assert risk_instance.subject == "Data breach risk" + assert isinstance(risk_instance.manager, Person) + + +def test_risk_from_data(mock_connection): + data = { + "id": 90, + "subject": "Data breach risk", + "severity": "high", + "manager": {"id": 5, "name": "Manager"}, + } + risk = Risk.from_data(mock_connection, data) + assert isinstance(risk, Risk) + assert risk.id == 90 + assert isinstance(risk.manager, Person) + + +def test_get_risk_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 90, "subject": "Data breach risk"} + result = Risk.get_by_id(mock_connection, 90) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90", "GET" + ) + assert isinstance(result, Risk) + assert result.id == 90 + + +def test_get_risks(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "subject": "Risk A"}, + {"id": 2, "subject": "Risk B"}, + ] + results = Risk.get_risks(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Risk) for r in results) + + +def test_get_risks_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + Risk.get_risks(mock_connection, predefinedFilter=RiskPredefinedFilter.open) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/open", "GET" + ) + + +def test_create_risk(mock_connection): + mock_connection.api_call.return_value = {"id": 91, "subject": "New Risk"} + result = Risk.create(mock_connection, {"subject": "New Risk"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks", "POST", {"subject": "New Risk"} + ) + assert isinstance(result, Risk) + assert result.id == 91 + + +def test_update_risk(mock_connection, risk_instance): + mock_connection.api_call.return_value = {"id": 90, "subject": "Updated Risk"} + result = risk_instance.update({"subject": "Updated Risk"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90", "PATCH", {"subject": "Updated Risk"} + ) + assert isinstance(result, Risk) + assert result.subject == "Updated Risk" + + +def test_archive_risk(mock_connection, risk_instance): + mock_connection.api_call.return_value = {"id": 90, "subject": "Data breach risk"} + result = risk_instance.archive() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/archive", "POST" + ) + assert isinstance(result, Risk) + + +def test_trash_risk(mock_connection, risk_instance): + mock_connection.api_call.return_value = {"id": 90, "subject": "Data breach risk"} + result = risk_instance.trash() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/trash", "POST" + ) + assert isinstance(result, Risk) + + +def test_restore_risk(mock_connection, risk_instance): + mock_connection.api_call.return_value = {"id": 90, "subject": "Data breach risk"} + result = risk_instance.restore() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/restore", "POST" + ) + assert isinstance(result, Risk) + + +def test_get_organizations(mock_connection, risk_instance): + from xurrent.organizations import Organization + mock_connection.api_call.return_value = [{"id": 1, "name": "Org A"}] + results = risk_instance.get_organizations() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/organizations", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Organization) for r in results) + + +def test_get_projects(mock_connection, risk_instance): + from xurrent.projects import Project + mock_connection.api_call.return_value = [{"id": 1, "subject": "Proj A"}] + results = risk_instance.get_projects() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/projects", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Project) for r in results) + + +def test_get_services(mock_connection, risk_instance): + from xurrent.services import Service + mock_connection.api_call.return_value = [{"id": 1, "name": "Service A"}] + results = risk_instance.get_services() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/risks/90/services", "GET" + ) + assert len(results) == 1 + assert all(isinstance(r, Service) for r in results) diff --git a/tests/unit_tests/test_service_instances.py b/tests/unit_tests/test_service_instances.py new file mode 100644 index 0000000..e0520a8 --- /dev/null +++ b/tests/unit_tests/test_service_instances.py @@ -0,0 +1,132 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.service_instances import ServiceInstance, ServiceInstancePredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def si_instance(mock_connection): + return ServiceInstance( + connection_object=mock_connection, + id=30, + name="Production SI", + status="active", + ) + + +def test_service_instance_initialization(si_instance): + assert isinstance(si_instance, ServiceInstance) + assert si_instance.__resourceUrl__ == "service_instances" + assert si_instance.id == 30 + assert si_instance.name == "Production SI" + + +def test_service_instance_from_data(mock_connection): + data = { + "id": 30, + "name": "Production SI", + "status": "active", + "service": {"id": 5, "name": "My Service"}, + } + si = ServiceInstance.from_data(mock_connection, data) + assert isinstance(si, ServiceInstance) + assert si.id == 30 + from xurrent.services import Service + assert isinstance(si.service, Service) + assert si.service.name == "My Service" + + +def test_get_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 30, "name": "Production SI"} + result = ServiceInstance.get_by_id(mock_connection, 30) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances/30", "GET" + ) + assert isinstance(result, ServiceInstance) + assert result.id == 30 + + +def test_get_service_instances(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "SI A"}, + {"id": 2, "name": "SI B"}, + ] + results = ServiceInstance.get_service_instances(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ServiceInstance) for r in results) + + +def test_get_service_instances_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + ServiceInstance.get_service_instances( + mock_connection, predefinedFilter=ServiceInstancePredefinedFilter.active + ) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances/active", "GET" + ) + + +def test_create_service_instance(mock_connection): + mock_connection.api_call.return_value = {"id": 31, "name": "New SI"} + result = ServiceInstance.create(mock_connection, {"name": "New SI"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances", "POST", {"name": "New SI"} + ) + assert isinstance(result, ServiceInstance) + assert result.id == 31 + + +def test_update_service_instance(mock_connection, si_instance): + mock_connection.api_call.return_value = {"id": 30, "name": "Updated SI"} + result = si_instance.update({"name": "Updated SI"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances/30", "PATCH", {"name": "Updated SI"} + ) + assert isinstance(result, ServiceInstance) + assert result.name == "Updated SI" + + +def test_get_cis(mock_connection, si_instance): + from xurrent.configuration_items import ConfigurationItem + mock_connection.api_call.return_value = [ + {"id": 1, "label": "CI-1"}, + {"id": 2, "label": "CI-2"}, + ] + results = si_instance.get_cis() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances/30/cis", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ConfigurationItem) for r in results) + + +def test_get_users(mock_connection, si_instance): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": "Bob"}, + ] + results = si_instance.get_users() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_instances/30/users", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Person) for r in results) diff --git a/tests/unit_tests/test_service_offerings.py b/tests/unit_tests/test_service_offerings.py new file mode 100644 index 0000000..1acbd64 --- /dev/null +++ b/tests/unit_tests/test_service_offerings.py @@ -0,0 +1,104 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.service_offerings import ServiceOffering, ServiceOfferingPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def so_instance(mock_connection): + return ServiceOffering( + connection_object=mock_connection, + id=100, + name="Gold Support", + status="available", + ) + + +def test_service_offering_initialization(so_instance): + assert isinstance(so_instance, ServiceOffering) + assert so_instance.__resourceUrl__ == "service_offerings" + assert so_instance.id == 100 + assert so_instance.name == "Gold Support" + + +def test_service_offering_from_data(mock_connection): + data = { + "id": 100, + "name": "Gold Support", + "status": "available", + "service": {"id": 5, "name": "IT Support"}, + } + so = ServiceOffering.from_data(mock_connection, data) + assert isinstance(so, ServiceOffering) + assert so.id == 100 + from xurrent.services import Service + assert isinstance(so.service, Service) + + +def test_get_service_offering_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 100, "name": "Gold Support"} + result = ServiceOffering.get_by_id(mock_connection, 100) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_offerings/100", "GET" + ) + assert isinstance(result, ServiceOffering) + assert result.id == 100 + + +def test_get_service_offerings(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Gold"}, + {"id": 2, "name": "Silver"}, + ] + results = ServiceOffering.get_service_offerings(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_offerings", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, ServiceOffering) for r in results) + + +def test_get_service_offerings_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + ServiceOffering.get_service_offerings( + mock_connection, predefinedFilter=ServiceOfferingPredefinedFilter.catalog + ) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_offerings/catalog", "GET" + ) + + +def test_create_service_offering(mock_connection): + mock_connection.api_call.return_value = {"id": 101, "name": "Platinum"} + result = ServiceOffering.create(mock_connection, {"name": "Platinum"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_offerings", "POST", {"name": "Platinum"} + ) + assert isinstance(result, ServiceOffering) + assert result.id == 101 + + +def test_update_service_offering(mock_connection, so_instance): + mock_connection.api_call.return_value = {"id": 100, "name": "Updated Gold"} + result = so_instance.update({"name": "Updated Gold"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/service_offerings/100", "PATCH", {"name": "Updated Gold"} + ) + assert isinstance(result, ServiceOffering) + assert result.name == "Updated Gold" diff --git a/tests/unit_tests/test_skill_pools.py b/tests/unit_tests/test_skill_pools.py new file mode 100644 index 0000000..8b7fe7c --- /dev/null +++ b/tests/unit_tests/test_skill_pools.py @@ -0,0 +1,147 @@ +import pytest +import os +import sys +from unittest.mock import MagicMock + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../src"))) + +from xurrent.core import XurrentApiHelper +from xurrent.people import Person +from xurrent.teams import Team +from xurrent.skill_pools import SkillPool, SkillPoolPredefinedFilter + + +@pytest.fixture +def mock_connection(): + mock = MagicMock(spec=XurrentApiHelper) + mock.base_url = "https://api.example.com" + mock.api_user = Person(connection_object=mock, id=1, name="api_user") + mock.api_user_teams = [Team(connection_object=mock, id=1, name="team")] + return mock + + +@pytest.fixture +def sp_instance(mock_connection): + return SkillPool( + connection_object=mock_connection, + id=110, + name="Python Devs", + disabled=False, + manager=Person(connection_object=mock_connection, id=5, name="Manager"), + ) + + +def test_skill_pool_initialization(sp_instance): + assert isinstance(sp_instance, SkillPool) + assert sp_instance.__resourceUrl__ == "skill_pools" + assert sp_instance.id == 110 + assert sp_instance.name == "Python Devs" + assert sp_instance.disabled is False + assert isinstance(sp_instance.manager, Person) + + +def test_skill_pool_from_data(mock_connection): + data = { + "id": 110, + "name": "Python Devs", + "disabled": False, + "manager": {"id": 5, "name": "Manager"}, + } + sp = SkillPool.from_data(mock_connection, data) + assert isinstance(sp, SkillPool) + assert sp.id == 110 + assert isinstance(sp.manager, Person) + + +def test_get_skill_pool_by_id(mock_connection): + mock_connection.api_call.return_value = {"id": 110, "name": "Python Devs"} + result = SkillPool.get_by_id(mock_connection, 110) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110", "GET" + ) + assert isinstance(result, SkillPool) + assert result.id == 110 + + +def test_get_skill_pools(mock_connection): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Pool A"}, + {"id": 2, "name": "Pool B"}, + ] + results = SkillPool.get_skill_pools(mock_connection) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, SkillPool) for r in results) + + +def test_get_skill_pools_with_filter(mock_connection): + mock_connection.api_call.return_value = [] + SkillPool.get_skill_pools(mock_connection, predefinedFilter=SkillPoolPredefinedFilter.enabled) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/enabled", "GET" + ) + + +def test_create_skill_pool(mock_connection): + mock_connection.api_call.return_value = {"id": 111, "name": "Java Devs"} + result = SkillPool.create(mock_connection, {"name": "Java Devs"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools", "POST", {"name": "Java Devs"} + ) + assert isinstance(result, SkillPool) + assert result.id == 111 + + +def test_update_skill_pool(mock_connection, sp_instance): + mock_connection.api_call.return_value = {"id": 110, "name": "Updated Pool"} + result = sp_instance.update({"name": "Updated Pool"}) + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110", "PATCH", {"name": "Updated Pool"} + ) + assert isinstance(result, SkillPool) + assert result.name == "Updated Pool" + + +def test_enable_skill_pool(mock_connection, sp_instance): + mock_connection.api_call.return_value = {"id": 110, "name": "Python Devs", "disabled": False} + sp_instance.enable() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110", "PATCH", {"disabled": False} + ) + + +def test_disable_skill_pool(mock_connection, sp_instance): + mock_connection.api_call.return_value = {"id": 110, "name": "Python Devs", "disabled": True} + sp_instance.disable() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110", "PATCH", {"disabled": True} + ) + + +def test_get_members(mock_connection, sp_instance): + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": "Bob"}, + ] + results = sp_instance.get_members() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110/members", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, Person) for r in results) + + +def test_get_effort_classes(mock_connection, sp_instance): + from xurrent.effort_classes import EffortClass + mock_connection.api_call.return_value = [ + {"id": 1, "name": "Regular"}, + {"id": 2, "name": "Overtime"}, + ] + results = sp_instance.get_effort_classes() + mock_connection.api_call.assert_called_once_with( + f"{mock_connection.base_url}/skill_pools/110/effort_classes", "GET" + ) + assert len(results) == 2 + assert all(isinstance(r, EffortClass) for r in results) From 84eb02809d922ccb48d84d66aa0002b82e4d4be0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 15 Apr 2026 20:50:18 +0000 Subject: [PATCH 6/6] Add 10 new domain classes, 43 sub-resource methods, 5 utility APIs, 301 unit tests Agent-Logs-Url: https://github.com/fasteiner/xurrent-python/sessions/7ce5e825-70b8-4b09-ad7e-99899ceede7d Co-authored-by: fasteiner <75947402+fasteiner@users.noreply.github.com> --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48dcb89..897d9f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Teams: added `get_service_instances` instance method. - Holidays: added `get_calendars` instance method. - ClosureCodes: added `ClosureCode` class; supports CRUD. +- Core: added `search(query, types)` for cross-resource full-text search via `GET /search`. +- Core: added `bulk_import(data, import_type, import_format)` for CSV/TSV bulk imports via `POST /import`. +- Core: added `list_archive(queryfilter)` to list all archived items via `GET /archive`. +- Core: added `list_trash(queryfilter)` to list all trashed items via `GET /trash`. +- Core: added `list_audit_lines(queryfilter)` to query the global audit log via `GET /audit_lines`. - Docs: added `CLAUDE.md` with setup instructions, test commands, architecture overview, and changelog requirements for Claude Code. - Products: added `Product` class with `ProductPredefinedFilter` and `ProductDepreciationMethod` enums; supports CRUD, enable/disable, and CI listing. - ProductCategories: added `ProductCategory` class with `ProductCategoryRuleSet` enum; supports CRUD and enable/disable.