Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby

# Changelog

## Unreleased

feature: File-based Excel streams can now select a specific worksheet by name or zero-indexed position, or read every worksheet in a workbook.

## 6.5.2

bugfix: Ensure that streams with partition router are not executed concurrently
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/file_based/config/excel_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ class Config(OneOfOptionConfig):
"excel",
const=True,
)
# Selects which worksheet(s) to read. Accepts a literal sheet name, a
# zero-based position given as a decimal string (e.g. "0"), or "*" to read
# every worksheet in the workbook. Default "0" resolves to the first sheet.
sheet_name: str = Field(
    default="0",
    title="Sheet Name",
    description='The Excel worksheet to read. Use a sheet name, a zero-indexed sheet position like "0", or "*" to read all sheets.',
)
79 changes: 55 additions & 24 deletions airbyte_cdk/sources/file_based/file_types/excel_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

class ExcelParser(FileTypeParser):
ENCODING = None
ALL_SHEETS = "*"

def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
"""
Expand Down Expand Up @@ -62,18 +63,21 @@ async def infer_schema(

# Validate the format of the config
self.validate_format(config.format, logger)
excel_format = config.format
if not isinstance(excel_format, ExcelFormat):
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

fields: Dict[str, str] = {}

with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
df = self.open_and_parse_file(fp, logger, file)
for column, df_type in df.dtypes.items():
# Choose the broadest data type if the column's data type differs in dataframes
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
fields[column] = self.dtype_to_json_type( # type: ignore [index]
prev_frame_column_type,
df_type,
)
for df in self._parse_excel_file(fp, excel_format, logger, file).values():
for column, df_type in df.dtypes.items():
# Choose the broadest data type if the column's data type differs in dataframes
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
fields[column] = self.dtype_to_json_type( # type: ignore [index]
prev_frame_column_type,
df_type,
)

schema = {
field: (
Expand Down Expand Up @@ -109,18 +113,21 @@ def parse_records(

# Validate the format of the config
self.validate_format(config.format, logger)
excel_format = config.format
if not isinstance(excel_format, ExcelFormat):
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

try:
# Open and parse the file using the stream reader
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
df = self.open_and_parse_file(fp, logger, file)
# Yield records as dictionaries
# DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
# DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
# see PR description: https://github.com/airbytehq/airbyte/pull/44444/
yield from orjson.loads(
df.to_json(orient="records", date_format="iso", date_unit="us")
)
for df in self._parse_excel_file(fp, excel_format, logger, file).values():
# Yield records as dictionaries
# DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
# DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
# see PR description: https://github.com/airbytehq/airbyte/pull/44444/
yield from orjson.loads(
df.to_json(orient="records", date_format="iso", date_unit="us")
)

except Exception as exc:
# Raise a RecordParseError if any exception occurs during parsing
Expand Down Expand Up @@ -187,7 +194,8 @@ def _open_and_parse_file_with_calamine(
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: int | str | None = 0,
) -> pd.DataFrame | Dict[int | str, pd.DataFrame]:
"""Opens and parses Excel file using Calamine engine.

