Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/factorlab/factors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from factorlab.factors.base import Factor

__all__ = ["Factor"]

41 changes: 41 additions & 0 deletions src/factorlab/factors/volume/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from factorlab.factors.volume.base import VolumeFactor
from factorlab.factors.volume.volume import Volume
from factorlab.factors.volume.volume_momentum import VolumeMomentum
from factorlab.factors.volume.delta_volume_momentum import DeltaVolumeMomentum
from factorlab.factors.volume.volume_weighted_ma_over_ma import VolumeWeightedMAOverMA
from factorlab.factors.volume.diff_volume_weighted_ma_over_ma import DiffVolumeWeightedMAOverMA
from factorlab.factors.volume.price_volume_fit import PriceVolumeFit
from factorlab.factors.volume.diff_price_volume_fit import DiffPriceVolumeFit
from factorlab.factors.volume.delta_price_volume_fit import DeltaPriceVolumeFit
from factorlab.factors.volume.on_balance_volume import OnBalanceVolume
from factorlab.factors.volume.delta_on_balance_volume import DeltaOnBalanceVolume
from factorlab.factors.volume.positive_volume_indicator import PositiveVolumeIndicator
from factorlab.factors.volume.delta_positive_volume_indicator import DeltaPositiveVolumeIndicator
from factorlab.factors.volume.negative_volume_indicator import NegativeVolumeIndicator
from factorlab.factors.volume.delta_negative_volume_indicator import DeltaNegativeVolumeIndicator
from factorlab.factors.volume.product_price_volume import ProductPriceVolume
from factorlab.factors.volume.sum_price_volume import SumPriceVolume
from factorlab.factors.volume.delta_product_price_volume import DeltaProductPriceVolume
from factorlab.factors.volume.delta_sum_price_volume import DeltaSumPriceVolume

__all__ = [
"VolumeFactor",
"Volume",
"VolumeMomentum",
"DeltaVolumeMomentum",
"VolumeWeightedMAOverMA",
"DiffVolumeWeightedMAOverMA",
"PriceVolumeFit",
"DiffPriceVolumeFit",
"DeltaPriceVolumeFit",
"OnBalanceVolume",
"DeltaOnBalanceVolume",
"PositiveVolumeIndicator",
"DeltaPositiveVolumeIndicator",
"NegativeVolumeIndicator",
"DeltaNegativeVolumeIndicator",
"ProductPriceVolume",
"SumPriceVolume",
"DeltaProductPriceVolume",
"DeltaSumPriceVolume",
]
187 changes: 187 additions & 0 deletions src/factorlab/factors/volume/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, List, Optional, Union

import numpy as np
import pandas as pd

from factorlab.factors.base import Factor
from factorlab.features.transforms.dispersion import Dispersion
from factorlab.features.transforms.returns import Difference, Returns
from factorlab.features.transforms.smoothing import WindowSmoother
from factorlab.utils import to_dataframe


class VolumeFactor(Factor, ABC):
"""Base class for volume/price interaction factors."""

def __init__(
self,
price_col: str = "close",
volume_col: str = "volume",
output_col: Optional[str] = None,
compress: bool = True,
compression_window: int = 250,
compression_min_periods: int = 30,
compression_strength: float = 1.0,
**kwargs: Any,
):
super().__init__(
name=self.__class__.__name__,
description="Base class for volume factors.",
category="Volume",
tags=["volume", "flow", "microstructure"],
)
self.price_col = price_col
self.volume_col = volume_col
self.output_col = output_col
self.compress = compress
self.compression_window = compression_window
self.compression_min_periods = compression_min_periods
self.compression_strength = compression_strength
self.kwargs = kwargs

@property
def inputs(self) -> List[str]:
return [self.price_col, self.volume_col]

def fit(
self,
X: Union[pd.Series, pd.DataFrame],
y: Optional[Union[pd.Series, pd.DataFrame]] = None,
) -> "VolumeFactor":
df_input = to_dataframe(X)
self.validate_inputs(df_input)
self._is_fitted = True
return self

def transform(self, X: Union[pd.Series, pd.DataFrame]) -> pd.DataFrame:
if not self._is_fitted:
raise RuntimeError(f"Transform '{self.name}' must be fitted before calling transform()")

df = to_dataframe(X).copy(deep=True)
self.validate_inputs(df)
df = df.sort_index()

