"""
Hilbert Space based regression
==================================
"""
from numpy import sqrt, array, ndarray, delete, concatenate, power
from numpy.linalg import det
from scipy import integrate
from scipy.stats import t
Infinitesimal = 1e-7
[docs]class Error(Exception):
r"""
Generic errors that may occur in the course of a run.
"""
def __init__(self, *args):
super(Error, self).__init__(*args)
[docs]class Measure(object):
r"""
Constructs a measure :math:`\mu` based on `density` and `domain`.
:param density: the density over the domain:
+ if none is given, it assumes uniform distribution
+ if a callable `h` is given, then :math:`d\mu=h(x)dx`
+ if a dictionary is given, then :math:`\mu=\sum w_x\delta_x` a discrete measure.
The points :math:`x` are the keys of the dictionary (tuples) and the weights :math:`w_x` are the values.
:param domain: if `density` is a dictionary, it will be set by its keys. If callable, then `domain` must be a list
of tuples defining the domain's box. If None is given, it will be set to :math:`[-1, 1]^n`
"""
def __init__(self, density=None, domain=None):
# set the density
if density is None:
self.density = lambda x: 1.0
elif callable(density):
self.density = density
elif isinstance(density, dict):
self.density = lambda x, density_=density: density_[x] if x in density else 0.
else:
raise Error(
"The `density` must be either a callable or a dictionary of real numbers."
)
# check and set the domain
self.continuous = True
self.dim = 0
if isinstance(domain, list):
self.dim = len(domain)
for intrvl in domain:
if (not isinstance(intrvl, (list, tuple))) or (len(intrvl) != 2):
raise Error("`domain` should be a list of 2-tuples.")
self.supp = domain
elif isinstance(density, dict):
self.supp = density.keys()
self.continuous = False
else:
raise Error("No domain is specified.")
[docs] def integral(self, f):
r"""
Calculates :math:`\int_{domain} fd\mu`.
:param f: the integrand
:return: the value of the integral
"""
from types import FunctionType
m = 0.0
if not isinstance(
f, (dict, FunctionType)
):
raise Error("The integrand must be a `function` or a `dict`")
if isinstance(f, dict):
fn = lambda x: f[x] if x in f else 0.0
else:
fn = f
if self.continuous:
fw = lambda *x: fn(*x) * self.density(*x)
m = integrate.nquad(fw, self.supp)[0]
else:
for p in self.supp:
m += self.density(p) * fn(p)
return m
[docs] def norm(self, p, f):
r"""
Computes the norm-`p` of the `f` with respect to the current measure,
i.e., :math:`(\int_{domain}|f|^p d\mu)^{1/p}`.
:param p: a positive real number
:param f: the function whose norm is desired.
:return: :math:`\|f\|_{p, \mu}`
"""
absfp = lambda *x: pow(abs(f(*x)), p)
return pow(self.integral(absfp), 1.0 / p)
[docs]class FunctionSpace(object):
r"""
A class tha facilitates a few types of computations over function spaces of type :math:`L_2(X, \mu)`
:param dim: the dimension of 'X' (default: 1)
:param measure: an object of type `Measure` representing :math:`\mu`
:param basis: a finite basis of functions to construct a subset of :math:`L_2(X, \mu)`
"""
dim = 1 # type: int
def __init__(self, dim=1, measure=None, basis=None):
self.dim = int(dim)
if (measure is not None) and (isinstance(measure, Measure)):
self.measure = measure
else:
# default measure is set to be the Lebesgue measure on [0, 1]^dim
D = [(0.0, 1.0) for _ in range(self.dim)]
self.measure = Measure(domain=D)
if basis is None:
# default basis is linear
base = [lambda x: 1.0]
for i in range(self.dim):
base.append(lambda x, i_=i: x[i_] if isinstance(x, array) else x)
self.base = base
else:
self.base = basis
self.orth_base = []
self.Gram = None
[docs] def inner(self, f, g):
r"""
Computes the inner product of the two parameters with respect to
the measure `measure`, i.e., :math:`\int_Xf\cdot g d\mu`.
:param f: callable
:param g: callable
:return: the quantity of :math:`\int_Xf\cdot g d\mu`
"""
fn = lambda x, f_=f, g_=g: f_(x) * g_(x)
return self.measure.integral(fn)
[docs] def project(self, f, g):
r"""
Finds the projection of `f` on `g` with respect to the inner
product induced by the measure `measure`.
:param f: callable
:param g: callable
:return: the quantity of :math:`\frac{\langle f, g\rangle}{\|g\|_2}g`
"""
a = self.inner(f, g)
b = self.inner(g, g)
return lambda x, a_=a, b_=b: a_ * g(x) / b_
def gram_mat(self):
num = len(self.base)
cfs = array([[0.0] * num] * num)
for i in range(num):
for j in range(i, num):
cf = self.inner(self.base[i], self.base[j])
cfs[i][j] = cf
cfs[j][i] = cf
self.Gram = cfs
def minor_gram(self, i):
if self.Gram is None:
self.gram_mat()
return array(
[[self.Gram[idx][jdx] for idx in range(i + 1)] for jdx in range(i + 1)]
)
def minor(self, i, j):
if j == 1:
return 1.0
cfs = array([[0.0] * j] * (j - 1))
for jdx in range(j):
for idx in range(j - 1):
cfs[idx][jdx] = self.Gram[idx][jdx]
return det(delete(cfs, i, 1))
[docs] def series(self, f):
r"""
Given a function `f`, this method finds and returns the
coefficients of the series that approximates `f` as a
linear combination of the elements of the orthogonal basis :math:`B`.
In symbols :math:`\sum_{b\in B}\langle f, b\rangle b`.
:return: the list of coefficients :math:`\langle f, b\rangle` for :math:`b\in B`
"""
cfs = []
for b in self.orth_base:
cfs.append(self.inner(f, b))
return cfs
[docs]class Regression(object):
"""
Given a set of points, i.e., a list of tuples of the equal lengths `P`, this class computes the best approximation
of a function that fits the data, in the following sense:
+ if no extra parameters is provided, meaning that an object is initiated like ``R = Regression(P)`` then
calling ``R.fit()`` returns the linear regression that fits the data.
+ if at initiation the parameter `deg=n` is set, then ``R.fit()`` returns the polynomial regression of
degree `n`.
+ if a basis of functions provided by means of an `OrthSystem` object (``R.SetOrthSys(orth)``) then
calling ``R.fit()`` returns the best approximation that can be found using the basic functions of
the `orth` object.
:param points: a list of points to be fitted or a callable to be approximated
:param dim: dimension of the domain
"""
def __init__(self, points, dim=None):
self.Points = None
if isinstance(points, (ndarray, list, array)):
self.Points = list(points)
self.dim = len(points[0]) - 1
supp = {}
for p in points:
supp[tuple(p[:-1])] = 1.0
self.meas = Measure(supp)
self.f = lambda x: sum(
[
p_[-1] * (1 * (abs(x - array(p_[:-1])) < 1.0e-4)).min()
for p_ in points
]
)
elif callable(points):
if dim is None:
raise Error("The dimension can not be determined")
else:
self.dim = dim
self.f = points
self.meas = Measure(domain=[(-1.0, 1.0) for _ in range(self.dim)])
self.orth = FunctionSpace(dim=self.dim, measure=self.meas)
[docs] def set_measure(self, meas):
"""
Sets the default measure for approximation.
:param meas: a measure.Measure object
:return: None
"""
if not isinstance(meas, Measure):
raise AssertionError("`set_measure` accepts a `NpyProximation.Measure` object.")
self.meas = meas
[docs] def set_func_spc(self, sys):
"""
Sets the bases of the orthogonal basis
:param sys: `orthsys.OrthSystem` object.
:return: None
.. Note::
For technical reasons, the measure needs to be given via `set_measure` method. Otherwise, the Lebesque
measure on :math:`[-1, 1]^n` is assumed.
"""
if self.dim != sys.dim:
raise AssertionError(
"Dimensions of points and the orthogonal system do not match."
)
sys.measure = self.meas
self.orth = sys
self.orth.form_basis()
[docs] def fit(self):
"""
Fits the best curve based on the optional provided orthogonal basis.
If no basis is provided, it fits a polynomial of a given degree (at initiation)
:return: The fit.
"""
coefs = self.orth.series(self.f)
aprx = lambda x: sum(
[
coefs[i] * self.orth.orth_base[i](x)
for i in range(len(self.orth.orth_base))
]
)
return aprx
try:
from sklearn.base import BaseEstimator, RegressorMixin
except ModuleNotFoundError:
BaseEstimator = type("BaseEstimator", (object,), dict())
RegressorMixin = type("RegressorMixin", (object,), dict())
[docs]class HilbertRegressor(BaseEstimator, RegressorMixin):
r"""
Regression using Hilbert Space techniques Scikit-Learn style.
:param deg: int, default=3
The degree of polynomial regression. Only used if `base` is `None`
:param base: list, default = None
a list of function to form an orthogonal function basis
:param meas: `NpyProximation.Measure`, default = None
the measure to form the :math:`L_2(\mu)` space. If `None` a discrete measure will be constructed based
on `fit` inputs
:param fspace: `NpyProximation.FunctionBasis`, default = None
the function subspace of :math:`L_2(\mu)`, if `None` it will be initiated according to `self.meas`
"""
def __init__(self, deg=3, base=None, meas=None, fspace=None, c_limit=.95):
self.deg = deg
self.meas = meas
self.base = base
self.fspace = fspace
self.regressor = None
self.dim = 0
self.apprx = None
# for confidence interval
self.c_limit = (c_limit + 1.0) / 2.0
self.t_stat = 0.
self.training_var = 0.
self.training_size = 0
self.ci_band = None
self.x_mean = 0.
self.sum_sqrd_x = 0
[docs] def fit(self, X, y):
"""
Calculates an orthonormal basis according to the given function space basis and the discrete measure from the
training points.
:param X: Training data
:param y: Target values
:return: `self`
"""
if len(X.shape) != 2:
X = X.reshape(X.shape[0], 1)
self.training_size = X.shape[0]
points = concatenate((X, y.reshape(y.shape[0], 1)), axis=1)
self.regressor = Regression(points)
self.dim = X[0].shape[0]
if self.meas is not None:
self.regressor.set_measure(self.meas)
if self.fspace is not None:
self.regressor.set_func_spc(self.fspace)
else:
from .extras import FunctionBasis
bs = FunctionBasis()
B = bs.poly(n=self.dim, deg=self.deg) if self.base is None else self.base
self.fspace = FunctionSpace(dim=self.dim, basis=B, measure=self.meas)
self.regressor.set_func_spc(self.fspace)
self.apprx = self.regressor.fit()
res = array([self.apprx(x) for x in X])
self.x_mean = X.mean()
self.sum_sqrd_x = sum(power(X, 2))
self.training_var = sqrt(sum(power(res - y.reshape((1, -1))[0], 2)) / max(self.training_size - 2, 1))
self.t_stat = t.ppf(self.c_limit, self.training_size - 1)
return self
[docs] def predict(self, X):
"""
Predict using the Hilbert regression method
:param X: data points for prediction
:return: returns predicted values
"""
if len(X.shape) != 2:
X = X.reshape(X.shape[0], 1)
# Adjust the `self.x_mean` according to the weight
if X.shape[1] == 1:
self.ci_band = (
self.t_stat
* self.training_var
* (
1. / self.training_size
+ power(X - self.x_mean, 2)
/ sqrt(self.sum_sqrd_x - self.training_size * power(self.x_mean, 2))
)
).reshape(1, -1)[0]
return array([self.apprx(x) for x in X])
[docs] def score(self, X, y, sample_weight=None):
"""
The default scoring method is the weighted mean square error
:param X:
:param y:
:param sample_weight:
:return:
"""
num_points = len(X)
weights = [self.regressor.meas.density(tuple(x)) for x in X]
vals = [self.apprx(x) for x in X]
sum_w = sum(weights)
errors = [weights[_] * (vals[_] - y[_]) ** 2 for _ in range(num_points)]
try:
return sum(errors)[0] / sum_w
except IndexError:
return sum(errors) / sum_w