diff --git a/src/factorlab/factors/__init__.py b/src/factorlab/factors/__init__.py new file mode 100644 index 0000000..3068a35 --- /dev/null +++ b/src/factorlab/factors/__init__.py @@ -0,0 +1,4 @@ +from factorlab.factors.base import Factor + +__all__ = ["Factor"] + diff --git a/src/factorlab/factors/volume/__init__.py b/src/factorlab/factors/volume/__init__.py new file mode 100644 index 0000000..2aeff80 --- /dev/null +++ b/src/factorlab/factors/volume/__init__.py @@ -0,0 +1,41 @@ +from factorlab.factors.volume.base import VolumeFactor +from factorlab.factors.volume.volume import Volume +from factorlab.factors.volume.volume_momentum import VolumeMomentum +from factorlab.factors.volume.delta_volume_momentum import DeltaVolumeMomentum +from factorlab.factors.volume.volume_weighted_ma_over_ma import VolumeWeightedMAOverMA +from factorlab.factors.volume.diff_volume_weighted_ma_over_ma import DiffVolumeWeightedMAOverMA +from factorlab.factors.volume.price_volume_fit import PriceVolumeFit +from factorlab.factors.volume.diff_price_volume_fit import DiffPriceVolumeFit +from factorlab.factors.volume.delta_price_volume_fit import DeltaPriceVolumeFit +from factorlab.factors.volume.on_balance_volume import OnBalanceVolume +from factorlab.factors.volume.delta_on_balance_volume import DeltaOnBalanceVolume +from factorlab.factors.volume.positive_volume_indicator import PositiveVolumeIndicator +from factorlab.factors.volume.delta_positive_volume_indicator import DeltaPositiveVolumeIndicator +from factorlab.factors.volume.negative_volume_indicator import NegativeVolumeIndicator +from factorlab.factors.volume.delta_negative_volume_indicator import DeltaNegativeVolumeIndicator +from factorlab.factors.volume.product_price_volume import ProductPriceVolume +from factorlab.factors.volume.sum_price_volume import SumPriceVolume +from factorlab.factors.volume.delta_product_price_volume import DeltaProductPriceVolume +from factorlab.factors.volume.delta_sum_price_volume import 
class VolumeFactor(Factor, ABC):
    """Base class for volume/price interaction factors.

    A subclass implements :meth:`_compute_volume`, which maps a frame holding
    the price and volume columns to a raw factor series.  :meth:`transform`
    optionally squashes that series with :meth:`_compress`, clips it to
    ``[-50, 50]`` and appends it to a copy of the input frame.

    Parameters
    ----------
    price_col : str, default "close"
        Name of the price column.
    volume_col : str, default "volume"
        Name of the volume column.
    output_col : str, optional
        Name of the appended factor column; when falsy, ``self.name`` is used.
    compress : bool, default True
        Whether ``transform`` applies :meth:`_compress` to the raw factor.
    compression_window : int, default 250
        Rolling window of the robust scale used by :meth:`_compress`.
    compression_min_periods : int, default 30
        Minimum observations required for that rolling scale.
    compression_strength : float, default 1.0
        Multiplier applied to the scaled factor before ``tanh``.
    **kwargs
        Stored verbatim on ``self.kwargs``; not otherwise used by this class.
    """

    def __init__(
        self,
        price_col: str = "close",
        volume_col: str = "volume",
        output_col: Optional[str] = None,
        compress: bool = True,
        compression_window: int = 250,
        compression_min_periods: int = 30,
        compression_strength: float = 1.0,
        **kwargs: Any,
    ):
        super().__init__(
            name=self.__class__.__name__,
            description="Base class for volume factors.",
            category="Volume",
            tags=["volume", "flow", "microstructure"],
        )
        self.price_col = price_col
        self.volume_col = volume_col
        self.output_col = output_col
        self.compress = compress
        self.compression_window = compression_window
        self.compression_min_periods = compression_min_periods
        self.compression_strength = compression_strength
        self.kwargs = kwargs

    @property
    def inputs(self) -> List[str]:
        """Return the column names this factor reads: price, then volume."""
        return [self.price_col, self.volume_col]

    def fit(
        self,
        X: Union[pd.Series, pd.DataFrame],
        y: Optional[Union[pd.Series, pd.DataFrame]] = None,
    ) -> "VolumeFactor":
        """Validate the input columns and mark the factor as fitted.

        Nothing is estimated from the data; ``y`` is accepted but unused.

        Returns
        -------
        VolumeFactor
            ``self``, so calls can be chained.
        """
        df_input = to_dataframe(X)
        self.validate_inputs(df_input)
        self._is_fitted = True
        return self

    def transform(self, X: Union[pd.Series, pd.DataFrame]) -> pd.DataFrame:
        """Compute the factor and append it to a sorted copy of ``X``.

        Returns
        -------
        pd.DataFrame
            Deep copy of the input, sorted by index, with one extra column
            named by :meth:`_generate_name`.

        Raises
        ------
        RuntimeError
            If :meth:`fit` has not been called first.
        """
        if not self._is_fitted:
            raise RuntimeError(f"Transform '{self.name}' must be fitted before calling transform()")

        # Work on a deep copy so the caller's frame is never mutated.
        df = to_dataframe(X).copy(deep=True)
        self.validate_inputs(df)
        df = df.sort_index()

        factor = self._compute_volume(df)
        if self.compress:
            factor = self._compress(factor)

        # The clip applies whether or not compression ran, so an uncompressed
        # factor is bounded to the same [-50, 50] range.
        df[self._generate_name()] = factor.clip(-50, 50)
        return df

    @abstractmethod
    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the raw (uncompressed) factor series for ``df``."""
        raise NotImplementedError

    def _generate_name(self) -> str:
        """Return the output column name: ``output_col`` if set, else ``name``."""
        return self.output_col or self.name

    def _is_multiindex(self, series: pd.Series) -> bool:
        """Return True when ``series`` is indexed by a ``pd.MultiIndex``."""
        return isinstance(series.index, pd.MultiIndex)

    def _safe_log(self, series: pd.Series) -> pd.Series:
        """Return the natural log of ``series``; non-positive values give NaN."""
        return np.log(series.where(series > 0, np.nan))

    def _series_frame(self, series: pd.Series, col: str = "value") -> pd.DataFrame:
        """Cast ``series`` to float64 and wrap it in a one-column frame."""
        return series.astype("float64").to_frame(col)

    def _shift_by_asset(self, series: pd.Series, periods: int) -> pd.Series:
        """Shift ``series`` by ``periods`` rows.

        For a MultiIndex the shift is done within each group of index
        level 1 (assumed to be the asset level), so values never cross
        from one group into another.
        """
        if self._is_multiindex(series):
            return series.groupby(level=1).shift(periods)
        return series.shift(periods)

    def _pct_change_by_asset(self, series: pd.Series, periods: int = 1) -> pd.Series:
        """Return the ``periods``-lag percentage change via the ``Returns`` transform.

        NOTE(review): per-asset handling is presumably done inside
        ``Returns``; it is not visible in this module.
        """
        df = self._series_frame(series)
        ret = Returns(method="pct", input_col="value", output_col="ret", lags=periods).compute(df)
        return ret["ret"]

    def _diff_by_asset(self, series: pd.Series, periods: int = 1) -> pd.Series:
        """Return the ``periods``-lag difference via the ``Difference`` transform.

        NOTE(review): per-asset handling is presumably done inside
        ``Difference``; it is not visible in this module.
        """
        df = self._series_frame(series)
        diff = Difference(input_col="value", output_col="diff", lags=periods).compute(df)
        return diff["diff"]

    def _rolling_mean(
        self,
        series: pd.Series,
        window: int,
        min_periods: Optional[int] = None,
    ) -> pd.Series:
        """Return the rolling mean of ``series`` via ``WindowSmoother``.

        ``min_periods`` defaults to ``window``.
        """
        min_periods = window if min_periods is None else min_periods
        df = self._series_frame(series)
        smoothed = WindowSmoother(
            input_cols="value",
            output_cols="mean",
            window_type="rolling",
            window_size=window,
            central_tendency="mean",
            min_periods=min_periods,
        ).compute(df)
        return smoothed["mean"]

    def _rolling_median(
        self,
        series: pd.Series,
        window: int,
        min_periods: Optional[int] = None,
    ) -> pd.Series:
        """Return the rolling median of ``series`` via ``WindowSmoother``.

        ``min_periods`` defaults to ``window``.
        """
        min_periods = window if min_periods is None else min_periods
        df = self._series_frame(series)
        smoothed = WindowSmoother(
            input_cols="value",
            output_cols="median",
            window_type="rolling",
            window_size=window,
            central_tendency="median",
            min_periods=min_periods,
        ).compute(df)
        return smoothed["median"]

    def _rolling_std(
        self,
        series: pd.Series,
        window: int,
        min_periods: Optional[int] = None,
    ) -> pd.Series:
        """Return the rolling standard deviation of ``series`` via ``Dispersion``.

        Unlike the mean/median helpers, ``min_periods`` defaults to 2 here.
        """
        min_periods = 2 if min_periods is None else min_periods
        df = self._series_frame(series)
        dispersion = Dispersion(
            method="std",
            input_col="value",
            output_col="std",
            axis="ts",
            window_type="rolling",
            window_size=window,
            min_periods=min_periods,
        ).compute(df)
        return dispersion["std"]

    def _rolling_stat(
        self,
        series: pd.Series,
        window: int,
        stat: str,
        min_periods: Optional[int] = None,
        **kwargs: Any,
    ) -> pd.Series:
        """Apply the pandas rolling method named ``stat`` to ``series``.

        Parameters
        ----------
        series : pd.Series
            Input values; a MultiIndex is rolled within each level-1 group.
        window : int
            Rolling window length.
        stat : str
            Name of a pandas rolling method, e.g. ``"sum"`` or ``"quantile"``.
        min_periods : int, optional
            Minimum observations per window; defaults to ``window``.
        **kwargs
            Forwarded to the rolling method.  For ``stat="quantile"`` the
            level may be given as ``q=`` or ``quantile=``.

        Returns
        -------
        pd.Series
            The rolled statistic; for MultiIndex input the group key level
            added by pandas is dropped and the result is index-sorted.
        """
        min_periods = window if min_periods is None else min_periods

        args: tuple = ()
        if stat == "quantile":
            # pandas 2.1 renamed the first argument of Rolling.quantile from
            # ``quantile`` to ``q``.  Passing the level positionally works
            # on both sides of that rename, whichever keyword the caller used.
            for key in ("q", "quantile"):
                if key in kwargs:
                    args = (kwargs.pop(key),)
                    break

        if self._is_multiindex(series):
            roller = series.groupby(level=1).rolling(window=window, min_periods=min_periods)
            rolled = getattr(roller, stat)(*args, **kwargs)
            # groupby().rolling() prepends the group key as level 0; drop it
            # to get back the original index layout.
            return rolled.droplevel(0).sort_index()

        roller = series.rolling(window=window, min_periods=min_periods)
        return getattr(roller, stat)(*args, **kwargs)

    def _compress(self, raw: pd.Series) -> pd.Series:
        """Squash ``raw`` into ``(-50, 50)``.

        The series is divided by the rolling median of its absolute value
        (zeros in that scale become NaN), multiplied by
        ``compression_strength``, passed through ``tanh`` and scaled by 50.
        """
        robust_scale = self._rolling_median(
            raw.abs(),
            window=self.compression_window,
            min_periods=self.compression_min_periods,
        ).replace(0, np.nan)

        normalized = raw / robust_scale
        return 50.0 * np.tanh(self.compression_strength * normalized)
class DeltaPositiveVolumeIndicator(VolumeFactor):
    """Change in the positive-volume indicator over ``delta_dist`` bars.

    The indicator is the ``hist_length``-bar rolling mean of one-bar
    percentage price changes, counting only bars whose volume exceeds the
    previous bar's (other bars contribute 0.0), divided by a longer-window
    rolling standard deviation of all one-bar changes.
    """

    def __init__(self, hist_length: int = 40, delta_dist: int = 35, **kwargs):
        super().__init__(**kwargs)
        self.name = "DeltaPositiveVolumeIndicator"
        self.description = "Current minus lagged positive-volume indicator."
        self.hist_length = hist_length
        self.delta_dist = delta_dist

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>_<delta_dist>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.hist_length}_{self.delta_dist}"

    def _positive_volume_indicator(self, df: pd.DataFrame) -> pd.Series:
        """Return the normalized average return on rising-volume bars."""
        prices = df[self.price_col]
        vol = df[self.volume_col]

        returns = self._pct_change_by_asset(prices, periods=1)
        volume_rose = vol.gt(self._shift_by_asset(vol, 1))
        # Bars where volume did not rise (or the comparison is undefined)
        # contribute a zero return.
        rising_returns = returns.where(volume_rose, 0.0)

        numerator = self._rolling_mean(rising_returns, window=self.hist_length)
        # The volatility estimate uses a longer window: at least 250 bars,
        # or twice the averaging window when that is larger.
        vol_window = max(2 * self.hist_length, 250)
        denominator = self._rolling_std(
            returns,
            window=vol_window,
            min_periods=self.hist_length,
        )
        return numerator / denominator.replace(0, np.nan)

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the indicator minus its value ``delta_dist`` bars earlier."""
        current = self._positive_volume_indicator(df)
        lagged = self._shift_by_asset(current, self.delta_dist)
        return current - lagged