Args:
Expand All @@ -202,7 +210,7 @@ def _open_and_parse_file_with_calamine(
ExcelCalamineParsingError: If Calamine fails to parse the file.
"""
try:
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
return pd.ExcelFile(fp, engine="calamine").parse(sheet_name=sheet_name) # type: ignore [arg-type, call-overload, no-any-return]
except BaseException as exc:
# Calamine engine raises PanicException(child of BaseException) if Calamine fails to parse the file.
# Checking if ValueError in exception arg to know if it was actually an error during parsing due to invalid values in cells.
Expand All @@ -222,7 +230,8 @@ def _open_and_parse_file_with_openpyxl(
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: int | str | None = 0,
) -> pd.DataFrame | Dict[int | str, pd.DataFrame]:
"""Opens and parses Excel file using Openpyxl engine.

Args:
Expand All @@ -245,19 +254,20 @@ def _open_and_parse_file_with_openpyxl(

with warnings.catch_warnings(record=True) as warning_records:
warnings.simplefilter("always")
df = pd.ExcelFile(fp, engine="openpyxl").parse() # type: ignore [arg-type, call-overload]
dfs = pd.ExcelFile(fp, engine="openpyxl").parse(sheet_name=sheet_name) # type: ignore [arg-type, call-overload]

for warning in warning_records:
logger.warning(f"Openpyxl warning for {file.file_uri_for_logging}: {warning.message}")

return df # type: ignore [no-any-return]
return dfs # type: ignore [no-any-return]

def open_and_parse_file(
self,
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: int | str | None = 0,
) -> pd.DataFrame | Dict[int | str, pd.DataFrame]:
"""Opens and parses the Excel file with Calamine-first and Openpyxl fallback.

Args:
Expand All @@ -269,6 +279,27 @@ def open_and_parse_file(
pd.DataFrame: Parsed data from the Excel file.
"""
try:
return self._open_and_parse_file_with_calamine(fp, logger, file)
return self._open_and_parse_file_with_calamine(fp, logger, file, sheet_name)
except ExcelCalamineParsingError:
return self._open_and_parse_file_with_openpyxl(fp, logger, file)
return self._open_and_parse_file_with_openpyxl(fp, logger, file, sheet_name)

def _parse_excel_file(
    self,
    fp: Union[IOBase, str, Path],
    excel_format: ExcelFormat,
    logger: logging.Logger,
    file: RemoteFile,
) -> Dict[int | str, pd.DataFrame]:
    """Parse the Excel file per the configured sheet selection, normalized to a dict.

    pandas returns a bare DataFrame when a single sheet is selected and a dict
    of DataFrames when several sheets are read; this always returns the dict
    form so callers can iterate the result uniformly.
    """
    resolved_sheet = self._get_sheet_name(excel_format)
    parsed = self.open_and_parse_file(fp, logger, file, resolved_sheet)
    if isinstance(parsed, pd.DataFrame):
        # Single-sheet read: key the lone frame by the configured sheet_name.
        parsed = {excel_format.sheet_name: parsed}
    return parsed

def _get_sheet_name(self, excel_format: ExcelFormat) -> int | str | None:
sheet_name = excel_format.sheet_name
if sheet_name == self.ALL_SHEETS:
return None
if sheet_name.isdecimal():
return int(sheet_name)
return sheet_name
53 changes: 47 additions & 6 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

import importlib
import os
from collections.abc import Callable
from inspect import Parameter, signature
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast

from boltons.typeutils import classproperty

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.models import (
ConnectorTestScenario,
Expand All @@ -18,8 +21,6 @@
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite

if TYPE_CHECKING:
from collections.abc import Callable

from airbyte_cdk.test import entrypoint_wrapper


Expand Down Expand Up @@ -85,13 +86,25 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
def create_connector(
cls,
scenario: ConnectorTestScenario | None,
catalog: ConfiguredAirbyteCatalog | None = None,
) -> IConnector:
"""Instantiate the connector class."""
connector = cls.connector # type: ignore
if connector:
if callable(connector) or isinstance(connector, type):
# If the connector is a class or factory function, instantiate it:
return cast(IConnector, connector()) # type: ignore [redundant-cast]
if isinstance(connector, type):
config = None
if scenario and _requires_legacy_file_based_constructor_args(connector):
config = (
scenario.config_dict
if scenario.config_dict is not None
else scenario.get_config_dict(
empty_if_missing=True,
connector_root=cls.get_connector_root_dir(),
)
)
return cls._instantiate_connector_class(connector, catalog, config)
if callable(connector):
return connector()

# Otherwise, we can't instantiate the connector. Fail with a clear error message.
raise NotImplementedError(
Expand All @@ -100,6 +113,24 @@ def create_connector(
"override `cls.create_connector()` to define a custom initialization process."
)

@staticmethod
def _instantiate_connector_class(
    connector_class: type[IConnector],
    catalog: ConfiguredAirbyteCatalog | None = None,
    config: dict[str, Any] | None = None,
) -> IConnector:
    """Instantiate connector classes supported by standard tests.

    Modern connectors are constructed with no arguments; legacy file-based
    connectors are called positionally as ``(catalog, config, state)`` with
    ``state`` always ``None`` here.
    """
    if not _requires_legacy_file_based_constructor_args(connector_class):
        return connector_class()

    legacy_file_based_connector = cast(
        Callable[
            [ConfiguredAirbyteCatalog | None, dict[str, Any] | None, None], IConnector
        ],
        connector_class,
    )
    return legacy_file_based_connector(catalog, config, None)

# Test Definitions

def test_check(
Expand All @@ -117,3 +148,13 @@ def test_check(
f"Expected exactly one CONNECTION_STATUS message. "
"Got: {result.connection_status_messages!s}"
)


def _requires_legacy_file_based_constructor_args(connector_class: type[IConnector]) -> bool:
required_positional_parameter_names = [
parameter.name
for parameter in signature(connector_class).parameters.values()
if parameter.default is Parameter.empty
and parameter.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)
]
return required_positional_parameter_names == ["catalog", "config", "state"]
2 changes: 2 additions & 0 deletions airbyte_cdk/test/standard_tests/declarative_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import yaml
from boltons.typeutils import classproperty

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
Expand Down Expand Up @@ -65,6 +66,7 @@ def components_py_path(cls) -> Path | None:
def create_connector(
cls,
scenario: ConnectorTestScenario | None,
catalog: ConfiguredAirbyteCatalog | None = None,
) -> IConnector:
"""Create a connector scenario for the test suite.

Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_basic_read(
]
)
result = run_test_job(
self.create_connector(scenario),
self.create_connector(scenario, configured_catalog),
"read",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_fail_read_with_bad_catalog(
],
)
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
self.create_connector(scenario, invalid_configured_catalog),
"read",
connector_root=self.get_connector_root_dir(),
test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog
Expand Down
Loading
Loading