Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datashare-python/datashare_python/cli/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from alive_progress import alive_bar

from datashare_python.cli.utils import AsyncTyper, eprint
from datashare_python.constants import PYTHON_TASK_GROUP
from datashare_python.objects import READY_STATES, Task, TaskError, TaskState
from datashare_python.objects import READY_STATES, Task, TaskError, TaskGroup, TaskState
from datashare_python.task_client import DatashareTaskClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,7 +40,7 @@ async def start(
group: Annotated[
str | None,
typer.Option("--group", "-g", help=_GROUP_HELP),
] = PYTHON_TASK_GROUP.name,
] = TaskGroup.python, # noqa: F821
ds_address: Annotated[
str, typer.Option("--ds-address", "-a", help=_DS_URL_HELP)
] = DEFAULT_DS_ADDRESS,
Expand Down
5 changes: 5 additions & 0 deletions datashare-python/datashare_python/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path
from typing import ClassVar

from icij_common.es import ESClient
Expand Down Expand Up @@ -87,6 +88,10 @@ class WorkerConfig(ICIJSettings, LogWithWorkerIDMixin, BaseModel):

max_concurrent_io_activities: int = 5

docs_root: Path | None = None
artifacts_root: Path | None = None
workdir: Path | None = None

def to_es_client(self) -> ESClient:
    """Build an Elasticsearch client from this worker configuration.

    Delegates to the ``elasticsearch`` settings object, handing it the
    Datashare API key stored under ``datashare.api_key``.
    """
    es_settings = self.elasticsearch
    return es_settings.to_es_client(self.datashare.api_key)

