
"""
Factorization models for implicit feedback problems.
"""

import numpy as np

import torch

import torch.optim as optim

from spotlight.helpers import _repr_model
from spotlight.factorization._components import _predict_process_ids
from spotlight.losses import (adaptive_hinge_loss,
                              bpr_loss,
                              hinge_loss,
                              pointwise_loss)
from spotlight.factorization.representations import BilinearNet
from spotlight.sampling import sample_items
from spotlight.torch_utils import cpu, gpu, minibatch, set_seed, shuffle


class ImplicitFactorizationModel(object):
    """
    An implicit feedback matrix factorization model. Uses a classic
    matrix factorization [1]_ approach, with latent vectors used
    to represent both users and items. Their dot product gives the
    predicted score for a user-item pair.

    The latent representation is given by
    :class:`spotlight.factorization.representations.BilinearNet`.

    The model is trained through negative sampling: for any known
    user-item pair, one or more items are randomly sampled to act
    as negatives (expressing a lack of preference by the user for
    the sampled item).

    .. [1] Koren, Yehuda, Robert Bell, and Chris Volinsky.
       "Matrix factorization techniques for recommender systems."
       Computer 42.8 (2009).

    Parameters
    ----------

    loss: string, optional
        One of 'pointwise', 'bpr', 'hinge', or 'adaptive_hinge',
        corresponding to losses from :class:`spotlight.losses`.
    embedding_dim: int, optional
        Number of embedding dimensions to use for users and items.
    n_iter: int, optional
        Number of iterations to run.
    batch_size: int, optional
        Minibatch size.
    l2: float, optional
        L2 loss penalty.
    learning_rate: float, optional
        Initial learning rate.
    optimizer_func: function, optional
        Function that takes in module parameters as the first argument
        and returns an instance of a PyTorch optimizer. Overrides l2
        and learning rate if supplied. If no optimizer is supplied,
        Adam is used by default.
    use_cuda: boolean, optional
        Run the model on a GPU.
    representation: a representation module, optional
        If supplied, will override default settings and be used as the
        main network module in the model. Intended to be used as an escape
        hatch when you want to reuse the model's training functions but
        want full freedom to specify your network topology.
    sparse: boolean, optional
        Use sparse gradients for embedding layers.
    random_state: instance of numpy.random.RandomState, optional
        Random state to use when fitting.
    num_negative_samples: int, optional
        Number of negative samples to generate for adaptive hinge loss.
""" def __init__(self, loss='pointwise', embedding_dim=32, n_iter=10, batch_size=256, l2=0.0, learning_rate=1e-2, optimizer_func=None, use_cuda=False, representation=None, sparse=False, random_state=None, num_negative_samples=5): assert loss in ('pointwise', 'bpr', 'hinge', 'adaptive_hinge') self._loss = loss self._embedding_dim = embedding_dim self._n_iter = n_iter self._learning_rate = learning_rate self._batch_size = batch_size self._l2 = l2 self._use_cuda = use_cuda self._representation = representation self._sparse = sparse self._optimizer_func = optimizer_func self._random_state = random_state or np.random.RandomState() self._num_negative_samples = num_negative_samples self._num_users = None self._num_items = None self._net = None self._optimizer = None self._loss_func = None set_seed(self._random_state.randint(-10**8, 10**8), cuda=self._use_cuda) def __repr__(self): return _repr_model(self) @property def _initialized(self): return self._net is not None def _initialize(self, interactions): (self._num_users, self._num_items) = (interactions.num_users, interactions.num_items) if self._representation is not None: self._net = gpu(self._representation, self._use_cuda) else: self._net = gpu( BilinearNet(self._num_users, self._num_items, self._embedding_dim, sparse=self._sparse), self._use_cuda ) if self._optimizer_func is None: self._optimizer = optim.Adam( self._net.parameters(), weight_decay=self._l2, lr=self._learning_rate ) else: self._optimizer = self._optimizer_func(self._net.parameters()) if self._loss == 'pointwise': self._loss_func = pointwise_loss elif self._loss == 'bpr': self._loss_func = bpr_loss elif self._loss == 'hinge': self._loss_func = hinge_loss else: self._loss_func = adaptive_hinge_loss def _check_input(self, user_ids, item_ids, allow_items_none=False): if isinstance(user_ids, int): user_id_max = user_ids else: user_id_max = user_ids.max() if user_id_max >= self._num_users: raise ValueError('Maximum user id greater ' 'than number of users in model.') if allow_items_none and item_ids is None: return if isinstance(item_ids, int): item_id_max = item_ids else: item_id_max = item_ids.max() if item_id_max >= self._num_items: raise ValueError('Maximum item id greater ' 'than number of items in model.')
    def fit(self, interactions, verbose=False):
        """
        Fit the model.

        When called repeatedly, model fitting will resume from
        the point at which training stopped in the previous fit
        call.

        Parameters
        ----------

        interactions: :class:`spotlight.interactions.Interactions`
            The input dataset.

        verbose: bool
            Output additional information about current epoch and loss.
        """

        user_ids = interactions.user_ids.astype(np.int64)
        item_ids = interactions.item_ids.astype(np.int64)

        if not self._initialized:
            self._initialize(interactions)

        self._check_input(user_ids, item_ids)

        for epoch_num in range(self._n_iter):

            users, items = shuffle(user_ids,
                                   item_ids,
                                   random_state=self._random_state)

            user_ids_tensor = gpu(torch.from_numpy(users),
                                  self._use_cuda)
            item_ids_tensor = gpu(torch.from_numpy(items),
                                  self._use_cuda)

            epoch_loss = 0.0

            for (minibatch_num,
                 (batch_user,
                  batch_item)) in enumerate(minibatch(user_ids_tensor,
                                                      item_ids_tensor,
                                                      batch_size=self._batch_size)):

                positive_prediction = self._net(batch_user, batch_item)

                if self._loss == 'adaptive_hinge':
                    negative_prediction = self._get_multiple_negative_predictions(
                        batch_user, n=self._num_negative_samples)
                else:
                    negative_prediction = self._get_negative_prediction(batch_user)

                self._optimizer.zero_grad()

                loss = self._loss_func(positive_prediction, negative_prediction)
                epoch_loss += loss.item()

                loss.backward()
                self._optimizer.step()

            epoch_loss /= minibatch_num + 1

            if verbose:
                print('Epoch {}: loss {}'.format(epoch_num, epoch_loss))

            if np.isnan(epoch_loss) or epoch_loss == 0.0:
                raise ValueError('Degenerate epoch loss: {}'
                                 .format(epoch_loss))
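
    # Illustrative note (not part of the library): because _initialize
    # runs only when the network does not yet exist, repeated fit() calls
    # warm-start from the current parameters rather than restarting:
    #
    #     model.fit(train)   # first n_iter epochs
    #     model.fit(train)   # n_iter more epochs, resumed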
    def _get_negative_prediction(self, user_ids):

        negative_items = sample_items(
            self._num_items,
            len(user_ids),
            random_state=self._random_state)
        negative_var = gpu(torch.from_numpy(negative_items), self._use_cuda)
        negative_prediction = self._net(user_ids, negative_var)

        return negative_prediction

    def _get_multiple_negative_predictions(self, user_ids, n=5):

        batch_size = user_ids.size(0)

        negative_prediction = self._get_negative_prediction(user_ids
                                                            .view(batch_size, 1)
                                                            .expand(batch_size, n)
                                                            .reshape(batch_size * n))

        return negative_prediction.view(n, len(user_ids))
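
    # Shape walkthrough for _get_multiple_negative_predictions above
    # (illustrative comment, not library code). With batch_size=3, n=2:
    #
    #   user_ids          -> [u0, u1, u2]                 shape (3,)
    #   .view(3, 1)       -> [[u0], [u1], [u2]]           shape (3, 1)
    #   .expand(3, 2)     -> [[u0, u0], [u1, u1], ...]    shape (3, 2)
    #   .reshape(6)       -> [u0, u0, u1, u1, u2, u2]     shape (6,)
    #
    # Each user id is repeated n times, each repetition paired with a
    # freshly sampled negative item; the flat predictions are then viewed
    # as (n, batch_size) for consumption by adaptive_hinge_loss.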
    def predict(self, user_ids, item_ids=None):
        """
        Make predictions: given a user id, compute the recommendation
        scores for items.

        Parameters
        ----------

        user_ids: int or array
            If int, will predict the recommendation scores for this
            user for all items in item_ids. If an array, will predict
            scores for all (user, item) pairs defined by user_ids and
            item_ids.
        item_ids: array, optional
            Array containing the item ids for which prediction scores
            are desired. If not supplied, predictions for all items
            will be computed.

        Returns
        -------

        predictions: np.array
            Predicted scores for all items in item_ids.
        """

        self._check_input(user_ids, item_ids, allow_items_none=True)
        self._net.train(False)

        user_ids, item_ids = _predict_process_ids(user_ids, item_ids,
                                                  self._num_items,
                                                  self._use_cuda)

        out = self._net(user_ids, item_ids)

        return cpu(out).detach().numpy().flatten()
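

# A minimal end-to-end sketch (illustrative, not part of the library):
# fit the model on synthetic implicit feedback, then score all items for
# one user. The dataset sizes and hyperparameters here are arbitrary.
if __name__ == '__main__':

    from spotlight.interactions import Interactions

    rng = np.random.RandomState(42)

    # 1000 synthetic (user, item) observations over 20 users and 50 items.
    train = Interactions(user_ids=rng.randint(0, 20, 1000, dtype=np.int32),
                         item_ids=rng.randint(0, 50, 1000, dtype=np.int32),
                         num_users=20,
                         num_items=50)

    model = ImplicitFactorizationModel(loss='bpr', n_iter=5)
    model.fit(train, verbose=True)

    # Scores for user 0 over all 50 items; higher means more relevant.
    scores = model.predict(0)
    print(scores.shape)  # (50,)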