"""The DR-learner model from Kennedy 2004."""
import numpy as np
from sklearn.linear_model import LinearRegression
from .base import UpliftMetaModelBase
from ..base import UpliftRegressorMixin, UpliftClassifierMixin
from ..model_selection import uplift_check_cv
class DRLearnerBase(UpliftMetaModelBase):
    """Base class for DR-learner uplift models with cross-fitting.

    For every cross-validation split three models are trained: a control
    outcome model and a treatment outcome model on the first index set of
    the split, and an effect ("tau") model on doubly robust
    pseudo-outcomes computed on the second index set.  Predictions are the
    average of the tau models over all splits.

    Parameters
    ----------
    base_estimator : estimator, default=LinearRegression()
        Estimator passed to the base class constructor and used for the
        tau models.
    mean_estimator : estimator or None, default=None
        Estimator for the control/treatment outcome models.  When None,
        ``base_estimator`` is recorded as ``mean_estimator_``.
    cv : default=2
        Cross-validation specification forwarded to ``uplift_check_cv``.
    """

    def __init__(self, base_estimator=LinearRegression(),
                 mean_estimator=None, cv=2):
        # NOTE(review): the default LinearRegression() instance is created
        # once and shared by every model built with the default; presumably
        # the base class clones estimators before fitting -- confirm.
        super().__init__(base_estimator)
        self.mean_estimator = mean_estimator
        self.cv = cv

    def _make_cv(self, y, trt, y_stratify):
        """Create the splitter and remember the stratification target.

        Stores the two values returned by ``uplift_check_cv`` as
        ``self.cv_`` and ``self.y_stratify_``.  The ``classifier`` flag is
        set only when an explicit ``y_stratify`` was given; ``y`` itself is
        not used by this implementation.
        """
        # y_stratify_ is kept on the instance because
        # _iter_training_subsets has no parameter through which it could
        # be passed.
        self.cv_, self.y_stratify_ = uplift_check_cv(
            self.cv, y_stratify, trt, self.n_trt_,
            classifier=y_stratify is not None)

    def _get_model_names_list(self, X, y, trt, *, y_stratify=None):
        """Return the names (and optionally estimators) of all sub-models.

        Also sets ``mean_estimator_``, ``cv_``, ``y_stratify_`` and
        ``n_splits_``.

        Returns
        -------
        list
            Three entries per split, ordered control outcome model,
            treatment outcome model, tau model.  Entries are plain names
            when ``mean_estimator`` is None, otherwise
            ``(name, estimator)`` pairs.

        Raises
        ------
        ValueError
            If more than one treatment is present (``n_trt_ > 1``).
        """
        if self.n_trt_ > 1:
            raise ValueError("DRLearner is only supported for a single treatment.")
        if self.mean_estimator is None:
            self.mean_estimator_ = self.base_estimator
        else:
            self.mean_estimator_ = self.mean_estimator
        self._make_cv(y, trt, y_stratify)
        self.n_splits_ = self.cv_.get_n_splits(X, self.y_stratify_)
        if self.n_splits_ > 1:
            m_names = [
                name
                for i in range(self.n_splits_)
                for name in (f"mean_model_c_fold_{i}",
                             f"mean_model_t_fold_{i}",
                             f"tau_model_fold_{i}")
            ]
        else:
            m_names = ["mean_model_c", "mean_model_t", "tau_model"]
        if self.mean_estimator is None:
            return m_names
        # Every third entry (position 2 within a triple) is a tau model
        # and gets base_estimator; the two before it are outcome models.
        return [
            (name,
             self.base_estimator if pos % 3 == 2 else self.mean_estimator)
            for pos, name in enumerate(m_names)
        ]

    def _iter_training_subsets(self, X, y, trt, n_trt, sample_weight):
        """Yield ``(X, y, sample_weight)`` training data for each sub-model.

        The order matches ``_get_model_names_list``: per split, control
        outcome data, treatment outcome data, then tau data with doubly
        robust pseudo-outcomes as target.  ``n_trt`` is not used here.

        The outcome models at ``self.models_[3*i]`` and
        ``self.models_[3*i + 1]`` are used for prediction right after
        their training data has been yielded, so they must already be
        fitted when the generator is resumed.
        NOTE(review): presumably the base class fits each model as soon as
        its subset is yielded -- confirm.
        """
        splits = self.cv_.split(X, self.y_stratify_)
        for i, (mean_idx, tau_idx) in enumerate(splits):
            # Outcome models: first index set, split by treatment group
            # (trt == 0 is control).
            X_mean = X[mean_idx]
            y_mean = y[mean_idx]
            mask_c = (trt[mean_idx] == 0)
            mask_t = ~mask_c
            if sample_weight is not None:
                w_mean = sample_weight[mean_idx]
                yield X_mean[mask_c], y_mean[mask_c], w_mean[mask_c]
                yield X_mean[mask_t], y_mean[mask_t], w_mean[mask_t]
            else:
                yield X_mean[mask_c], y_mean[mask_c], None
                yield X_mean[mask_t], y_mean[mask_t], None

            # Tau model: second index set.
            X_tau = X[tau_idx]
            # Cast to float so class labels can be used as a numeric
            # target.  np.array copies by default, so the in-place
            # arithmetic below never modifies the caller's y.
            y_tau = np.array(y[tau_idx], dtype=float)
            if sample_weight is not None:
                w_tau = sample_weight[tau_idx]
            else:
                w_tau = None
            mask_c = (trt[tau_idx] == 0)
            mask_t = ~mask_c
            y0_hat = self.models_[3 * i][1].predict(X_tau)
            y1_hat = self.models_[3 * i + 1][1].predict(X_tau)

            # Create pseudo-outcomes.
            # TODO: share code with target transform
            # Group sizes (weighted when sample weights are given) provide
            # the inverse treatment/control proportions n/nt and n/nc.
            if w_tau is None:
                nt = mask_t.sum()
                nc = mask_c.sum()
            else:
                nt = w_tau[mask_t].sum()
                nc = w_tau[mask_c].sum()
            n = nt + nc
            # Residual of each observation w.r.t. its own group's outcome
            # model, rescaled; control residuals enter with negative sign.
            y_tau[mask_c] -= y0_hat[mask_c]
            y_tau[mask_t] -= y1_hat[mask_t]
            y_tau[mask_c] *= (-n / nc)
            y_tau[mask_t] *= (n / nt)
            # Add the plug-in effect estimate y1_hat - y0_hat.
            y_tau += y1_hat - y0_hat
            yield X_tau, y_tau, w_tau

    def predict(self, X):
        """Return the tau-model predictions averaged over all splits.

        The sum is built out of place so that the array returned by the
        first tau model is never modified.
        """
        tau_models = [self.models_[3 * i + 2][1]
                      for i in range(self.n_splits_)]
        pred = tau_models[0].predict(X)
        for model in tau_models[1:]:
            pred = pred + model.predict(X)
        return pred / self.n_splits_

    def fit(self, X, y, trt, n_trt=None, sample_weight=None, *,
            y_stratify=None):
        """Fit the model by delegating to the base class ``fit``.

        All arguments are forwarded unchanged; ``y_stratify`` ends up in
        ``_make_cv`` where it is handed to ``uplift_check_cv``.

        Returns
        -------
        self
            The fitted estimator, so that calls can be chained.
        """
        super().fit(X, y, trt, n_trt, sample_weight=sample_weight,
                    y_stratify=y_stratify)
        return self
