Source code for libuplift.metrics.curves

"""Uplift and Qini curves."""

import numpy as np

from sklearn.utils.validation import check_array, check_consistent_length

from ..utils import check_trt
from ..utils import area_under_curve

def _cumulative_gains_curve(y_true, y_score, sample_weight):
    """Cumulative gains of ``y_true`` when samples are ranked by score.

    Samples are ordered by decreasing ``y_score``; samples sharing the
    same score are collapsed into a single curve point.

    Parameters
    ----------
    y_true : ndarray
        Outcome values, indexed in parallel with ``y_score``.
    y_score : ndarray
        Scores used for ranking.
    sample_weight : ndarray or None
        Optional per-sample weights.  ``None`` means every sample
        counts as 1.

    Returns
    -------
    xs : ndarray
        Cumulative share of samples (or of total weight) at each curve
        point, starting at 0 and ending at 1.
    gains : ndarray
        Cumulative (weighted) sum of ``y_true`` at each curve point,
        starting at 0.
    """
    # Stable ascending sort, then reversed, so the highest scores come first.
    order = np.argsort(y_score, kind="mergesort")[::-1]
    ranked_score = y_score[order]
    ranked_true = y_true[order]
    weighted = sample_weight is not None
    ranked_weight = sample_weight[order] if weighted else 1.0

    # Keep only the last position of every run of tied scores, plus the
    # final sample, so that ties produce one point on the curve.
    change_points = np.nonzero(np.diff(ranked_score))[0]
    cut_idxs = np.r_[change_points, ranked_true.size - 1]

    # The curve starts at the origin, hence the prepended 0.
    running_gain = np.cumsum(ranked_true * ranked_weight)
    gains = np.r_[0, running_gain[cut_idxs]]

    if weighted:
        covered = np.r_[0, np.cumsum(ranked_weight)[cut_idxs]]
    else:
        # Unweighted: the number of samples seen is simply index + 1.
        covered = np.r_[0, cut_idxs + 1]
    xs = covered / covered[-1]
    return xs, gains

def uplift_curve(y_true, y_score, trt, n_trt=None, pos_label=None,
                 sample_weight=None):
    """Uplift curve.

    Unless specified explicitly, y_true is assumed to be 0-1, with 1
    the positive outcome.

    This function implements the variant used by Rzepakowski and
    Jaroszewicz, where treatment and control curves are computed
    separately and subtracted.

    Parameters
    ----------
    y_true : array-like
        Observed outcomes.
    y_score : array-like
        Scores used to rank the samples (highest first).
    trt : array-like
        Treatment indicator, validated by ``check_trt``.  Samples with
        ``trt == 0`` form the control group, samples with ``trt == 1``
        the treated group.
    n_trt : int, optional
        Number of treatments, passed to ``check_trt``.  Only a single
        treatment is supported.
    pos_label : optional
        When given, ``y_true`` is replaced by ``y_true == pos_label``.
    sample_weight : array-like, optional
        Per-sample weights.  When given, the group sizes used for
        normalization are the sums of the weights in each group.

    Returns
    -------
    x : ndarray
        Union of the x coordinates of the treatment and control curves.
    u : ndarray
        Normalized treatment gains minus normalized control gains,
        linearly interpolated at ``x``.

    Raises
    ------
    ValueError
        If more than one treatment is present.
    RuntimeError
        If the control group or the treated group has zero size
        (zero total weight when ``sample_weight`` is given).
    """
    y_true = check_array(y_true, ensure_2d=False)
    y_score = check_array(y_score, ensure_2d=False)
    trt, n_trt = check_trt(trt, n_trt)

    is_control = trt == 0
    is_treated = trt == 1
    if sample_weight is None:
        check_consistent_length(y_true, y_score, trt)
        sample_weight_c = None
        sample_weight_t = None
        n_c = is_control.sum()
        n_t = is_treated.sum()
    else:
        sample_weight = check_array(sample_weight, ensure_2d=False)
        check_consistent_length(y_true, y_score, trt, sample_weight)
        sample_weight_c = sample_weight[is_control]
        sample_weight_t = sample_weight[is_treated]
        n_c = sample_weight_c.sum()
        n_t = sample_weight_t.sum()

    if n_trt > 1:
        raise ValueError("uplift curve only supported for a single treatment.")

    # Validate group sizes *before* building the per-group curves: the
    # curve helper indexes the last element of its input and would fail
    # with an IndexError on an empty group, hiding these messages.
    if n_c == 0:
        raise RuntimeError("Cannot construct uplift curve: no cases in control")
    if n_t == 0:
        raise RuntimeError("Cannot construct uplift curve: no treated cases")

    if pos_label is not None:
        y_true = (y_true == pos_label)

    x_c, gains_c = _cumulative_gains_curve(
        y_true[is_control], y_score[is_control], sample_weight_c)
    x_t, gains_t = _cumulative_gains_curve(
        y_true[is_treated], y_score[is_treated], sample_weight_t)

    # Normalize by group size.  Division is done out of place because the
    # gains are integer-typed when both outcomes and weights are integers,
    # and in-place true division cannot store floats in an integer array.
    gains_c = gains_c / n_c
    gains_t = gains_t / n_t

    # Interpolate both curves on a common grid and subtract them.
    x = np.union1d(x_c, x_t)
    y_c = np.interp(x, x_c, gains_c)
    y_t = np.interp(x, x_t, gains_t)
    u = y_t - y_c
    return x, u