class DeltaProductPriceVolume(VolumeFactor):
    """Change in the smoothed product of normalized volume and price shocks.

    Volume is divided by the rolling median of the prior bars' volume; the
    log price change is centred and scaled by the rolling median and
    inter-quartile range of prior changes.  Their product is averaged over
    ``hist_length`` bars and differenced against its value ``delta_dist``
    bars earlier.
    """

    def __init__(
        self,
        hist_length: int = 40,
        delta_dist: int = 35,
        norm_lookback: int = 250,
        norm_min_periods: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.name = "DeltaProductPriceVolume"
        self.description = "Current minus lagged product-price-volume signal."
        self.hist_length = hist_length
        self.delta_dist = delta_dist
        self.norm_lookback = norm_lookback
        self.norm_min_periods = norm_min_periods

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>_<delta_dist>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.hist_length}_{self.delta_dist}"

    def _normalized_volume_and_price_change(self, df: pd.DataFrame) -> tuple[pd.Series, pd.Series]:
        """Return ``(normalized_volume, normalized_price_change)``.

        Every normalizing statistic is computed on series shifted by one
        bar, so the current bar is excluded from its own scale.
        """
        lookback = self.norm_lookback
        warmup = self.norm_min_periods

        # Volume leg: current volume relative to the median of prior volume.
        vol = df[self.volume_col]
        baseline = self._rolling_median(
            self._shift_by_asset(vol, 1),
            window=lookback,
            min_periods=warmup,
        )
        volume_ratio = vol / baseline.replace(0, np.nan)

        # Price leg: log change, centred and scaled by prior-change statistics.
        log_change = self._diff_by_asset(self._safe_log(df[self.price_col]), 1)
        lagged_change = self._shift_by_asset(log_change, 1)
        centre = self._rolling_median(lagged_change, window=lookback, min_periods=warmup)

        def _prior_quantile(level: float) -> pd.Series:
            return self._rolling_stat(
                lagged_change,
                window=lookback,
                stat="quantile",
                min_periods=warmup,
                q=level,
            )

        spread = (_prior_quantile(0.75) - _prior_quantile(0.25)).replace(0, np.nan)
        return volume_ratio, (log_change - centre) / spread

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the smoothed product minus its value ``delta_dist`` bars earlier."""
        volume_ratio, scaled_change = self._normalized_volume_and_price_change(df)
        smoothed = self._rolling_mean(volume_ratio * scaled_change, window=self.hist_length)
        lagged = self._shift_by_asset(smoothed, self.delta_dist)
        return smoothed - lagged
__init__( + self, + hist_length: int = 40, + delta_dist: int = 35, + norm_lookback: int = 250, + norm_min_periods: int = 50, + **kwargs, + ): + super().__init__(**kwargs) + self.hist_length = hist_length + self.delta_dist = delta_dist + self.norm_lookback = norm_lookback + self.norm_min_periods = norm_min_periods + self.name = "DeltaSumPriceVolume" + self.description = "Current minus lagged sum-price-volume signal." + + def _generate_name(self) -> str: + return self.output_col or f"{self.name}_{self.hist_length}_{self.delta_dist}" + + def _normalized_volume_and_price_change(self, df: pd.DataFrame) -> tuple[pd.Series, pd.Series]: + close = df[self.price_col] + volume = df[self.volume_col] + + prior_volume = self._shift_by_asset(volume, 1) + median_volume = self._rolling_median( + prior_volume, + window=self.norm_lookback, + min_periods=self.norm_min_periods, + ).replace(0, np.nan) + normalized_volume = volume / median_volume + + log_close = self._safe_log(close) + price_change = self._diff_by_asset(log_close, 1) + prior_change = self._shift_by_asset(price_change, 1) + + median_change = self._rolling_median( + prior_change, + window=self.norm_lookback, + min_periods=self.norm_min_periods, + ) + q75 = self._rolling_stat( + prior_change, + window=self.norm_lookback, + stat="quantile", + min_periods=self.norm_min_periods, + q=0.75, + ) + q25 = self._rolling_stat( + prior_change, + window=self.norm_lookback, + stat="quantile", + min_periods=self.norm_min_periods, + q=0.25, + ) + iqr = (q75 - q25).replace(0, np.nan) + + normalized_change = (price_change - median_change) / iqr + return normalized_volume, normalized_change + + def _compute_volume(self, df: pd.DataFrame) -> pd.Series: + normalized_volume, normalized_change = self._normalized_volume_and_price_change(df) + precursor = normalized_volume + normalized_change.abs() + precursor = precursor.where(normalized_change >= 0, -precursor) + spv = self._rolling_mean(precursor, window=self.hist_length) + return spv - 
class DeltaVolumeMomentum(VolumeFactor):
    """Change in volume momentum over ``delta_len`` bars.

    Volume momentum is the log of the short rolling-mean volume over the
    long rolling-mean volume, where the long window is
    ``hist_length * multiplier`` bars.
    """

    def __init__(self, hist_length: int = 20, multiplier: int = 4, delta_len: int = 100, **kwargs):
        super().__init__(**kwargs)
        self.name = "DeltaVolumeMomentum"
        self.description = "Current minus lagged volume momentum."
        self.hist_length = hist_length
        self.multiplier = multiplier
        self.delta_len = delta_len

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>_<multiplier>_<delta_len>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.hist_length}_{self.multiplier}_{self.delta_len}"

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return log(short MA / long MA) of volume minus its lagged value."""
        vol = df[self.volume_col]
        fast_window = self.hist_length
        slow_window = self.hist_length * self.multiplier

        fast_avg = self._rolling_mean(vol, window=fast_window)
        slow_avg = self._rolling_mean(vol, window=slow_window)

        # A zero long average would divide by zero; turn it into NaN instead.
        ratio = fast_avg / slow_avg.replace(0, np.nan)
        momentum = self._safe_log(ratio)
        lagged = self._shift_by_asset(momentum, self.delta_len)
        return momentum - lagged