class DRLearnerUpliftRegressor(UpliftRegressorMixin, DRLearnerBase):
    """DR-learner uplift regressor.

    Combines ``UpliftRegressorMixin`` with ``DRLearnerBase`` and adds no
    behavior of its own.
    """
class DRLearnerUpliftClassifier(UpliftClassifierMixin, DRLearnerBase):
    """DR-learner uplift classifier.

    The classifier works by treating the class variable as a 0/1 real
    target and using the same procedure as DRLearnerUpliftRegressor.

    The main difference is that stratification takes the target
    variable into account.
    """

    def fit(self, X, y, trt, n_trt=None, sample_weight=None, *,
            y_stratify=None):
        """Fit the model by delegating to the parent ``fit``.

        All arguments are forwarded unchanged.  When ``y_stratify`` is
        None, ``_make_cv`` stratifies on ``y`` instead.

        Returns
        -------
        self
            The fitted estimator, so that calls can be chained.
        """
        super().fit(X, y, trt, n_trt, sample_weight=sample_weight,
                    y_stratify=y_stratify)
        return self

    def _make_cv(self, y, trt, y_stratify):
        """Create the splitter, stratifying on ``y`` by default.

        Stores the two values returned by ``uplift_check_cv`` as
        ``self.cv_`` and ``self.y_stratify_``; the ``classifier`` flag is
        always True here.
        """
        if y_stratify is None:
            y_stratify = y
        # y_stratify_ is kept on the instance because
        # _iter_training_subsets has no parameter through which it could
        # be passed.
        self.cv_, self.y_stratify_ = uplift_check_cv(
            self.cv, y_stratify, trt, self.n_trt_, classifier=True)

    def predict(self, X):
        """Return a two-column array built from the parent's prediction.

        Column 0 holds the negated prediction and column 1 the prediction
        itself.
        """
        y_pred = super().predict(X)
        return np.column_stack([-y_pred, y_pred])