def uplift_curve_j(y_true, y_score, trt, n_trt=None, pos_label=None,
                   sample_weight=None):
    """Uplift curve.

    Unless specified explicitly, y_true is assumed to be 0-1, with 1
    the positive outcome.

    This function implements the variant where scores are sorted
    jointly, see Verbeke, Nyberg, Verhelst.

    Parameters
    ----------
    y_true : array-like
        Observed outcomes; converted to a float copy, so the caller's
        array is not modified.
    y_score : array-like
        Scores used to rank all samples jointly (highest first).
    trt : array-like
        Treatment indicator, validated by ``check_trt``.  Samples with
        ``trt == 0`` form the control group, samples with ``trt == 1``
        the treated group.
    n_trt : int, optional
        Number of treatments, passed to ``check_trt``.  Only a single
        treatment is supported.
    pos_label : optional
        When given, ``y_true`` is replaced by the 0/1 indicator of
        ``y_true == pos_label``.
    sample_weight : array-like, optional
        Per-sample weights; converted to a float copy.  Defaults to a
        weight of 1 for every sample.

    Returns
    -------
    x : ndarray
        Cumulative share of the group-normalized weight, from 0 to 1.
    u : ndarray
        Cumulative sum of the signed, group-normalized outcomes.

    Raises
    ------
    ValueError
        If more than one treatment is present.
    RuntimeError
        If the control group or the treated group has zero total
        weight.
    """
    y_true = check_array(y_true, ensure_2d=False, copy=True, dtype=float)
    y_score = check_array(y_score, ensure_2d=False)
    trt, n_trt = check_trt(trt, n_trt)
    if sample_weight is None:
        check_consistent_length(y_true, y_score, trt)
        sample_weight = np.ones_like(y_true, dtype=float)
    else:
        sample_weight = check_array(sample_weight, ensure_2d=False,
                                    dtype=float, copy=True)
        check_consistent_length(y_true, y_score, trt, sample_weight)
    if n_trt > 1:
        raise ValueError("uplift curve only supported for a single treatment.")
    if pos_label is not None:
        # The comparison yields a boolean array; cast it back to float
        # because NumPy rejects unary minus on booleans, which the sign
        # flip of the control outcomes below relies on.
        y_true = (y_true == pos_label).astype(float)

    is_control = trt == 0
    is_treated = trt == 1

    # Total weight per group, used to normalize the weights.
    n_c = sample_weight[is_control].sum()
    n_t = sample_weight[is_treated].sum()
    if n_c == 0:
        raise RuntimeError("Cannot construct uplift curve: no cases in control")
    if n_t == 0:
        raise RuntimeError("Cannot construct uplift curve: no treated cases")

    # Control outcomes count negatively and each group's weights are
    # scaled to sum to one, so a single cumulative sum over the jointly
    # sorted samples gives treated gains minus control gains.
    y_true[is_control] = -y_true[is_control]
    sample_weight[is_control] /= n_c
    sample_weight[is_treated] /= n_t

    x, u = _cumulative_gains_curve(y_true, y_score, sample_weight)
    return x, u
def area_under_uplift_curve(y_true, y_score, trt, n_trt=None, pos_label=None,
                            sample_weight=None, subtract_diag=True):
    """Area under the curve returned by ``uplift_curve``.

    ``y_true``, ``y_score``, ``trt``, ``n_trt``, ``pos_label`` and
    ``sample_weight`` are forwarded unchanged to ``uplift_curve``;
    ``subtract_diag`` is forwarded unchanged to ``area_under_curve``,
    whose result is returned.
    """
    curve_x, curve_u = uplift_curve(
        y_true,
        y_score,
        trt,
        n_trt=n_trt,
        pos_label=pos_label,
        sample_weight=sample_weight,
    )
    return area_under_curve(curve_x, curve_u, subtract_diag=subtract_diag)
def area_under_uplift_curve_j(y_true, y_score, trt, n_trt=None,
                              pos_label=None, sample_weight=None,
                              subtract_diag=True):
    """Area under the curve returned by ``uplift_curve_j``.

    ``y_true``, ``y_score``, ``trt``, ``n_trt``, ``pos_label`` and
    ``sample_weight`` are forwarded unchanged to ``uplift_curve_j``;
    ``subtract_diag`` is forwarded unchanged to ``area_under_curve``,
    whose result is returned.
    """
    curve_x, curve_u = uplift_curve_j(
        y_true,
        y_score,
        trt,
        n_trt=n_trt,
        pos_label=pos_label,
        sample_weight=sample_weight,
    )
    return area_under_curve(curve_x, curve_u, subtract_diag=subtract_diag)