class DiffVolumeWeightedMAOverMA(VolumeFactor):
    """Short-window minus long-window log(VWMA / MA) of price."""

    def __init__(self, short_dist: int = 20, long_dist: int = 100, **kwargs):
        super().__init__(**kwargs)
        self.name = "DiffVolumeWeightedMAOverMA"
        self.description = "Short minus long VWMA-over-MA signal."
        self.short_dist = short_dist
        self.long_dist = long_dist

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<short_dist>_<long_dist>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.short_dist}_{self.long_dist}"

    def _vwma_over_ma(self, df: pd.DataFrame, hist_length: int) -> pd.Series:
        """Return log of the volume-weighted MA over the plain MA of price.

        Both averages use a ``hist_length``-bar rolling window.  Zero
        denominators are turned into NaN before dividing.
        """
        prices = df[self.price_col]
        vol = df[self.volume_col]

        traded_value_sum = self._rolling_stat(prices * vol, window=hist_length, stat="sum")
        volume_sum = self._rolling_stat(vol, window=hist_length, stat="sum")
        weighted_avg = traded_value_sum / volume_sum.replace(0, np.nan)

        simple_avg = self._rolling_mean(prices, window=hist_length)
        ratio = weighted_avg / simple_avg.replace(0, np.nan)
        return self._safe_log(ratio)

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the short-window signal minus the long-window signal."""
        return self._vwma_over_ma(df, self.short_dist) - self._vwma_over_ma(df, self.long_dist)
class OnBalanceVolume(VolumeFactor):
    """Rolling signed volume as a fraction of rolling total volume.

    Each bar's volume is signed by the direction of the one-bar price
    difference; the factor is the ``hist_length``-bar sum of signed volume
    divided by the same-window sum of unsigned volume.
    """

    def __init__(self, hist_length: int = 50, **kwargs):
        super().__init__(**kwargs)
        self.name = "OnBalanceVolume"
        self.description = "Signed-volume over total-volume ratio."
        self.hist_length = hist_length

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.hist_length}"

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return rolling signed-volume sum over rolling total-volume sum."""
        vol = df[self.volume_col]
        # +1 on up bars, -1 on down bars, 0 when the price is unchanged.
        direction = np.sign(self._diff_by_asset(df[self.price_col], 1))

        window = self.hist_length
        net_flow = self._rolling_stat(vol * direction, window=window, stat="sum")
        gross_flow = self._rolling_stat(vol, window=window, stat="sum")

        # Windows with no volume at all give NaN instead of a division by zero.
        return net_flow / gross_flow.replace(0, np.nan)
