Skip to content
1,671 changes: 1,671 additions & 0 deletions notebooks/nn_response_evaluation.ipynb

Large diffs are not rendered by default.

1,121 changes: 571 additions & 550 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ repository = "https://github.com/sb-ai-lab/Sim4Rec"
python = ">=3.8, <3.10"
pyarrow = "*"
sdv = "0.15.0"
torch = "*"
Comment thread
monkey0head marked this conversation as resolved.
torch = ">=1.9.1"
torchmetrics="*"
pandas = "*"
pyspark = ">=3.0"
numpy = ">=1.20.0"
Expand Down
9 changes: 7 additions & 2 deletions sim4rec/response/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
NoiseResponse,
CosineSimilatiry,
BernoulliResponse,
ParametricResponseFunction
ParametricResponseFunction,
)

from .nn_response import NNResponseTransformer, NNResponseEstimator


__all__ = [
'ActionModelEstimator',
'ActionModelTransformer',
'ConstantResponse',
'NoiseResponse',
'CosineSimilatiry',
'BernoulliResponse',
'ParametricResponseFunction'
'ParametricResponseFunction',
'NNResponseTransformer',
'NNResponseEstimator',
]
204 changes: 204 additions & 0 deletions sim4rec/response/nn_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import os
import pickle
import pyspark.sql.functions as sf

from .response import ActionModelEstimator, ActionModelTransformer
from .nn_utils.models import ResponseModel
from .nn_utils.embeddings import IndexEmbedding
from .nn_utils.datasets import RecommendationData

from pyspark.sql.types import (
StructType,
StructField,
IntegerType,
DoubleType,
)

# Dataframe schema for simulator logs.
SIM_LOG_SCHEMA = StructType(
[
StructField("user_idx", IntegerType(), True),
StructField("item_idx", IntegerType(), True),
StructField("relevance", DoubleType(), True),
StructField("response_proba", DoubleType(), True),
StructField("response", IntegerType(), True),
StructField("__iter", IntegerType(), True),
]
)
SIM_LOG_COLS = [field.name for field in SIM_LOG_SCHEMA.fields]


class NNResponseTransformer(ActionModelTransformer):
def __init__(self, **kwargs):
super().__init__()
self.hist_data = None
for param, value in kwargs.items():
setattr(self, param, value)

@classmethod
def load(cls, checkpoint_dir):
"""Load model saved with `NNResponseTransformer.save` method."""
with open(os.path.join(checkpoint_dir, "_params.pkl"), "rb") as f:
params_dict = pickle.load(f)
params_dict["backbone_response_model"] = ResponseModel.load(checkpoint_dir)
with open(os.path.join(checkpoint_dir, "_item_indexer.pkl"), "rb") as f:
params_dict["item_indexer"] = pickle.load(f)
with open(os.path.join(checkpoint_dir, "_user_indexer.pkl"), "rb") as f:
params_dict["user_indexer"] = pickle.load(f)
return cls(**params_dict)

def save(self, path):
"""Save response model at given path."""
os.makedirs(path)
self.backbone_response_model.dump(path)
with open(os.path.join(path, "_item_indexer.pkl"), "wb") as f:
pickle.dump(self.item_indexer, f, pickle.HIGHEST_PROTOCOL)
with open(os.path.join(path, "_user_indexer.pkl"), "wb") as f:
pickle.dump(self.user_indexer, f, pickle.HIGHEST_PROTOCOL)
with open(os.path.join(path, "_params.pkl"), "wb") as f:
pickle.dump(
{
"outputCol": self.outputCol,
"log_dir": self.log_dir,
"hist_data_dir": self.hist_data_dir,
},
f,
pickle.HIGHEST_PROTOCOL,
)

def _transform(self, new_recs):
"""
Predict responses for given dataframe with recommendations.
Response function gets dataframe with columns <user_idx, item_idx, relevance, __iter>
and returns dataframe with columns <user_idx, item_idx, relevance, __iter, response_proba>.
If the initial dataframe had some other columns, they will be returned as well.

To sample clicks from this raw probabilities, please use `.response.BernoulliResponse`

:param new_recs: new recommendations.
:returns: same dataframe, but with predicted click probabilities.
"""

def predict_udf(df):
# This import is required for correct serialization on worker's side.
from .nn_utils.datasets import PandasRecommendationData

dataset = PandasRecommendationData(
log=df,
item_indexer=self.item_indexer,
user_indexer=self.user_indexer,
)
dataset = self.backbone_response_model.transform(dataset=dataset)

return dataset._log[SIM_LOG_COLS]