Expand Down
1 change: 1 addition & 0 deletions datashare-python/datashare_python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ def doc_2() -> Document:
def doc_3() -> Document:
return Document(
id="doc-3",
index=TEST_PROJECT,
root_document="root-3",
language="SPANISH",
content="traduce este texto al inglés",
Expand Down
5 changes: 2 additions & 3 deletions datashare-python/datashare_python/constants.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from pathlib import Path

from .objects import TaskGroup

PACKAGE_DIR = Path(__file__).parent
PACKAGE_ROOT = PACKAGE_DIR.parent

PYTHON_TASK_GROUP = TaskGroup(name="PYTHON")

DEFAULT_TEMPORAL_ADDRESS = "temporal:7233"

Expand All @@ -14,3 +11,5 @@
DEFAULT_NAMESPACE = "datashare-default"

METADATA_JSON = "metadata.json"

TIKA_METADATA_RESOURCENAME = "tika_metadata_resourcename"
91 changes: 86 additions & 5 deletions datashare-python/datashare_python/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,24 @@
from enum import StrEnum, unique
from io import BytesIO
from pathlib import Path
from typing import Any, Literal, Self, TypeVar
from typing import Any, Literal, Self, TypeVar, cast

from temporalio import workflow

from .constants import TIKA_METADATA_RESOURCENAME

with workflow.unsafe.imports_passed_through():
from icij_common.es import DOC_CONTENT, DOC_LANGUAGE, DOC_ROOT_ID, ID_, SOURCE
from icij_common.es import (
DOC_CONTENT,
DOC_CONTENT_TRANSLATED,
DOC_LANGUAGE,
DOC_METADATA,
DOC_PATH,
DOC_ROOT_ID,
ID_,
INDEX_,
SOURCE,
)

from icij_common.pydantic_utils import (
icij_config,
Expand Down Expand Up @@ -137,30 +149,99 @@ class Task(Message):
class TaskGroup:
name: str

@property
@classmethod
def python(cls) -> Self:
return cls(name="PYTHON")


@unique
class DocumentLocation(StrEnum):
    """Storage area against which a document's file path is resolved.

    ``FilesystemDocument.locate`` maps each member to one root directory.
    """

    # Resolved as ``original_root / path``.
    ORIGINAL = "original"
    # Resolved under ``artifacts_root``, inside the document's artifacts dir.
    ARTIFACTS = "artifacts"
    # Resolved as ``workdir / path``.
    WORKDIR = "workdir"


class FilesystemDocument(DatashareModel):
id: str
path: Path
index: str
location: DocumentLocation
resource_name: str

def locate(
self, original_root: Path, *, artifacts_root: Path, workdir: Path
) -> Path:
from datashare_python.utils import artifacts_dir # noqa: PLC0415

project = self.index
match self.location:
case DocumentLocation.ORIGINAL:
return original_root / self.path
case DocumentLocation.ARTIFACTS:
return artifacts_root / artifacts_dir(self.id, project=project) / "raw"
case DocumentLocation.WORKDIR:
return workdir / self.path
case _:
raise ValueError(f"invalid location: {self.path}")


class Document(DatashareModel):
id: str
root_document: str
language: str
index: str | None = None
root_document: str | None = None
content: str | None = None
content_type: str | None = None
path: Path | None = None
tags: list[str] = Field(default_factory=list)
content_translated: dict[str, str] = Field(
default_factory=dict, alias="content_translated"
)
metadata: dict[str, Any] | None = None
type: str = Field(default="Document", frozen=True)

@classmethod
def from_es(cls, es_doc: dict) -> Self:
sources = es_doc[SOURCE]
return cls(
id=es_doc[ID_],
content=sources[DOC_CONTENT],
content_translated=sources.get("content_translated", dict()),
index=es_doc.get(INDEX_),
content=sources.get(DOC_CONTENT),
content_translated=sources.get(DOC_CONTENT_TRANSLATED, dict()),
language=sources[DOC_LANGUAGE],
root_document=sources[DOC_ROOT_ID],
tags=sources.get("tags", []),
path=sources.get(DOC_PATH),
metadata=sources.get(DOC_METADATA),
)

def to_filesystem(self) -> FilesystemDocument:
    """Describe where this document's file lives on the filesystem.

    A document without ``root_document`` is located by its own ``path``
    under the original documents root. A document with a ``root_document``
    (an embedded document) is located in its artifacts directory, computed
    from its id and index.

    Returns:
        The ``FilesystemDocument`` pointing at this document's file.

    Raises:
        ValueError: if ``metadata`` is missing, if a top-level document has
            no ``path`` or ``index``, or if an embedded document has no
            ``index``.
        KeyError: if ``metadata`` lacks the Tika resource name entry.
    """
    from .utils import artifacts_dir  # noqa: PLC0415

    if self.metadata is None:
        raise ValueError(
            "can't compute filesystem path for document without metadata"
        )
    resource_name = cast(str, self.metadata[TIKA_METADATA_RESOURCENAME])
    if self.root_document is None:
        # FilesystemDocument requires both fields; fail with an explicit
        # message rather than an opaque model validation error.
        if self.path is None or self.index is None:
            msg = (
                f"can't compute filesystem path for doc {self.id} without"
                f" path and index"
            )
            raise ValueError(msg)
        path = self.path
        location = DocumentLocation.ORIGINAL
    else:
        if self.index is None:
            msg = (
                f"can't compute filesystem path for embedded doc {self.id} without"
                f" index"
            )
            raise ValueError(msg)
        path = artifacts_dir(doc_id=self.id, project=self.index) / "raw"
        location = DocumentLocation.ARTIFACTS
    return FilesystemDocument(
        id=self.id,
        path=path,
        index=self.index,
        location=location,
        resource_name=resource_name,
    )


Expand Down
34 changes: 19 additions & 15 deletions datashare-python/datashare_python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,12 @@ def safe_dir(doc_id: str) -> Path:
return Path(*parts)


def _artifacts_dir(doc_id: str, *, project: str) -> Path:
def artifacts_dir(doc_id: str, *, project: str) -> Path:
return Path(project, safe_dir(doc_id), doc_id)


def _metadata_path(doc_id: str, *, project: str) -> Path:
metadata_path = _artifacts_dir(doc_id, project=project) / METADATA_JSON
metadata_path = artifacts_dir(doc_id, project=project) / METADATA_JSON
return metadata_path


Expand All @@ -455,7 +455,7 @@ def _read_artifact_metadata(root: Path, artifact: DocArtifact) -> dict:


def write_artifact(root: Path, artifact: DocArtifact) -> Path:
artif_dir = root / _artifacts_dir(artifact.doc_id, project=artifact.project)
artif_dir = root / artifacts_dir(artifact.doc_id, project=artifact.project)
artif_dir.mkdir(exist_ok=True, parents=True)
# TODO: if transcriptions are too large we could also serialize them
# as jsonl
Expand All @@ -479,19 +479,23 @@ def debuggable_name(
displayable_file_name = [c[:component_size_limit] for c in path.parts]
uuid = sha256(str(path).encode()).hexdigest() if deterministic else uuid4().hex
uuid = uuid[:20]
return f"{uuid}-{'__'.join(displayable_file_name)}"
return f"{uuid}-{'--'.join(displayable_file_name)}"


def activity_contextual_id(*, wf_context: bool = True) -> str:
contextual_id = ""
def activity_contextual_id(
*, wf_context: bool = True, act_context: bool = False, run_context: bool = False
) -> str:
contextual_id = []
act_info = activity.info()
if not wf_context and not act_context:
raise ValueError("at least one of wf_context and act_context must be True")
if wf_context:
wf_id = act_info.workflow_id
wf_run_id = act_info.workflow_run_id
wf_type = act_info.workflow_type
contextual_id += f"{wf_type}-{wf_id}-{wf_run_id}-"
act_id = act_info.activity_id
act_run_id = act_info.activity_id
act_type = act_info.activity_type
contextual_id = f"{act_type}-{act_id}-{act_run_id}"
return contextual_id
contextual_id.append(act_info.workflow_id)
if run_context:
contextual_id.append(act_info.workflow_run_id)
if act_context:
contextual_id.append(act_info.activity_type)
contextual_id.append(act_info.activity_id)
if run_context:
contextual_id.append(act_info.activity_run_id)
return "-".join(contextual_id)
Loading
Loading