factor = self._compute_volume(df)
if self.compress:
factor = self._compress(factor)

df[self._generate_name()] = factor.clip(-50, 50)
return df

@abstractmethod
def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
raise NotImplementedError

def _generate_name(self) -> str:
return self.output_col or self.name

def _is_multiindex(self, series: pd.Series) -> bool:
return isinstance(series.index, pd.MultiIndex)

def _safe_log(self, series: pd.Series) -> pd.Series:
return np.log(series.where(series > 0, np.nan))

def _series_frame(self, series: pd.Series, col: str = "value") -> pd.DataFrame:
return series.astype("float64").to_frame(col)

def _shift_by_asset(self, series: pd.Series, periods: int) -> pd.Series:
if self._is_multiindex(series):
return series.groupby(level=1).shift(periods)
return series.shift(periods)

def _pct_change_by_asset(self, series: pd.Series, periods: int = 1) -> pd.Series:
df = self._series_frame(series)
ret = Returns(method="pct", input_col="value", output_col="ret", lags=periods).compute(df)
return ret["ret"]

def _diff_by_asset(self, series: pd.Series, periods: int = 1) -> pd.Series:
df = self._series_frame(series)
diff = Difference(input_col="value", output_col="diff", lags=periods).compute(df)
return diff["diff"]

def _rolling_mean(
self,
series: pd.Series,
window: int,
min_periods: Optional[int] = None,
) -> pd.Series:
min_periods = window if min_periods is None else min_periods
df = self._series_frame(series)
smoothed = WindowSmoother(
input_cols="value",
output_cols="mean",
window_type="rolling",
window_size=window,
central_tendency="mean",
min_periods=min_periods,
).compute(df)
return smoothed["mean"]

def _rolling_median(
self,
series: pd.Series,
window: int,
min_periods: Optional[int] = None,
) -> pd.Series:
min_periods = window if min_periods is None else min_periods
df = self._series_frame(series)
smoothed = WindowSmoother(
input_cols="value",
output_cols="median",
window_type="rolling",
window_size=window,
central_tendency="median",
min_periods=min_periods,
).compute(df)
return smoothed["median"]

def _rolling_std(
self,
series: pd.Series,
window: int,
min_periods: Optional[int] = None,
) -> pd.Series:
min_periods = 2 if min_periods is None else min_periods
df = self._series_frame(series)
dispersion = Dispersion(
method="std",
input_col="value",
output_col="std",
axis="ts",
window_type="rolling",
window_size=window,
min_periods=min_periods,
).compute(df)
return dispersion["std"]

def _rolling_stat(
self,
series: pd.Series,
window: int,
stat: str,
min_periods: Optional[int] = None,
**kwargs: Any,
) -> pd.Series:
min_periods = window if min_periods is None else min_periods

if self._is_multiindex(series):
rolled = getattr(
series.groupby(level=1).rolling(window=window, min_periods=min_periods),
stat,
)(**kwargs)
return rolled.droplevel(0).sort_index()

return getattr(series.rolling(window=window, min_periods=min_periods), stat)(**kwargs)

def _compress(self, raw: pd.Series) -> pd.Series:
robust_scale = self._rolling_median(
raw.abs(),
window=self.compression_window,
min_periods=self.compression_min_periods,
).replace(0, np.nan)

normalized = raw / robust_scale
return 50.0 * np.tanh(self.compression_strength * normalized)
37 changes: 37 additions & 0 deletions src/factorlab/factors/volume/delta_negative_volume_indicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

import numpy as np
import pandas as pd

from factorlab.factors.volume.base import VolumeFactor


class DeltaNegativeVolumeIndicator(VolumeFactor):
def __init__(self, hist_length: int = 40, delta_dist: int = 35, **kwargs):
super().__init__(**kwargs)
self.hist_length = hist_length
self.delta_dist = delta_dist
self.name = "DeltaNegativeVolumeIndicator"
self.description = "Current minus lagged negative-volume indicator."

def _generate_name(self) -> str:
return self.output_col or f"{self.name}_{self.hist_length}_{self.delta_dist}"

def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
close = df[self.price_col]
volume = df[self.volume_col]

rel_change = self._pct_change_by_asset(close, periods=1)
prev_volume = self._shift_by_asset(volume, 1)
filtered = rel_change.where(volume < prev_volume, 0.0)

