Source code for kaggler.model.nn

import time
from logging import getLogger

import numpy as np
from scipy import sparse
from scipy.optimize import minimize
from sklearn.metrics import roc_auc_score


from ..const import SEC_PER_MIN


logger = getLogger(__name__)


class NN(object):
    """Implement a neural network with a single hidden layer."""

    def __init__(self, n=5, h=10, b=100000, l1=.0, l2=.0, random_state=None):
        """Initialize the NN class object.

        Args:
            n (int): number of epochs
            h (int): number of hidden nodes
            b (int): number of input examples to be processed together in
                     each quasi-Newton (L-BFGS-B) update for back-propagation
            l1 (float): regularization parameter for weights between the
                        input and hidden layers
            l2 (float): regularization parameter for weights between the
                        hidden and output layers
            random_state (None or int): random seed
        """
        np.random.seed(random_state)
        self.h = h
        self.b = b
        self.n = n
        self.l1 = l1
        self.l2 = l2
        self.n_opt = 0
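
    # Illustrative construction sketch (the values below are arbitrary
    # examples, not tuned recommendations):
    #
    #     clf = NN(n=10, h=20, b=10000, l1=.1, l2=.1, random_state=42)
    #
    # n bounds the number of epochs fit() starts with, h sets the hidden-layer
    # width, and b is the mini-batch size used for each L-BFGS-B update.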

    def fit(self, X, y, X_val=None, y_val=None):
        """Train a network with the quasi-Newton method.

        Args:
            X (np.array of float): feature matrix for training
            y (np.array of float): target values for training
            X_val (np.array of float): feature matrix for validation
            y_val (np.array of float): target values for validation
        """
        y = y.reshape((len(y), 1))

        if sparse.issparse(X):
            X = X.tocsr()

        if X_val is not None:
            n_val = len(y_val)
            y_val = y_val.reshape((n_val, 1))

        # Set initial weights randomly.
        self.i = X.shape[1]
        self.l1 = self.l1 / self.i
        self.w = (np.random.rand((self.i + 2) * self.h + 1) - .5) * 1e-6
        self.w_opt = self.w
        self.n_opt = 0

        logger.info('training ...')
        n_obs = X.shape[0]
        batch = self.b
        n_epoch = self.n
        idx = np.arange(n_obs)
        self.auc_opt = .5

        start = time.time()
        print('\tEPOCH TRAIN VALID BEST TIME (m)')
        print('\t--------------------------------------------')

        # AUC before training
        p = self.predict_raw(X)
        auc = roc_auc_score(y, p)
        auc_val = auc
        if X_val is not None:
            p_val = self.predict_raw(X_val)
            auc_val = roc_auc_score(y_val, p_val)

        print('\t{:3d}: {:.6f} {:.6f} {:.6f} {:.2f}'.format(
            0, auc, auc_val, self.auc_opt,
            (time.time() - start) / SEC_PER_MIN))

        # Use 'while' instead of 'for' so that n_epoch can be increased if
        # the validation AUC keeps improving at the end of n_epoch.
        epoch = 1
        while epoch <= n_epoch:
            # Shuffle inputs every epoch - it helps avoid local optima when
            # batch < n_obs.
            np.random.shuffle(idx)

            # Find the optimal weights for batch input examples.
            # If batch == 1, it is stochastic optimization, which is slow but
            # uses minimal memory.  If batch == n_obs, it is batch
            # optimization, which is fast but uses the most memory.
            # Otherwise, it is mini-batch optimization, which balances the
            # speed and memory trade-off.
            for i in range(int(n_obs / batch) + 1):
                if (i + 1) * batch > n_obs:
                    sub_idx = idx[batch * i:n_obs]
                else:
                    sub_idx = idx[batch * i:batch * (i + 1)]

                x = X[sub_idx]
                neg_idx = [n_idx for n_idx, n_y in enumerate(y[sub_idx])
                           if n_y == 0.]
                pos_idx = [p_idx for p_idx, p_y in enumerate(y[sub_idx])
                           if p_y == 1.]
                x0 = x[neg_idx]
                x1 = x[pos_idx]
                if x0.shape[0] == 0 or x1.shape[0] == 0:
                    # Skip batches that do not contain both classes because
                    # the pairwise cost below is undefined for them.
                    continue

                # Update weights to minimize the cost function using the
                # quasi-Newton method (L-BFGS-B), where:
                #   func -- cost function
                #   jac -- Jacobian (derivative of the cost function)
                #   maxiter -- number of iterations for L-BFGS-B
                ret = minimize(self.func, self.w, args=(x0, x1),
                               method='L-BFGS-B', jac=self.fprime,
                               options={'maxiter': 5})
                self.w = ret.x

            p = self.predict_raw(X)
            auc = roc_auc_score(y, p)
            auc_val = auc

            if X_val is not None:
                p_val = self.predict_raw(X_val)
                auc_val = roc_auc_score(y_val, p_val)

                if auc_val > self.auc_opt:
                    self.auc_opt = auc_val
                    self.w_opt = self.w
                    self.n_opt = epoch

                    # If the validation AUC is still improving after n_epoch,
                    # try 5 more epochs.
                    if epoch == n_epoch:
                        n_epoch += 5

            print('\t{:3d}: {:.6f} {:.6f} {:.6f} {:.2f}'.format(
                epoch, auc, auc_val, self.auc_opt,
                (time.time() - start) / SEC_PER_MIN))

            epoch += 1

        if X_val is not None:
            print('Optimal epoch is {0} ({1:.6f})'.format(self.n_opt,
                                                          self.auc_opt))
            self.w = self.w_opt

        logger.info('done training')
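
    # Mini-batch sketch for fit() above (an illustrative restatement of the
    # code; the numbers are arbitrary): with n_obs = 2500 and b = 1000, each
    # epoch runs L-BFGS-B (maxiter=5) on the index slices [0:1000],
    # [1000:2000], and [2000:2500] of the shuffled data, and n_epoch is
    # extended by 5 whenever the best validation AUC is achieved in the final
    # scheduled epoch (when validation data are given).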

    def predict(self, X):
        """Predict targets for a feature matrix.

        Args:
            X (np.array of float): feature matrix for prediction

        Returns:
            prediction (np.array)
        """
        logger.info('predicting ...')
        ps = self.predict_raw(X)

        return sigm(ps[:, 0])

    def predict_raw(self, X):
        """Predict raw scores for a feature matrix.

        Args:
            X (np.array of float): feature matrix for prediction

        Returns:
            raw prediction scores (np.array) before the sigmoid
            transformation
        """
        # b -- bias for the input and hidden layers
        b = np.ones((X.shape[0], 1))
        w2 = self.w[-(self.h + 1):].reshape(self.h + 1, 1)
        w1 = self.w[:-(self.h + 1)].reshape(self.i + 1, self.h)

        # Make X have the same number of columns as the training data
        # (self.i).  Because of the sparse matrix representation, X for
        # prediction can have a different number of columns.
        if X.shape[1] > self.i:
            # If X has more columns, cut the extra columns.
            X = X[:, :self.i]
        elif X.shape[1] < self.i:
            # If X has fewer columns, cut the rows of the weight matrix
            # between the input and hidden layers instead of X itself because
            # the SciPy sparse matrix does not support .set_shape() yet.
            idx = list(range(X.shape[1]))
            idx.append(self.i)      # include the last row for the bias
            w1 = w1[idx, :]

        if sparse.issparse(X):
            return np.hstack((sigm(sparse.hstack((X, b)).dot(w1)), b)).dot(w2)
        else:
            return np.hstack((sigm(np.hstack((X, b)).dot(w1)), b)).dot(w2)
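
    # Shape sketch of the forward pass in predict_raw() above (a restatement
    # of the code, with X of shape (n_obs, i)):
    #
    #     z = sigm([X | 1] . w1)   # (n_obs, i + 1) x (i + 1, h) -> (n_obs, h)
    #     p = [z | 1] . w2         # (n_obs, h + 1) x (h + 1, 1) -> (n_obs, 1)
    #
    # where the appended columns of ones are the bias inputs b.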

    def func(self, w, *args):
        """Return the cost of the neural network for given weights.

        Args:
            w (array of float): weight vector such that:
                w[:-h1] -- weights between the input and hidden layers
                w[-h1:] -- weights between the hidden and output layers
                (h1 = number of hidden nodes + 1)
            args: negative-class features (args[0]) and positive-class
                  features (args[1])

        Returns:
            combined cost of the pairwise squared error and the
            regularization penalties for both weight matrices
        """
        x0 = args[0]
        x1 = args[1]
        n0 = x0.shape[0]
        n1 = x1.shape[0]

        # n -- number of pairs to evaluate
        n = max(n0, n1) * 10
        idx0 = np.random.choice(n0, size=n)
        idx1 = np.random.choice(n1, size=n)

        # b -- bias for the input and hidden layers
        b0 = np.ones((n0, 1))
        b1 = np.ones((n1, 1))

        i1 = self.i + 1
        h = self.h
        h1 = h + 1

        # Predict scores for the features -- cannot use predict_raw() here
        # because the optimizer passes in different weights.
        if sparse.issparse(x0):
            p0 = np.hstack((sigm(sparse.hstack((x0, b0)).dot(
                w[:-h1].reshape(i1, h))), b0)).dot(w[-h1:].reshape(h1, 1))
            p1 = np.hstack((sigm(sparse.hstack((x1, b1)).dot(
                w[:-h1].reshape(i1, h))), b1)).dot(w[-h1:].reshape(h1, 1))
        else:
            p0 = np.hstack((sigm(np.hstack((x0, b0)).dot(
                w[:-h1].reshape(i1, h))), b0)).dot(w[-h1:].reshape(h1, 1))
            p1 = np.hstack((sigm(np.hstack((x1, b1)).dot(
                w[:-h1].reshape(i1, h))), b1)).dot(w[-h1:].reshape(h1, 1))

        p0 = p0[idx0]
        p1 = p1[idx1]

        # Return the cost that consists of the sum of squared errors over the
        # sampled (positive, negative) pairs + regularization (scaled by l1)
        # for weights between the input and hidden layers + regularization
        # (scaled by l2) for weights between the hidden and output layers.
        return .5 * (np.sum((1 - p1 + p0) ** 2) / n +
                     self.l1 * np.sum(w[:-h1] ** 2) / (i1 * h) +
                     self.l2 * np.sum(w[-h1:] ** 2) / h1)
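
    # The cost in func() above, written out (a restatement of the code): with
    # p1 and p0 the sampled scores of positive and negative examples over n
    # pairs, and w1, w2 the two weight blocks of w,
    #
    #     C(w) = .5 * ( sum((1 - p1 + p0) ** 2) / n
    #                   + l1 * ||w1||^2 / ((i + 1) * h)
    #                   + l2 * ||w2||^2 / (h + 1) )
    #
    # i.e. a pairwise squared-error surrogate for AUC plus squared-weight
    # penalties on both layers.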

    def fprime(self, w, *args):
        """Return the derivatives of the cost function for given weights.

        Args:
            w (array of float): weight vector such that:
                w[:-h1] -- weights between the input and hidden layers
                w[-h1:] -- weights between the hidden and output layers
                (h1 = number of hidden nodes + 1)
            args: negative-class features (args[0]) and positive-class
                  features (args[1])

        Returns:
            gradient of the cost function w.r.t. the weights
        """
        x0 = args[0]
        x1 = args[1]
        n0 = x0.shape[0]
        n1 = x1.shape[0]

        # n -- number of pairs to evaluate
        n = max(n0, n1) * 10
        idx0 = np.random.choice(n0, size=n)
        idx1 = np.random.choice(n1, size=n)

        # b -- bias for the input and hidden layers
        b = np.ones((n, 1))

        i1 = self.i + 1
        h = self.h
        h1 = h + 1
        w2 = w[-h1:].reshape(h1, 1)
        w1 = w[:-h1].reshape(i1, h)

        if sparse.issparse(x0):
            x0 = x0.tocsr()[idx0]
            x1 = x1.tocsr()[idx1]
            xb0 = sparse.hstack((x0, b))
            xb1 = sparse.hstack((x1, b))
        else:
            x0 = x0[idx0]
            x1 = x1[idx1]
            xb0 = np.hstack((x0, b))
            xb1 = np.hstack((x1, b))

        z0 = np.hstack((sigm(xb0.dot(w1)), b))
        z1 = np.hstack((sigm(xb1.dot(w1)), b))
        y0 = z0.dot(w2)
        y1 = z1.dot(w2)
        e = 1 - (y1 - y0)
        dy = e / n

        # Calculate the derivatives of the cost function w.r.t. w1 and w2,
        # where:
        #   w1 -- weights between the input and hidden layers
        #   w2 -- weights between the hidden and output layers
        dw1 = -(xb1.T.dot(dy.dot(w2[:-1].reshape(1, h)) * dsigm(xb1.dot(w1))) -
                xb0.T.dot(dy.dot(w2[:-1].reshape(1, h)) * dsigm(xb0.dot(w1)))
                ).reshape(i1 * h) + self.l1 * w[:-h1] / (i1 * h)
        dw2 = -(z1 - z0).T.dot(dy).reshape(h1) + self.l2 * w[-h1:] / h1

        return np.append(dw1, dw2)
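

# Gradient sketch for NN.fprime() above (a restatement of the code): with
# e = 1 - (y1 - y0) and dy = e / n,
#
#     dC/dw2 = -(z1 - z0)^T . dy + l2 * w2 / (h + 1)
#     dC/dw1 = -(xb1^T . ((dy . w2[:-1]^T) * dsigm(xb1 . w1))
#               - xb0^T . ((dy . w2[:-1]^T) * dsigm(xb0 . w1)))
#              + l1 * w1 / ((i + 1) * h)
#
# which matches differentiating the pairwise cost in NN.func() through the
# single hidden layer (up to the independent resampling of pairs in each
# call).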


def sigm(x):
    """Return the value of the sigmoid function at x.

    Args:
        x (np.array of float or float)

    Returns:
        value(s) of the sigmoid function for x.
    """
    # Avoid numerical overflow by capping the input to the exponential
    # function - this has a negligible effect on the return value.
    return 1 / (1 + np.exp(-np.maximum(x, -20)))


def dsigm(x):
    """Return the value of the derivative of the sigmoid function w.r.t. x.

    Args:
        x (np.array of float or float)

    Returns:
        derivative(s) of the sigmoid function w.r.t. x.
    """
    return sigm(x) * (1 - sigm(x))
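

# A minimal smoke-test sketch for this module (synthetic data; the sizes and
# hyper-parameters below are arbitrary and chosen only to run quickly).  It
# assumes the module is executed within the package, e.g.
# python -m kaggler.model.nn, so that the relative import above resolves.
if __name__ == '__main__':
    rng = np.random.RandomState(42)

    # Toy binary classification problem: 1,000 examples, 20 features, and a
    # target driven by the first two features plus noise.
    X = rng.rand(1000, 20)
    y = (X[:, 0] + X[:, 1] + .1 * rng.randn(1000) > 1.).astype(float)

    # Hold out the last 200 examples for validation.
    X_trn, y_trn = X[:800], y[:800]
    X_val, y_val = X[800:], y[800:]

    clf = NN(n=3, h=8, b=10000, l1=.1, l2=.1, random_state=42)
    clf.fit(X_trn, y_trn, X_val, y_val)

    p_val = clf.predict(X_val)
    print('validation AUC: {:.6f}'.format(roc_auc_score(y_val, p_val)))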