Source code for optiml.ml.svm._base

import re
import warnings
from abc import ABC
from io import StringIO

import numpy as np
from qpsolvers import solve_qp
from sklearn.base import ClassifierMixin, BaseEstimator, RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from wurlitzer import pipes, STDOUT

from .kernels import gaussian, Kernel, LinearKernel
from .losses import (squared_hinge, squared_epsilon_insensitive,
                     Hinge, SquaredHinge, EpsilonInsensitive, SquaredEpsilonInsensitive)
from .smo import SMO, SMOClassifier, SMORegression
from ...opti import Optimizer
from ...opti import Quadratic
from ...opti.constrained import BoxConstrainedQuadraticOptimizer, AugmentedLagrangianQuadratic
from ...opti.unconstrained import ProximalBundle
from ...opti.unconstrained.line_search import LineSearchOptimizer
from ...opti.unconstrained.stochastic import StochasticOptimizer, StochasticMomentumOptimizer, StochasticGradientDescent


[docs] class SVM(BaseEstimator, ABC): """ Base abstract class for all SVM-type estimator. Parameters ---------- loss : `SVMLoss` instance, default=None Specifies the loss function. kernel : `Kernel` instance like {linear, poly, gaussian, sigmoid}, default=gaussian Specifies the kernel type to be used in the algorithm. C : float, default=1 Regularization parameter. The strength of the regularization is inversely proportional to C. Must be strictly positive. rho: int, default=1 Rho parameter for the augmented term of the Lagrangian method. Must be strictly positive. mu : float, default=1 Mu parameter for the proximal bundle method. Only used when ``optimizer`` is `ProximalBundle`. Must be strictly positive. fit_intercept : bool, default=True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (i.e., data is expected to be already centered). intercept_scaling : float, default=1 When ``fit_intercept`` is True, instance vector x becomes [x, intercept_scaling], i.e., a "synthetic" feature with constant value equals to ``intercept_scaling`` is appended to the instance vector. The intercept becomes intercept_scaling * synthetic feature weight Note: the synthetic feature weight is subject to L1/L2 regularization as all other features. To lessen the effect of regularization on synthetic feature weight (and therefore on the intercept) ``intercept_scaling`` has to be increased. reg_intercept : bool, default=False Whether to include the intercept in the regularization term. dual : bool, default=False Select the algorithm to either solve the dual or primal optimization problem. Prefer ``dual=False`` when n_samples > n_features and the instance vector is linearly separable in the given space or, if not, consider the possibly to apply a non-linear transformation of the instance vector using a low-rank kernel matrix approximation, i.e., Nystrom, before training. See more at: - https://scikit-learn.org/stable/modules/classes.html#module-sklearn.kernel_approximation - https://cdn.rawgit.com/mstrazar/mklaren/master/docs/build/html/projection.html optimizer : LineSearchOptimizer or StochasticOptimizer subclass, default=StochasticGradientDescent The solver for optimization. It can be a subclass of the `LineSearchOptimizer` which can converge faster and perform better for small datasets, e.g., the `BFGS` quasi-Newton method or, alternatively, a subclass of the `StochasticOptimizer` e.g., the `StochasticGradientDescent` or `Adam`, which works well on relatively large datasets (with thousands of training samples or more) in terms of both training time and validation score. master_solver : string, default='clarabel' Master solver for the proximal bundle method for the CVXPY interface. Only used when ``optimizer`` is `ProximalBundle`. learning_rate : 'auto' or double, default='auto' The initial learning rate used for weight update. It controls the step-size in updating the weights. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. If 'auto', 1/L is used where L is the Lipschitz constant. momentum_type : {'none', 'polyak', 'nesterov'}, default='none' Momentum type used for weight update. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. momentum : float, default=0.9 Momentum for weight update. Should be between 0 and 1. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. max_iter : int, default=1000 Maximum number of iterations. The solver iterates until convergence (determined by ``tol``) or this number of iterations. If the optimizer is a subclass of `StochasticOptimizer`, this value determines the number of epochs (how many times each data point will be used), not the number of gradient steps. max_f_eval : int, default=15000 Only used when ``optimizer`` is a subclass of `LineSearchOptimizer`. Maximum number of loss function calls. The solver iterates until convergence (determined by ``tol``), number of iterations reaches ``max_iter``, or this number of loss function calls. Note that number of loss function calls will be greater than or equal to the number of iterations. tol : float, default=1e-4 Tolerance for stopping criterion. batch_size : int, default=None Size of mini batches for stochastic optimizers. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. shuffle : bool, default=True Whether to shuffle samples for batch sampling in each iteration. Only used when the ``optimizer`` is a subclass of `StochasticOptimizer`. random_state : int, RandomState instance or None, default=None Controls the pseudo random number generation for train-test split if ``early_stopping`` is True and shuffling the data for batch sampling when an instance of `StochasticOptimizer` class is used as ``optimizer`` value. Pass an int for reproducible output across multiple function calls. early_stopping : bool, default=False Whether to use early stopping to terminate training. If set to True and ``validation_split`` is greater than 0, it will automatically set aside ``validation_split``% of training data as validation and terminate training when validation score is not improving by at least ``tol`` for ``patience`` consecutive epochs, otherwise terminate training when train loss does not improve by more than ``tol`` for ``patience`` consecutive passes over the training set. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. validation_split : float, default=0. The proportion of training data to set aside as validation set for early stopping. Must be between 0 and 1. Only used when ``optimizer`` is a subclass of `StochasticOptimizer` and ``early_stopping`` is True. patience : int, default=5 Maximum number of epochs to not meet ``tol`` improvement. Only used when ``optimizer`` is a subclass of `StochasticOptimizer`. verbose : bool or int, default=False Controls the verbosity of progress messages to stdout. Use a boolean value to switch on/off or an int value to show progress each ``verbose`` time optimization steps. master_verbose : bool or int, default=False Controls the verbosity of the CVXPY interface. Only used when ``optimizer`` is `ProximalBundle`. Attributes ---------- coef_ : ndarray of shape (n_features,) Weights assigned to the features (coefficients in the primal problem). This is only available in the case of a linear kernel. dual_coef_ : ndarray of shape (n_SV,) Coefficients of the support vector in the decision function. intercept_ : float Constants in decision function. support_ : ndarray of shape (n_SV,) Indices of support vectors. support_vectors_ : ndarray of shape (n_SV, n_features) Support vectors. """ def __init__(self, loss=None, kernel=gaussian, C=1, rho=1, mu=1, fit_intercept=True, intercept_scaling=1, reg_intercept=False, dual=False, optimizer=StochasticGradientDescent, master_solver='clarabel', learning_rate='auto', momentum_type='none', momentum=0.9, max_iter=1000, max_f_eval=15000, tol=1e-4, batch_size=None, shuffle=True, random_state=None, early_stopping=False, validation_split=0., patience=5, verbose=False, master_verbose=False): self.loss = loss if not isinstance(kernel, Kernel): raise TypeError(f'{kernel} is not an allowed kernel function') self.kernel = kernel if not C > 0: raise ValueError('C must be > 0') self.C = C if not rho > 0: raise ValueError('rho must be > 0') self.rho = rho if not mu > 0: raise ValueError('mu must be > 0') self.mu = mu if not isinstance(fit_intercept, bool): raise ValueError('fit_intercept mu be a boolean value') self.fit_intercept = fit_intercept self.intercept_scaling = intercept_scaling if not isinstance(reg_intercept, bool): raise ValueError('reg_intercept mu be a boolean value') self.reg_intercept = reg_intercept if not isinstance(dual, bool): raise ValueError('dual must be a boolean value') self.dual = dual if ((self.dual and not (isinstance(optimizer, str) or not issubclass(optimizer, SMO) or not issubclass(optimizer, Optimizer))) or (not self.dual and not issubclass(optimizer, Optimizer))): raise TypeError(f'{optimizer} is not an allowed optimization method') self.optimizer = optimizer self.master_solver = master_solver self.learning_rate = learning_rate self.max_iter = max_iter self.max_f_eval = max_f_eval self.momentum_type = momentum_type self.momentum = momentum if not tol > 0: raise ValueError('tol must be > 0') self.tol = tol self.batch_size = batch_size self.shuffle = shuffle self.random_state = random_state self.early_stopping = early_stopping self.validation_split = validation_split self.patience = patience self.verbose = verbose self.master_verbose = master_verbose if (not self.dual or (self.dual and isinstance(self.kernel, LinearKernel))): self.coef_ = np.zeros(0) self.intercept_ = 0. self.support_ = np.zeros(0) self.support_vectors_ = np.zeros(0) if self.dual: self.alphas_ = np.zeros(0) self.dual_coef_ = np.zeros(0) if not isinstance(optimizer, str): self.train_loss_history = [] if not self.dual and issubclass(self.optimizer, StochasticOptimizer): self.train_score_history = [] self._no_improvement_count = 0 self._avg_epoch_loss = 0 if self.validation_split: self.val_loss_history = [] self.val_score_history = [] self.best_val_score = -np.inf else: self.best_loss = np.inf
[docs] def fit(self, X, y): raise NotImplementedError
[docs] def decision_function(self, X): if self.dual and not isinstance(self.kernel, LinearKernel): return np.dot(self.dual_coef_, self.kernel(self.support_vectors_, X)) + self.intercept_ return np.dot(X, self.coef_) + self.intercept_
def _store_train_info(self, opt): if opt.is_lagrangian_dual(): self.train_loss_history.append(opt.primal_f_x) else: self.train_loss_history.append(opt.f_x) def _unpack(self, packed_coef_inter): if self.fit_intercept: self.coef_, self.intercept_ = packed_coef_inter[:-1], packed_coef_inter[-1] else: self.coef_ = packed_coef_inter def _store_train_val_info(self, opt, X_batch, y_batch, X_val, y_val): self._unpack(opt.x) self._avg_epoch_loss += opt.f_x * X_batch.shape[0] if opt.is_batch_end(): self._avg_epoch_loss /= opt.f.X.shape[0] # n_samples self.train_loss_history.append(self._avg_epoch_loss) if opt.is_verbose() and opt.epoch != opt.iter: print('\tavg_loss: {: 1.4e}'.format(self._avg_epoch_loss), end='') self._avg_epoch_loss = 0. if self.validation_split: val_loss = self.loss(opt.x, X_val, y_val) self.val_loss_history.append(val_loss) if opt.is_verbose(): print('\tval_loss: {: 1.4e}'.format(val_loss), end='') def _update_no_improvement_count(self, opt): if self.early_stopping: if self.validation_split: # monitor val_score if self.val_score_history[-1] < self.best_val_score + self.tol: self._no_improvement_count += 1 else: self._no_improvement_count = 0 if self.val_score_history[-1] > self.best_val_score: self.best_val_score = self.val_score_history[-1] self._best_coef = self.coef_.copy() else: # monitor train_loss if self.train_loss_history[-1] > self.best_loss - self.tol: self._no_improvement_count += 1 else: self._no_improvement_count = 0 if self.train_loss_history[-1] < self.best_loss: self.best_loss = self.train_loss_history[-1] if self._no_improvement_count >= self.patience: if self.validation_split: opt.x = self._best_coef if self.verbose: if self.validation_split: print(f'\ntraining stopped since validation score did not improve more than ' f'tol={self.tol} for {self.patience} consecutive epochs') else: print('\ntraining stopped since training loss did not improve more than ' f'tol={self.tol} for {self.patience} consecutive epochs') raise StopIteration
[docs] class SVC(ClassifierMixin, SVM): """ C-Support Vector Classification. Parameters ---------- loss : `SVMLoss` instance like {hinge, squared_hinge}, default='squared_hinge' Specifies the loss function. The hinge loss is the L1 loss, while the squared hinge loss is the L2 loss. """ def __init__(self, loss=squared_hinge, kernel=gaussian, C=1, rho=1, mu=1, fit_intercept=True, intercept_scaling=1, reg_intercept=False, dual=False, optimizer=StochasticGradientDescent, master_solver='clarabel', learning_rate='auto', momentum_type='none', momentum=0.9, max_iter=1000, max_f_eval=15000, tol=1e-4, batch_size=None, shuffle=True, random_state=None, early_stopping=False, validation_split=0., patience=5, verbose=False, master_verbose=False): super(SVC, self).__init__(loss=loss, kernel=kernel, C=C, rho=rho, mu=mu, fit_intercept=fit_intercept, intercept_scaling=intercept_scaling, reg_intercept=reg_intercept, dual=dual, optimizer=optimizer, master_solver=master_solver, learning_rate=learning_rate, momentum_type=momentum_type, momentum=momentum, max_iter=max_iter, max_f_eval=max_f_eval, tol=tol, batch_size=batch_size, shuffle=shuffle, random_state=random_state, early_stopping=early_stopping, validation_split=validation_split, patience=patience, verbose=verbose, master_verbose=master_verbose) if not loss._loss_type == 'classifier': raise TypeError(f'{loss} is not an allowed SVC loss function') self.lb = LabelBinarizer(neg_label=-1) def _store_train_val_info(self, opt, X_batch, y_batch, X_val, y_val): super(SVC, self)._store_train_val_info(opt, X_batch, y_batch, X_val, y_val) if opt.is_batch_end(): acc = self.score(X_batch[:, :-1] if self.fit_intercept else X_batch, y_batch) self.train_score_history.append(acc) if opt.is_verbose(): print('\tacc: {:1.4f}'.format(acc), end='') if self.validation_split: val_acc = self.score(X_val[:, :-1] if self.fit_intercept else X_val, y_val) self.val_score_history.append(val_acc) if opt.is_verbose(): print('\tval_acc: {:1.4f}'.format(val_acc), end='') self._update_no_improvement_count(opt)
[docs] def fit(self, X, y): self.lb.fit(y) if len(self.lb.classes_) > 2: raise ValueError('use OneVsOneClassifier or OneVsRestClassifier from sklearn.multiclass ' 'to train a model over more than two labels') y = self.lb.transform(y).ravel() if not self.dual: if issubclass(self.optimizer, LineSearchOptimizer): if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y) self.optimizer = self.optimizer(f=self.loss, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) self._unpack(self.optimizer.x) elif issubclass(self.optimizer, ProximalBundle): if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y) self.optimizer = self.optimizer(f=self.loss, mu=self.mu, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) self._unpack(self.optimizer.x) elif issubclass(self.optimizer, StochasticOptimizer): if self.validation_split: X, X_val, y, y_val = train_test_split(X, y, test_size=self.validation_split, random_state=self.random_state) if self.fit_intercept: X_val_biased = np.c_[X_val, np.full_like(y_val, self.intercept_scaling)] else: X_val_biased = X_val else: X_val_biased = None y_val = None if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y) if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.loss, epochs=self.max_iter, step_size=(self.loss.step_size if self.learning_rate == 'auto' else self.learning_rate), momentum_type=self.momentum_type, momentum=self.momentum, batch_size=self.batch_size, callback=self._store_train_val_info, callback_args=(X_val_biased, y_val), shuffle=self.shuffle, random_state=self.random_state, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.loss, epochs=self.max_iter, step_size=(self.loss.step_size if self.learning_rate == 'auto' else self.learning_rate), batch_size=self.batch_size, callback=self._store_train_val_info, callback_args=(X_val_biased, y_val), shuffle=self.shuffle, random_state=self.random_state, verbose=self.verbose).minimize() else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.support_ = np.argwhere(np.abs(self.decision_function(X)) <= 1).ravel() self.support_vectors_ = X[self.support_] else: n_samples = len(y) # kernel matrix K = self.kernel(X) Q = K * np.outer(y, y) q = -np.ones(n_samples) if self.loss == Hinge: ub = np.ones(n_samples) * self.C # upper bounds if self.optimizer == 'smo' or self.optimizer == SMO: if not self.reg_intercept: self.obj = Quadratic(Q, q) self.optimizer = SMOClassifier(self.obj, X, y, K, self.kernel, self.C, self.tol, self.verbose).minimize() self.alphas_ = self.optimizer.alphas if isinstance(self.kernel, LinearKernel): self.coef_ = self.optimizer.w self.intercept_ = self.optimizer.b else: raise NotImplementedError elif isinstance(self.optimizer, str): lb = np.zeros(n_samples) # lower bounds if not self.reg_intercept: self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, A=y.astype(float), b=np.zeros(1), lb=lb, ub=ub, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter else: Q += np.outer(y, y) self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, lb=lb, ub=ub, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter stdout = out.getvalue() if stdout: self.iter = int(max(re.findall(r'(\d+):', stdout))) if self.verbose: print(stdout) else: if issubclass(self.optimizer, BoxConstrainedQuadraticOptimizer): if not self.reg_intercept: # TODO constrained optimizer with A x = 0 and 0 <= x <= ub is not available raise NotImplementedError else: Q += np.outer(y, y) self.obj = Quadratic(Q, q) self.optimizer = self.optimizer(quad=self.obj, ub=ub, tol=self.tol, max_iter=self.max_iter, callback=self._store_train_info, verbose=self.verbose).minimize() elif issubclass(self.optimizer, Optimizer): lb = np.zeros(n_samples) # lower bounds if not self.reg_intercept: self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), A=y, b=np.zeros(1), lb=lb, ub=ub, rho=self.rho) else: Q += np.outer(y, y) self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), lb=lb, ub=ub, rho=self.rho) if issubclass(self.optimizer, LineSearchOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) elif issubclass(self.optimizer, ProximalBundle): self.optimizer = self.optimizer(f=self.obj, mu=self.mu, tol=self.tol, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) elif issubclass(self.optimizer, StochasticOptimizer): if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, momentum_type=self.momentum_type, momentum=self.momentum, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.alphas_ = self.optimizer.x elif self.loss == SquaredHinge: D = np.diag(np.ones(n_samples) / (2 * self.C)) Q += D if isinstance(self.optimizer, str): lb = np.zeros(n_samples) # lower bounds if not self.reg_intercept: self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, A=y.astype(float), b=np.zeros(1), lb=lb, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter else: Q += np.outer(y, y) self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, lb=lb, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter stdout = out.getvalue() if stdout: self.iter = int(max(re.findall(r'(\d+):', stdout))) if self.verbose: print(stdout) else: if issubclass(self.optimizer, BoxConstrainedQuadraticOptimizer): # TODO bcqp optimizer with 0 <= x <= +inf, i.e., without upper bound, is not available raise NotImplementedError elif issubclass(self.optimizer, Optimizer): lb = np.zeros(n_samples) # lower bounds if not self.reg_intercept: self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), A=y, b=np.zeros(1), lb=lb, rho=self.rho) else: Q += np.outer(y, y) self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), lb=lb, rho=self.rho) if issubclass(self.optimizer, LineSearchOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) elif issubclass(self.optimizer, ProximalBundle): self.optimizer = self.optimizer(f=self.obj, mu=self.mu, tol=self.tol, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) elif issubclass(self.optimizer, StochasticOptimizer): if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, momentum_type=self.momentum_type, momentum=self.momentum, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.alphas_ = self.optimizer.x else: raise TypeError(f'{self.loss} is not an allowed loss') sv = self.alphas_ > 1e-6 self.support_ = np.arange(len(self.alphas_))[sv] self.support_vectors_, sv_y, alphas = X[sv], y[sv], self.alphas_[sv] self.dual_coef_ = alphas * sv_y if self.optimizer != SMOClassifier: if isinstance(self.kernel, LinearKernel): self.coef_ = np.dot(self.dual_coef_, self.support_vectors_) for n in range(len(alphas)): self.intercept_ += sv_y[n] self.intercept_ -= np.sum(self.dual_coef_ * K[self.support_[n], sv]) self.intercept_ /= len(alphas) return self
[docs] def predict(self, X): return self.lb.inverse_transform(self.decision_function(X))
[docs] class SVR(RegressorMixin, SVM): """ Epsilon-Support Vector Regression. Parameters ---------- loss : `SVMLoss` instance like {epsilon_insensitive, squared_epsilon_insensitive}, \ default='squared_epsilon_insensitive' Specifies the loss function. The epsilon-insensitive loss is the L1 loss, while the squared epsilon-insensitive loss is the L2 loss. epsilon : float, default=0.1 Epsilon parameter in the (squared) epsilon-insensitive loss function. It specifies the epsilon-tube within which no penalty is associated in the training loss function with points predicted within a distance epsilon from the actual value. """ def __init__(self, loss=squared_epsilon_insensitive, epsilon=0.1, kernel=gaussian, C=1, rho=1, mu=1, fit_intercept=True, intercept_scaling=1, reg_intercept=False, dual=False, optimizer=StochasticGradientDescent, master_solver='clarabel', learning_rate='auto', momentum_type='none', momentum=0.9, max_iter=1000, max_f_eval=15000, tol=1e-4, batch_size=None, shuffle=True, random_state=None, early_stopping=False, validation_split=0., patience=5, verbose=False, master_verbose=False): super(SVR, self).__init__(loss=loss, kernel=kernel, C=C, rho=rho, mu=mu, fit_intercept=fit_intercept, intercept_scaling=intercept_scaling, reg_intercept=reg_intercept, dual=dual, optimizer=optimizer, master_solver=master_solver, learning_rate=learning_rate, momentum_type=momentum_type, momentum=momentum, max_iter=max_iter, max_f_eval=max_f_eval, tol=tol, batch_size=batch_size, shuffle=shuffle, random_state=random_state, early_stopping=early_stopping, validation_split=validation_split, patience=patience, verbose=verbose, master_verbose=master_verbose) if not loss._loss_type == 'regressor': raise TypeError(f'{loss} is not an allowed SVR loss function') if not epsilon >= 0: raise ValueError('epsilon must be >= 0') self.epsilon = epsilon def _store_train_val_info(self, opt, X_batch, y_batch, X_val, y_val): super(SVR, self)._store_train_val_info(opt, X_batch, y_batch, X_val, y_val) if opt.is_batch_end(): r2 = self.score(X_batch[:, :-1] if self.fit_intercept else X_batch, y_batch) self.train_score_history.append(r2) if opt.is_verbose(): print('\tr2: {: 1.4f}'.format(r2), end='') if self.validation_split: val_r2 = self.score(X_val[:, :-1] if self.fit_intercept else X_val, y_val) self.val_score_history.append(val_r2) if opt.is_verbose(): print('\tval_r2: {: 1.4f}'.format(val_r2), end='') self._update_no_improvement_count(opt)
[docs] def fit(self, X, y): targets = y.shape[1] if y.ndim > 1 else 1 if targets > 1: raise ValueError('use sklearn.multioutput.MultiOutputRegressor ' 'to train a model over more than one target') if not self.dual: if issubclass(self.optimizer, LineSearchOptimizer): if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y, self.epsilon) self.optimizer = self.optimizer(f=self.loss, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) self._unpack(self.optimizer.x) elif issubclass(self.optimizer, ProximalBundle): if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y, self.epsilon) self.optimizer = self.optimizer(f=self.loss, mu=self.mu, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) self._unpack(self.optimizer.x) elif issubclass(self.optimizer, StochasticOptimizer): if self.validation_split: X, X_val, y, y_val = train_test_split(X, y, test_size=self.validation_split, random_state=self.random_state) if self.fit_intercept: X_val_biased = np.c_[X_val, np.full_like(y_val, self.intercept_scaling)] else: X_val_biased = X_val else: X_val_biased = None y_val = None if self.fit_intercept: X_biased = np.c_[X, np.full_like(y, self.intercept_scaling)] else: X_biased = X self.loss = self.loss(self, X_biased, y, self.epsilon) if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.loss, epochs=self.max_iter, step_size=(self.loss.step_size if self.learning_rate == 'auto' else self.learning_rate), momentum_type=self.momentum_type, momentum=self.momentum, batch_size=self.batch_size, callback=self._store_train_val_info, callback_args=(X_val_biased, y_val), shuffle=self.shuffle, random_state=self.random_state, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.loss, epochs=self.max_iter, step_size=(self.loss.step_size if self.learning_rate == 'auto' else self.learning_rate), batch_size=self.batch_size, callback=self._store_train_val_info, callback_args=(X_val_biased, y_val), shuffle=self.shuffle, random_state=self.random_state, verbose=self.verbose).minimize() else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.support_ = np.argwhere(np.abs(y - self.predict(X)) >= self.epsilon).ravel() self.support_vectors_ = X[self.support_] else: n_samples = len(y) # kernel matrix K = self.kernel(X) Q = np.vstack((np.hstack((K, -K)), np.hstack((-K, K)))) q = np.hstack((-y, y)) + self.epsilon if self.loss == EpsilonInsensitive: ub = np.ones(2 * n_samples) * self.C # upper bounds if self.optimizer == 'smo' or self.optimizer == SMO: if not self.reg_intercept: self.obj = Quadratic(Q, q) self.optimizer = SMORegression(self.obj, X, y, K, self.kernel, self.C, self.epsilon, self.tol, self.verbose).minimize() alphas_p, alphas_n = self.optimizer.alphas_p, self.optimizer.alphas_n self.alphas_ = np.concatenate((alphas_p, alphas_n)) if isinstance(self.kernel, LinearKernel): self.coef_ = self.optimizer.w self.intercept_ = self.optimizer.b else: raise NotImplementedError else: e = np.hstack((np.ones(n_samples), -np.ones(n_samples))) # equality matrix if isinstance(self.optimizer, str): lb = np.zeros(2 * n_samples) # lower bounds if not self.reg_intercept: self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, A=e, b=np.zeros(1), lb=lb, ub=ub, solver=self.optimizer, verbose=self.verbose >= 0) # quiet trick for Jupyter else: Q += np.outer(e, e) self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, lb=lb, ub=ub, solver=self.optimizer, verbose=self.verbose >= 0) # quiet trick for Jupyter stdout = out.getvalue() if stdout: self.iter = int(max(re.findall(r'(\d+):', stdout))) if self.verbose: print(stdout) else: if issubclass(self.optimizer, BoxConstrainedQuadraticOptimizer): if not self.reg_intercept: # TODO constrained optimizer with A x = 0 and 0 <= x <= ub is not available raise NotImplementedError else: Q += np.outer(e, e) self.obj = Quadratic(Q, q) self.optimizer = self.optimizer(quad=self.obj, ub=ub, tol=self.tol, max_iter=self.max_iter, callback=self._store_train_info, verbose=self.verbose).minimize() elif issubclass(self.optimizer, Optimizer): lb = np.zeros(2 * n_samples) # lower bounds if not self.reg_intercept: self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), A=e, b=np.zeros(1), lb=lb, ub=ub, rho=self.rho) else: Q += np.outer(e, e) self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), lb=lb, ub=ub, rho=self.rho) if issubclass(self.optimizer, LineSearchOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) elif issubclass(self.optimizer, ProximalBundle): self.optimizer = self.optimizer(f=self.obj, mu=self.mu, tol=self.tol, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) elif issubclass(self.optimizer, StochasticOptimizer): if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, momentum_type=self.momentum_type, momentum=self.momentum, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.alphas_ = self.optimizer.x alphas_p, alphas_n = np.split(self.alphas_, 2) elif self.loss == SquaredEpsilonInsensitive: D = np.diag(np.ones(2 * n_samples) / (2 * self.C)) Q += D e = np.hstack((np.ones(n_samples), -np.ones(n_samples))) # equality matrix if isinstance(self.optimizer, str): lb = np.zeros(2 * n_samples) # lower bounds if not self.reg_intercept: self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, A=e, b=np.zeros(1), lb=lb, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter else: Q += np.outer(e, e) self.obj = Quadratic(Q, q) out = StringIO() with pipes(stdout=out, stderr=STDOUT): self.alphas_ = solve_qp(P=Q, q=q, lb=lb, solver=self.optimizer, verbose=False if self.verbose < 0 else True) # trick for Jupyter stdout = out.getvalue() if stdout: self.iter = int(max(re.findall(r'(\d+):', stdout))) if self.verbose: print(stdout) else: if issubclass(self.optimizer, BoxConstrainedQuadraticOptimizer): # TODO bcqp optimizer with 0 <= x <= +inf, i.e., without upper bound, is not available raise NotImplementedError elif issubclass(self.optimizer, Optimizer): lb = np.zeros(2 * n_samples) # lower bounds if not self.reg_intercept: self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), A=e, b=np.zeros(1), lb=lb, rho=self.rho) else: Q += np.outer(e, e) self.obj = AugmentedLagrangianQuadratic(primal=Quadratic(Q, q), lb=lb, rho=self.rho) if issubclass(self.optimizer, LineSearchOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, max_iter=self.max_iter, max_f_eval=self.max_f_eval, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': if self.optimizer.iter >= self.max_iter: warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) elif self.optimizer.f_eval >= self.max_f_eval: warnings.warn('max_f_eval reached but the optimization has not converged yet', ConvergenceWarning) elif issubclass(self.optimizer, ProximalBundle): self.optimizer = self.optimizer(f=self.obj, mu=self.mu, tol=self.tol, max_iter=self.max_iter, master_solver=self.master_solver, master_verbose=self.master_verbose, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'error': warnings.warn('failure while computing direction for the master problem', ConvergenceWarning) elif issubclass(self.optimizer, StochasticOptimizer): if issubclass(self.optimizer, StochasticMomentumOptimizer): self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, momentum_type=self.momentum_type, momentum=self.momentum, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() else: self.optimizer = self.optimizer(f=self.obj, tol=self.tol, step_size=self.learning_rate, epochs=self.max_iter, random_state=self.random_state, callback=self._store_train_info, verbose=self.verbose).minimize() if self.optimizer.status == 'stopped': warnings.warn('max_iter reached but the optimization has not converged yet', ConvergenceWarning) else: raise TypeError(f'{self.optimizer} is not an allowed optimizer') self.alphas_ = self.optimizer.x alphas_p, alphas_n = np.split(self.alphas_, 2) else: raise TypeError(f'{self.loss} is not an allowed loss') sv = np.logical_or(alphas_p > 1e-6, alphas_n > 1e-6) self.support_ = np.arange(len(alphas_p))[sv] self.support_vectors_, sv_y, alphas_p, alphas_n = X[sv], y[sv], alphas_p[sv], alphas_n[sv] self.dual_coef_ = alphas_p - alphas_n if self.optimizer != SMORegression: if isinstance(self.kernel, LinearKernel): self.coef_ = np.dot(self.dual_coef_, self.support_vectors_) for n in range(len(alphas_p)): self.intercept_ += sv_y[n] self.intercept_ -= np.sum(self.dual_coef_ * K[self.support_[n], sv]) self.intercept_ -= self.epsilon self.intercept_ /= len(alphas_p) return self
[docs] def predict(self, X): return self.decision_function(X)