avg_change = self._rolling_mean(filtered, window=self.hist_length)
norm_window = max(2 * self.hist_length, 250)
std_change = self._rolling_std(
rel_change,
window=norm_window,
min_periods=self.hist_length,
).replace(0, np.nan)
nvi = avg_change / std_change

return nvi - self._shift_by_asset(nvi, self.delta_dist)
31 changes: 31 additions & 0 deletions src/factorlab/factors/volume/delta_on_balance_volume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

import numpy as np
import pandas as pd

from factorlab.factors.volume.base import VolumeFactor


class DeltaOnBalanceVolume(VolumeFactor):
def __init__(self, hist_length: int = 50, delta_dist: int = 45, **kwargs):
super().__init__(**kwargs)
self.hist_length = hist_length
self.delta_dist = delta_dist
self.name = "DeltaOnBalanceVolume"
self.description = "Current minus lagged on-balance-volume signal."

def _generate_name(self) -> str:
return self.output_col or f"{self.name}_{self.hist_length}_{self.delta_dist}"

def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
close = df[self.price_col]
volume = df[self.volume_col]

close_diff = self._diff_by_asset(close, 1)
signed_volume = volume * np.sign(close_diff)

signed_sum = self._rolling_stat(signed_volume, window=self.hist_length, stat="sum")
total_sum = self._rolling_stat(volume, window=self.hist_length, stat="sum")
obv = signed_sum / total_sum.replace(0, np.nan)

return obv - self._shift_by_asset(obv, self.delta_dist)
37 changes: 37 additions & 0 deletions src/factorlab/factors/volume/delta_positive_volume_indicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

import numpy as np
import pandas as pd

from factorlab.factors.volume.base import VolumeFactor


class DeltaPositiveVolumeIndicator(VolumeFactor):
def __init__(self, hist_length: int = 40, delta_dist: int = 35, **kwargs):
super().__init__(**kwargs)
self.hist_length = hist_length
self.delta_dist = delta_dist
self.name = "DeltaPositiveVolumeIndicator"
self.description = "Current minus lagged positive-volume indicator."

def _generate_name(self) -> str:
return self.output_col or f"{self.name}_{self.hist_length}_{self.delta_dist}"

def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
close = df[self.price_col]
volume = df[self.volume_col]

rel_change = self._pct_change_by_asset(close, periods=1)
prev_volume = self._shift_by_asset(volume, 1)
filtered = rel_change.where(volume > prev_volume, 0.0)

avg_change = self._rolling_mean(filtered, window=self.hist_length)
norm_window = max(2 * self.hist_length, 250)
std_change = self._rolling_std(
rel_change,
window=norm_window,
min_periods=self.hist_length,
).replace(0, np.nan)
pvi = avg_change / std_change

return pvi - self._shift_by_asset(pvi, self.delta_dist)
36 changes: 36 additions & 0 deletions src/factorlab/factors/volume/delta_price_volume_fit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import numpy as np
import pandas as pd

from factorlab.factors.volume.base import VolumeFactor


class DeltaPriceVolumeFit(VolumeFactor):
def __init__(self, hist_length: int = 20, delta_dist: int = 30, **kwargs):
super().__init__(**kwargs)
self.hist_length = hist_length
self.delta_dist = delta_dist
self.name = "DeltaPriceVolumeFit"
self.description = "Current minus lagged price-volume fit slope."

def _generate_name(self) -> str:
return self.output_col or f"{self.name}_{self.hist_length}_{self.delta_dist}"

def _compute_volume(self, df: pd.DataFrame) -> pd.Series:
close = df[self.price_col]
volume = df[self.volume_col]

x = self._safe_log(volume)
y = self._safe_log(close)

mean_x = self._rolling_mean(x, window=self.hist_length)
mean_y = self._rolling_mean(y, window=self.hist_length)
mean_xy = self._rolling_mean(x * y, window=self.hist_length)
mean_x2 = self._rolling_mean(x * x, window=self.hist_length)

cov_xy = mean_xy - (mean_x * mean_y)
var_x = mean_x2 - (mean_x * mean_x)
pvf = cov_xy / var_x.replace(0, np.nan)

return pvf - self._shift_by_asset(pvf, self.delta_dist)
Loading