# Source code for karateclub.node_embedding.attributed.tadw

import numpy as np
import networkx as nx
from typing import Union
from scipy.sparse import coo_matrix
from karateclub.estimator import Estimator
from sklearn.decomposition import TruncatedSVD


[docs]class TADW(Estimator): r"""An implementation of `"TADW" <https://www.ijcai.org/Proceedings/15/Papers/299.pdf>`_ from the IJCAI '15 paper "Network Representation Learning with Rich Text Information". The procedure uses the node attribute matrix with a factorization matrix to reproduce a power of the adjacency matrix to create representations. Args: dimensions (int): Number of embedding dimensions. Default is 32. reduction_dimensions (int): SVD reduction dimensions. Default is 64. svd_iterations (int): SVD iteration count. Default is 20. seed (int): Random seed. Default is 42. alpha (float): Learning rate. Default is 0.01. iterations (int): Matrix decomposition iterations. Default is 10. lambd (float): Regularization coefficient. Default is 10.0. """ def __init__( self, dimensions: int = 32, reduction_dimensions: int = 64, svd_iterations: int = 20, seed: int = 42, alpha: float = 0.01, iterations: int = 10, lambd: float = 10.0, ): self.dimensions = dimensions self.reduction_dimensions = reduction_dimensions self.svd_iterations = svd_iterations self.seed = seed self.alpha = alpha self.iterations = iterations self.lambd = lambd self.seed = seed def _create_target_matrix(self, graph): """ Creating a normalized sparse adjacency matrix power target. Arg types: * **graph** *(NetworkX graph)* - The graph to be embedded. Return types: * **A_tilde** *(Scipy COO matrix) - The target matrix. """ weighted_graph = nx.Graph() for (u, v) in graph.edges(): weighted_graph.add_edge(u, v, weight=1.0 / graph.degree(u)) weighted_graph.add_edge(v, u, weight=1.0 / graph.degree(v)) A_hat = nx.adjacency_matrix( weighted_graph, nodelist=range(graph.number_of_nodes()) ) A_tilde = A_hat.dot(A_hat) return coo_matrix(A_tilde) def _init_weights(self): """ Initialization of weights and loss container. 
""" self._W = np.random.uniform(0, 1, (self.dimensions, self._A.shape[0])) self._H = np.random.uniform(0, 1, (self.dimensions, self._T.shape[0])) def _update_W(self): """ A single update of the node embedding matrix. """ penalty = (self.lambd / np.linalg.norm(self._W)) * self._W transformed_features = self._H.dot(self._T) scores = 0 for i in range(self.dimensions): scores = ( scores + transformed_features[i, self._A.row] * self._W[i, self._A.col] ) score_matrix = coo_matrix( (scores, (self._A.row, self._A.col)), shape=self._A.shape ) diff_matrix = self._A - score_matrix main_grad = diff_matrix.dot(transformed_features.T).T / np.sum( np.square(scores) ) grad = penalty - main_grad self._W = self._W - self.alpha * grad def _update_H(self): """ A single update of the feature basis matrix. """ penalty = (self.lambd / np.linalg.norm(self._H)) * self._H transformed_features = self._H.dot(self._T) scores = 0 for i in range(self.dimensions): scores = ( scores + transformed_features[i, self._A.col] * self._W[i, self._A.row] ) score_matrix = coo_matrix( (scores, (self._A.row, self._A.col)), shape=self._A.shape ) diff_matrix = self._A - score_matrix main_grad = self._W.dot(diff_matrix.dot(self._T.T)) / np.sum(np.square(scores)) grad = penalty - main_grad self._H = self._H - self.alpha * grad def _create_reduced_features(self, X): """ Creating a dense reduced node feature matrix. Arg types: * **X** *(Scipy COO or Numpy array)* - The wide feature matrix. Return types: * **T** *(Numpy array)* - The reduced feature matrix of nodes. """ svd = TruncatedSVD( n_components=self.reduction_dimensions, n_iter=self.svd_iterations, random_state=self.seed, ) svd.fit(X) T = svd.transform(X) return T.T
[docs] def fit(self, graph: nx.classes.graph.Graph, X: Union[np.array, coo_matrix]): """ Fitting a TADW model. Arg types: * **graph** *(NetworkX graph)* - The graph to be embedded. * **X** *(Scipy COO or Numpy array)* - The matrix of node features. """ self._set_seed() graph = self._check_graph(graph) self._A = self._create_target_matrix(graph) self._T = self._create_reduced_features(X) self._init_weights() for _ in range(self.iterations): self._update_W() self._update_H()
[docs] def get_embedding(self) -> np.array: r"""Getting the node embedding. Return types: * **embedding** *(Numpy array)* - The embedding of nodes. """ embedding = np.concatenate( [np.transpose(self._W), np.transpose(np.dot(self._H, self._T))], axis=1 ) return embedding