class PriceVolumeFit(VolumeFactor):
    """Rolling least-squares slope of log(price) on log(volume).

    The slope over a ``hist_length``-bar window is ``cov(x, y) / var(x)``
    with ``x = log(volume)`` and ``y = log(price)``, both moments built
    from rolling means.
    """

    # Relative tolerance below which the rolling variance of log(volume) is
    # treated as zero.  The variance is formed as E[x^2] - E[x]^2, so for a
    # (near-)constant window rounding error leaves a tiny non-zero, possibly
    # negative, residual of roughly machine-epsilon times E[x^2].
    _VAR_RTOL = 1e-12

    def __init__(self, hist_length: int = 50, **kwargs):
        super().__init__(**kwargs)
        self.hist_length = hist_length
        self.name = "PriceVolumeFit"
        self.description = "Rolling slope for log(price) on log(volume)."

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>``."""
        return self.output_col or f"{self.name}_{self.hist_length}"

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the rolling slope; NaN where log(volume) has no usable variance.

        Non-positive prices or volumes become NaN through ``_safe_log``.
        """
        close = df[self.price_col]
        volume = df[self.volume_col]

        x = self._safe_log(volume)
        y = self._safe_log(close)

        mean_x = self._rolling_mean(x, window=self.hist_length)
        mean_y = self._rolling_mean(y, window=self.hist_length)
        mean_xy = self._rolling_mean(x * y, window=self.hist_length)
        mean_x2 = self._rolling_mean(x * x, window=self.hist_length)

        cov_xy = mean_xy - (mean_x * mean_y)
        var_x = mean_x2 - (mean_x * mean_x)

        # Comparing against exactly 0 misses the rounding residue described
        # on _VAR_RTOL, and dividing by it yields an arbitrary slope.  Keep
        # only variances clearly above that noise floor; exact zeros,
        # negatives and NaN all fail the comparison and become NaN.
        usable_var = var_x.where(var_x > self._VAR_RTOL * mean_x2)
        return cov_xy / usable_var
