"""Utilities for all datasets.
Essentially copied from sklearn.datasets._base to reduce dependency on
unofficial sklearn API."""
from collections import namedtuple, OrderedDict
import hashlib
from urllib.request import urlretrieve
from os.path import join, exists
from os import remove, makedirs
from io import StringIO
import csv
import gzip
import logging
from inspect import isfunction
import numpy as np
import joblib
from sklearn.utils import Bunch
from sklearn.utils import check_random_state
from sklearn.datasets import get_data_home
import libuplift.datasets._data as _data
def _sha256(path):
"""Calculate the sha256 hash of the file at path."""
sha256hash = hashlib.sha256()
chunk_size = 8192
with open(path, "rb") as f:
while True:
buffer = f.read(chunk_size)
if not buffer:
break
sha256hash.update(buffer)
return sha256hash.hexdigest()
# TODO: replace with sklearn.fetch_remote when it becomes available in stable
def _fetch_remote(remote, dirname=None):
"""Helper function to download a remote dataset into path.
Fetch a dataset pointed by remote's url, save into path using remote's
filename and ensure its integrity based on the SHA256 Checksum of the
downloaded file.
Parameters
----------
remote : RemoteFileMetadata
Named tuple containing remote dataset meta information: url, filename
and checksum
dirname : string
Directory to save the file to.
Returns
-------
file_path: string
Full path of the created file.
"""
file_path = (remote.filename if dirname is None
else join(dirname, remote.filename))
urlretrieve(remote.url, file_path)
checksum = _sha256(file_path)
if remote.checksum != checksum:
raise IOError("{} has an SHA256 checksum ({}) "
"differing from expected ({}), "
"file may be corrupted.".format(file_path, checksum,
remote.checksum))
return file_path
def _read_csv(archive, feature_attrs, treatment_attrs, target_attrs,
total_attrs=None, categ_as_strings=False, header=None,
csv_reader_args={"delimiter":",", "quotechar":'"'},
all_num=False):
"""Read CSV data.
feature_attrs, treatment_attrs, target_attrs contain descriptions
of resp. predictive features, treatment description, and targets.
Currently only a single treatment attribute is supported. Each
description is a list whose elements are tuples describing each
attribute. The first element of the tuple is attribute's name,
the second its type, third element (optional) is the name of
attribute in the CVS header. If the type is a list, it is assumed
to be a list of categories. If the type is a dict, it is assumed
that its keys are categories present in the csv file and the dict
is used to change category names. If the type is a function, it
will receive as argument a list of columns values and should
return a transformed list and final numpy dtype. Otherwise, type
should be a valid numpy dtype.
If total_attrs is not None, it should contain the total number of
attributes in each record.
if all_num is True, array is read as floats for speed. This
should be set only if all columns are either floats or ints.
"""
def parse_attr(Xy_columns, header, attr_description, categ_as_strings):
"""Parse a single attribute.
Return values and dtype. Handle categorical attributes correctly."""
if len(attr_description) == 2:
attr_name, attr_dtype = attr_description
file_attr_name = attr_name
elif len(attr_description) == 3:
attr_name, attr_dtype, file_attr_name = attr_description
attr_no = header.index(file_attr_name)
x = Xy_columns[attr_no]
if isinstance(attr_dtype, (list, dict)):
if categ_as_strings:
if isinstance(attr_dtype, list):
categs = {c:c for c in attr_dtype}
else: # dict
categs = attr_dtype
maxlen = max(len(c) for c in attr_dtype.values())
attr_dtype = f"U{maxlen}"
else:
categs = {c:i for i, c in enumerate(attr_dtype)}
attr_dtype = np.int32
x = [categs[c] for c in x]
elif isfunction(attr_dtype):
x, attr_dtype = attr_dtype(x)
x = np.array(x, dtype=attr_dtype)
return x, attr_name
Xy = []
if isinstance(archive, str):
if archive.endswith(".gz"):
f_context = gzip.open(archive, mode="rt")
else:
f_context = open(archive, mode="rt")
else:
if not hasattr(archive, "read"):
raise ValueError("_read_csv: archive argument "
"must be a string or file-like object.")
f_context = archive
with f_context as csvfile:
if header is None:
delim = csv_reader_args.get("delimiter", ",")
header = next(csvfile).strip().split(delim)
if all_num:
Xy_columns = np.loadtxt(csvfile, unpack=True, **csv_reader_args)
if total_attrs is not None:
assert len(Xy_columns) == total_attrs, (record, total_attrs)
else:
csvreader = csv.reader(csvfile, **csv_reader_args)
for record in csvreader:
Xy.append(record)
if total_attrs is not None:
assert len(record) == total_attrs, (record, total_attrs)
Xy_columns = list(zip(*Xy))
if isinstance(archive, str):
remove(archive)
# parse treatments
treatments = OrderedDict()
n_trts = OrderedDict()
treatment_values = OrderedDict()
for a_descr in treatment_attrs:
ta, attr_name = parse_attr(Xy_columns, header, a_descr, categ_as_strings)
treatments[attr_name] = ta
if isinstance(a_descr[1], list):
t_values = list(a_descr[1])
elif isinstance(a_descr[1], dict):
t_values = list(a_descr[1].values())
else:
t_values = list(range(max(ta)+1))
treatment_values[attr_name] = t_values
if a_descr[1] == float:
n_trt = -1 # continuous treatment
else:
n_trt = len(t_values)-1
n_trts[attr_name] = n_trt
treatment_names = list(treatments.keys())
# parse targets
targets = OrderedDict()
for a_descr in target_attrs:
ta, attr_name = parse_attr(Xy_columns, header, a_descr, categ_as_strings)
targets[attr_name] = ta
target_names = list(targets.keys())
# predictors
feature_names = []
columns = []
for a_descr in feature_attrs:
fa, attr_name = parse_attr(Xy_columns, header, a_descr, categ_as_strings)
feature_names.append(attr_name)
columns.append(fa)
categ_values = dict()
for a in feature_attrs:
if isinstance(a[1], list):
categ_values[a[0]] = list(a[1])
elif isinstance(a[1], dict):
categ_values[a[0]] = list(a[1].values())
#X = np.core.records.fromarrays(columns, names=feature_names)
# create a Bunch
ret = Bunch(data=columns,
treatment_names=treatment_names,
feature_names=feature_names,
target_names=target_names)
for attr_name in targets:
ret[attr_name] = targets[attr_name]
for attr_name in treatments:
ret[attr_name] = treatments[attr_name]
ret[attr_name + "_values"] = treatment_values[attr_name]
if attr_name == "treatment":
# set n_trt for first treatment as default for backwards compatibility
n_trt_name = "n_trt"
else:
n_trt_name = attr_name + "_n_trt"
ret[n_trt_name] = n_trts[attr_name]
ret.categ_values=categ_values
return ret
def _prepare_final_data(D, shuffle, return_X_y,
record_mask=None,
random_state=None):
if record_mask is not None:
D.data = D.data[record_mask]
for ta in D.target_names:
D[ta] = D[ta][record_mask]
for ta in D.treatment_names:
D[ta] = D[ta][record_mask]
if shuffle:
ind = np.arange(D.data.shape[0])
rng = check_random_state(random_state)
rng.shuffle(ind)
D.data = D.data.iloc[ind]
for ta in D.target_names:
D[ta] = D[ta][ind]
for ta in D.treatment_names:
D[ta] = D[ta][ind]
if return_X_y:
X = D.data
targets = tuple(D[tn] for tn in D.target_names)
trt = D.treatment
ret = (X,) + targets + (trt,)
else:
ret = D
return ret
def _fetch_remote_csv(remote, dataset_name,
feature_attrs, treatment_attrs, target_attrs,
categ_as_strings=False, return_X_y=False,
as_frame=False,
download_if_missing=True,
random_state=None, shuffle=False,
header=None, total_attrs=None,
csv_reader_args={"delimiter":",", "quotechar":'"'},
all_num=False,
record_mask=None, remove_vars=None,
data_home=None, logger=None):
if logger is None:
logger = logging.getLogger(__name__ + "." + dataset_name)
data_home = get_data_home(data_home=data_home)
dataset_dir = join(data_home, "uplift_sklearn", dataset_name)
if categ_as_strings:
dataset_path = join(dataset_dir, "bunch_str")
else:
dataset_path = join(dataset_dir, "bunch")
available = exists(dataset_path)
if not available and download_if_missing:
if not exists(dataset_dir):
makedirs(dataset_dir)
logger.info("Downloading %s" % remote.url)
if remote.url.startswith("local:"):
local_name = remote.url[6:]
file_contents = vars(_data)[local_name]
archive = StringIO(file_contents)
else:
archive = _fetch_remote(remote, dirname=dataset_dir)
## read the data
D =_read_csv(archive, feature_attrs=feature_attrs,
treatment_attrs=treatment_attrs,
target_attrs=target_attrs,
total_attrs=total_attrs,
csv_reader_args=csv_reader_args,
categ_as_strings=categ_as_strings,
header=header, all_num=all_num)
joblib.dump(D, dataset_path, compress=9)
elif not available and not download_if_missing:
raise IOError("Data not found and `download_if_missing` is False")
# get cached data
try:
D
except NameError:
D = joblib.load(dataset_path)
# remove selected variables
if remove_vars is not None:
remove_vars = set(remove_vars)
D.data = [x for a, x in zip(D.feature_names, D.data) if a not in remove_vars]
D.feature_names = [a for a in D.feature_names if a not in remove_vars]
for a in remove_vars:
if a in D.categ_values:
del D.categ_values[a]
# change columns into a table
if as_frame:
import pandas
D.data = pandas.DataFrame(OrderedDict(zip(D.feature_names, D.data)))
else:
all_float = all((x.dtype.kind=="f") for x in D.data)
if not all_float:
for i, c in enumerate(D.data):
D.data[i] = c.astype(object)
D.data = np.column_stack(D.data)
# final returned data
ret = _prepare_final_data(D, shuffle, return_X_y,
record_mask=record_mask,
random_state=random_state)
return ret