spark = new_recs.sql_ctx.sparkSession

# read the historical data
hist_data = spark.read.schema(SIM_LOG_SCHEMA).parquet(self.hist_data_dir)
if not hist_data:
print("Warning: the historical data is empty")
hist_data = spark.createDataFrame([], schema=SIM_LOG_SCHEMA)

# filter users whom we don't need
hist_data = hist_data.join(new_recs, on="user_idx", how="semi")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really want to leave the history of only distinct users from new_recs in hist_data.

Suggested change
hist_data = hist_data.join(new_recs, on="user_idx", how="semi")
hist_data = hist_data.join(sf.broadcast(new_recs.select("user_idx").distinct()), on="user_idx", how="inner")


# read the updated simulator log
simlog = spark.read.schema(SIM_LOG_SCHEMA).parquet(self.log_dir)
if not simlog:
print("Warning: the simulator log is empty")
simlog = spark.createDataFrame([], schema=SIM_LOG_SCHEMA)

# filter users whom we don't need
simlog = simlog.join(new_recs, on="user_idx", how="semi")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as for hist data...

Suggested change
simlog = simlog.join(new_recs, on="user_idx", how="semi")
simlog = simlog.join(sf.broadcast(new_recs.select("user_idx").distinct()), on="user_idx", how="inner")


# since all the historical records are older than simulated by design,
# and new slates are newer than simulated, i can simply concat it
NEW_ITER_NO = 9999999 # this is just a large number
combined_data = hist_data.unionByName(simlog).unionByName(
new_recs.withColumn("response_proba", sf.lit(0.0))
.withColumn("response", sf.lit(0.0))
.withColumn(
"__iter",
sf.lit(NEW_ITER_NO),
)
)

# the dataframe is assumed to be already partitioned by user_idx,
# here we actually just compute response probabilities for
# one user by one worker
groupping_column = "user_idx"
result_df = combined_data.groupby(groupping_column).applyInPandas(
predict_udf, SIM_LOG_SCHEMA
)
filtered_df = result_df.filter(sf.col("__iter") == NEW_ITER_NO)
return filtered_df.select(new_recs.columns + [self.outputCol])


class NNResponseEstimator(ActionModelEstimator):
def __init__(
self,
log_dir: str,
model_name: str,
hist_data_dir=None,
val_data_dir=None,
outputCol: str = "response_proba",
**kwargs,
):
"""
:param log_dir: The directory containing simulation logs.
:param model_name: Backbone model name.
:param hist_data_dir: (Optional) Spark DataFrame with historical data.
:param val_data_dir: (Optional) Spark DataFrame with validation data.
TODO: split automatically.
:param outputCol: Output column for MLLib pipeline.

"""
self.fit_params = kwargs
self.outputCol = outputCol

# sim log is not loaded immideately, because
# it can be not created when the response model is initialized
self.log_dir = log_dir
self.hist_data_dir = hist_data_dir
self.val_data_dir = val_data_dir

# create new model
self.item_indexer = self.user_indexer = None
self.model_name = model_name
self.backbone_response_model = None

def _fit(self, train_data):
"""
Copy link
Copy Markdown
Collaborator

@monkey0head monkey0head Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls describe the dataframe format here and for transform. what should be included to properly convert dataframe to the RecommendationData. pls add corresponding docstrings

Copy link
Copy Markdown
Author

@arabel1a arabel1a Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is exactly the same as the simulator logs format. Please give me advice, where I can obtain it's description.

Fits the model on given data.

:param DataFrame train_data: Data to train on, this data must match
the simulator log schema exactly.
"""
train_dataset = RecommendationData(
log=train_data,
item_indexer=self.item_indexer,
user_indexer=self.user_indexer,
)
self.item_indexer = train_dataset._item_indexer
self.user_indexer = train_dataset._user_indexer
val_dataset = RecommendationData(
log=train_data.sql_ctx.sparkSession.read.parquet(self.val_data_dir),
item_indexer=self.item_indexer,
user_indexer=self.user_indexer,
)
n_items = train_dataset.n_items
backbone_response_model = ResponseModel(
self.model_name, IndexEmbedding(n_items)
)
backbone_response_model.fit(
train_dataset, val_data=val_dataset, **self.fit_params
)
return NNResponseTransformer(
backbone_response_model=backbone_response_model,
item_indexer=self.item_indexer,
user_indexer=self.user_indexer,
hist_data_dir=self.hist_data_dir,
log_dir=self.log_dir,
outputCol=self.outputCol,
)
1 change: 1 addition & 0 deletions sim4rec/response/nn_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# __init__
Loading