class SumPriceVolume(VolumeFactor):
    """Smoothed signed sum of normalized volume and price-change magnitude.

    Per bar, normalized volume plus the absolute normalized price change is
    given the sign of the normalized price change, then averaged over
    ``hist_length`` bars.
    """

    def __init__(
        self,
        hist_length: int = 25,
        norm_lookback: int = 250,
        norm_min_periods: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.name = "SumPriceVolume"
        self.description = "Smoothed signed sum of normalized price/volume shocks."
        self.hist_length = hist_length
        self.norm_lookback = norm_lookback
        self.norm_min_periods = norm_min_periods

    def _generate_name(self) -> str:
        """Return ``output_col`` if set, else ``<name>_<hist_length>``."""
        if self.output_col:
            return self.output_col
        return f"{self.name}_{self.hist_length}"

    def _normalized_volume_and_price_change(self, df: pd.DataFrame) -> tuple[pd.Series, pd.Series]:
        """Return ``(normalized_volume, normalized_price_change)``.

        Volume is divided by the rolling median of prior volume.  The log
        price change is centred on the rolling median of prior changes and
        scaled by their rolling inter-quartile range.  Zero scales become NaN.
        """
        rolling_args = {"window": self.norm_lookback, "min_periods": self.norm_min_periods}

        vol = df[self.volume_col]
        lagged_vol = self._shift_by_asset(vol, 1)
        typical_vol = self._rolling_median(lagged_vol, **rolling_args).replace(0, np.nan)
        scaled_vol = vol / typical_vol

        log_price = self._safe_log(df[self.price_col])
        change = self._diff_by_asset(log_price, 1)
        # Statistics come from changes up to the previous bar only.
        lagged_change = self._shift_by_asset(change, 1)

        typical_change = self._rolling_median(lagged_change, **rolling_args)
        upper = self._rolling_stat(lagged_change, stat="quantile", q=0.75, **rolling_args)
        lower = self._rolling_stat(lagged_change, stat="quantile", q=0.25, **rolling_args)
        quartile_range = (upper - lower).replace(0, np.nan)

        scaled_change = (change - typical_change) / quartile_range
        return scaled_vol, scaled_change

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Return the ``hist_length``-bar rolling mean of the signed sum."""
        scaled_vol, scaled_change = self._normalized_volume_and_price_change(df)
        magnitude = scaled_vol + scaled_change.abs()
        # Keep the sum on non-negative price shocks, negate it otherwise.
        signed = magnitude.where(scaled_change >= 0, -magnitude)
        return self._rolling_mean(signed, window=self.hist_length)
class Volume(Factor):
    """Factory that dispatches to a concrete volume factor by method name.

    ``method`` may be a key of ``_METHOD_MAP`` or one of the short names in
    ``_ALIASES``. All remaining keyword arguments are forwarded unchanged to
    the selected factor class. ``inputs``, ``fit`` and ``transform`` delegate
    to the wrapped factor instance.
    """

    # Canonical method name -> implementing factor class.
    _METHOD_MAP: ClassVar[Dict[str, Type[BaseTransform]]] = {
        "volume_momentum": VolumeMomentum,
        "delta_volume_momentum": DeltaVolumeMomentum,
        "volume_weighted_ma_over_ma": VolumeWeightedMAOverMA,
        "diff_volume_weighted_ma_over_ma": DiffVolumeWeightedMAOverMA,
        "price_volume_fit": PriceVolumeFit,
        "diff_price_volume_fit": DiffPriceVolumeFit,
        "delta_price_volume_fit": DeltaPriceVolumeFit,
        "on_balance_volume": OnBalanceVolume,
        "delta_on_balance_volume": DeltaOnBalanceVolume,
        "positive_volume_indicator": PositiveVolumeIndicator,
        "delta_positive_volume_indicator": DeltaPositiveVolumeIndicator,
        "negative_volume_indicator": NegativeVolumeIndicator,
        "delta_negative_volume_indicator": DeltaNegativeVolumeIndicator,
        "product_price_volume": ProductPriceVolume,
        "sum_price_volume": SumPriceVolume,
        "delta_product_price_volume": DeltaProductPriceVolume,
        "delta_sum_price_volume": DeltaSumPriceVolume,
    }

    # Short alias -> canonical method name.
    _ALIASES: ClassVar[Dict[str, str]] = {
        "vmom": "volume_momentum",
        "dvmom": "delta_volume_momentum",
        "vwmama": "volume_weighted_ma_over_ma",
        "dvwmama": "diff_volume_weighted_ma_over_ma",
        "pvf": "price_volume_fit",
        "difpvf": "diff_price_volume_fit",
        "dpvf": "delta_price_volume_fit",
        "obv": "on_balance_volume",
        "dobv": "delta_on_balance_volume",
        "pvi": "positive_volume_indicator",
        "dpvi": "delta_positive_volume_indicator",
        "nvi": "negative_volume_indicator",
        "dnvi": "delta_negative_volume_indicator",
        "ppv": "product_price_volume",
        "spv": "sum_price_volume",
        "dppv": "delta_product_price_volume",
        "dspv": "delta_sum_price_volume",
    }

    @classmethod
    def get_factor_metadata(cls) -> pd.DataFrame:
        """Return one row per registered method, indexed by ``"Alias"``.

        The index holds the ``_METHOD_MAP`` keys (canonical method names, not
        the short aliases). Every class is instantiated with its defaults so
        that its ``description`` can be read; if that fails, the error text is
        stored in the ``"Description"`` column instead of being raised.
        """
        records = []
        for method_name, factor_cls in cls._METHOD_MAP.items():
            try:
                description = factor_cls().description
            except Exception as exc:
                # Best effort: one broken factor must not hide the others.
                description = f"Instantiation Failed: {exc}"
            records.append(
                {
                    "Alias": method_name,
                    "Class": factor_cls.__name__,
                    "Description": description,
                }
            )

        return pd.DataFrame(records).set_index("Alias")

    def __init__(self, method: str = "volume_momentum", **kwargs):
        """Resolve ``method`` and build the wrapped factor.

        Args:
            method: Canonical method name or alias; it is lower-cased and
                stripped of surrounding whitespace before lookup.
            **kwargs: Passed unchanged to the selected factor class.

        Raises:
            ValueError: If the resolved method is not in ``_METHOD_MAP``.
        """
        super().__init__(
            name="Volume",
            description="A factory for volume-based factors.",
            category="Volume",
        )

        normalized = method.lower().strip()
        # Unknown names fall through unchanged so the error can echo them.
        self.method = self._ALIASES.get(normalized, normalized)
        self.kwargs = kwargs

        factor_cls = self._METHOD_MAP.get(self.method)
        if factor_cls is None:
            raise ValueError(
                f"Invalid volume factor method '{self.method}'. "
                f"Method must be one of: {list(self._METHOD_MAP.keys())}"
            )

        self._factor: Factor = factor_cls(**self.kwargs)

    @property
    def inputs(self) -> list[str]:
        """Input columns required by the wrapped factor."""
        return self._factor.inputs

    def fit(
        self,
        X: Union[pd.Series, pd.DataFrame],
        y: Optional[Union[pd.Series, pd.DataFrame]] = None,
    ) -> "Volume":
        """Validate ``X``, fit the wrapped factor and mark this object fitted.

        ``y`` is accepted but not used. Returns ``self``.
        """
        frame = to_dataframe(X)
        self.validate_inputs(frame)
        self._factor.fit(frame)
        self._is_fitted = True
        return self

    def transform(self, data: Union[pd.Series, pd.DataFrame]) -> pd.DataFrame:
        """Delegate to the wrapped factor's ``transform``.

        ``data`` is handed over as received (no ``to_dataframe`` conversion).

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
        """
        if self._is_fitted:
            return self._factor.transform(data)

        raise RuntimeError("Volume transform must be fitted before calling transform().")
class VolumeWeightedMAOverMA(VolumeFactor):
    """Log ratio of a volume-weighted moving average to a plain moving average.

    Args:
        hist_length: Window used for both averages; also the suffix of the
            default output column name.
        **kwargs: Forwarded unchanged to ``VolumeFactor.__init__``.

    NOTE(review): ``_rolling_stat``, ``_rolling_mean`` and ``_safe_log`` are
    inherited and not visible here -- confirm their per-asset semantics.
    """

    def __init__(self, hist_length: int = 50, **kwargs):
        super().__init__(**kwargs)
        self.hist_length = hist_length
        # Set after the base initializer so these values are the ones kept.
        self.name = "VolumeWeightedMAOverMA"
        self.description = "Log ratio of VWMA over MA."

    def _generate_name(self) -> str:
        """Return ``output_col`` when truthy, else ``"<name>_<hist_length>"``."""
        return self.output_col or f"{self.name}_{self.hist_length}"

    def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
        """Compute ``log(VWMA / MA)`` over ``hist_length``.

        VWMA is the rolling sum of ``price * volume`` divided by the rolling
        sum of volume. Zero denominators are replaced by NaN before dividing.
        """
        span = self.hist_length
        close = df[self.price_col]
        volume = df[self.volume_col]

        notional_sum = self._rolling_stat(close * volume, window=span, stat="sum")
        volume_sum = self._rolling_stat(volume, window=span, stat="sum")
        vwma = notional_sum / volume_sum.replace(0, np.nan)

        plain_ma = self._rolling_mean(close, window=span)
        ratio = vwma / plain_ma.replace(0, np.nan)

        return self._safe_log(ratio)
@pytest.fixture(scope="module")
def crypto_universe() -> pd.DataFrame:
    """Load ``binance_spot_prices.csv`` as a (date, ticker)-indexed OHLCV frame.

    The file is read from ``datasets/data`` under the tests root. Only tickers
    with at least 300 rows are kept, and of those only the 60 with the
    largest mean ``close * volume``.
    """
    tests_root = Path(__file__).resolve().parents[1]
    csv_path = tests_root / "datasets" / "data" / "binance_spot_prices.csv"
    prices = pd.read_csv(
        csv_path,
        index_col=["date", "ticker"],
        parse_dates=["date"],
    ).sort_index()

    # keep symbols with at least 300 daily bars
    bars_per_ticker = prices.groupby(level=1).size()
    long_enough = bars_per_ticker[bars_per_ticker >= 300].index
    prices = prices[prices.index.get_level_values(1).isin(long_enough)]

    # keep a liquid subset to keep tests fast and stable
    mean_notional = (prices["close"] * prices["volume"]).groupby(level=1).mean()
    most_liquid = mean_notional.nlargest(60).index
    prices = prices[prices.index.get_level_values(1).isin(most_liquid)]

    return prices[["open", "high", "low", "close", "volume"]]


@pytest.mark.parametrize("method,kwargs", FACTOR_SPECS)
def test_volume_factor_methods_smoke(crypto_universe: pd.DataFrame, method: str, kwargs: dict) -> None:
    """Each method adds exactly one bounded column and leaves the input intact."""
    result = Volume(method=method, **kwargs).compute(crypto_universe)

    new_columns = [name for name in result.columns if name not in crypto_universe.columns]
    assert len(new_columns) == 1

    values = result[new_columns[0]].dropna()

    assert len(values) > 0
    assert (values <= 50).all()
    assert (values >= -50).all()

    # The original columns and the index must come back unchanged.
    pd.testing.assert_frame_equal(result[crypto_universe.columns], crypto_universe)
    assert result.index.equals(crypto_universe.index)


def test_volume_factor_crypto_rank_ic_smoke(crypto_universe: pd.DataFrame) -> None:
    """Sanity-check the daily Spearman rank IC of every factor.

    The forward return is the next row's close over the current close, minus
    one, per ticker. For each date the IC is computed over rows that are
    eligible and complete, and only when at least 12 such rows exist.
    """
    close = crypto_universe["close"]
    volume = crypto_universe["volume"]

    next_close = close.groupby(level=1).shift(-1)
    fwd_ret = next_close.div(close) - 1.0

    # daily tradable universe proxy: top 40 by 20-day average notional
    notional = close * volume
    rolling_notional = notional.groupby(level=1).rolling(window=20, min_periods=20).mean()
    # The grouped rolling result gains a leading group level; drop it to get
    # back to the (date, ticker) index.
    liquidity = rolling_notional.droplevel(0).sort_index()
    liquidity_rank = liquidity.groupby(level=0).rank(ascending=False, method="first")
    eligible = liquidity_rank <= 40

    def _cross_sectional_ic(day: pd.DataFrame) -> float:
        # Too few names on a date gives a meaningless correlation.
        if day.shape[0] >= 12:
            return day["factor"].corr(day["fwd_ret"], method="spearman")
        return np.nan

    summary_rows = []
    for method, kwargs in FACTOR_SPECS:
        out = Volume(method=method, **kwargs).compute(crypto_universe)
        factor_col = [name for name in out.columns if name not in crypto_universe.columns][0]

        columns = [
            out[factor_col].rename("factor"),
            fwd_ret.rename("fwd_ret"),
            eligible.rename("eligible"),
        ]
        panel = pd.concat(columns, axis=1)
        panel = panel[panel["eligible"]].dropna()

        daily_ic = panel.groupby(level=0).apply(_cross_sectional_ic)

        n_obs = int(daily_ic.notna().sum())
        mean_ic = float(daily_ic.mean()) if n_obs > 0 else np.nan
        std_ic = float(daily_ic.std()) if n_obs > 1 else np.nan

        summary_rows.append(
            {
                "method": method,
                "n_obs": n_obs,
                "mean_ic": mean_ic,
                "std_ic": std_ic,
            }
        )

    summary = pd.DataFrame(summary_rows).set_index("method")
    finite_mean_ic = summary["mean_ic"].dropna()

    assert summary.shape[0] == len(FACTOR_SPECS)
    assert (summary["n_obs"] >= 30).sum() >= 12
    assert np.isfinite(finite_mean_ic).all()
    assert (finite_mean_ic.abs() <= 1).all()