Source code for karateclub.community_detection.overlapping.nnsed

import numpy as np
import networkx as nx
from typing import Dict
from scipy import sparse
from karateclub.estimator import Estimator


class NNSED(Estimator):
    r"""An implementation of `"NNSED" <http://www.bigdatalab.ac.cn/~shenhuawei/publications/2017/cikm-sun.pdf>`_
    from the CIKM '17 paper "A Non-negative Symmetric Encoder-Decoder Approach
    for Community Detection". The procedure uses non-negative matrix
    factorization in order to learn an unnormalized cluster membership
    distribution over nodes. The method can be used in an overlapping and
    non-overlapping way.

    Args:
        dimensions (int): Embedding layer size. Default is 32.
        iterations (int): Number of training epochs. Default 10.
        seed (int): Random seed for weight initializations. Default 42.
        noise (float): Random noise for normalization stability. Default is 10**-6.
    """

    def __init__(
        self,
        dimensions: int = 32,
        iterations: int = 10,
        seed: int = 42,
        noise: float = 10 ** -6,
    ):
        self.dimensions = dimensions
        self.iterations = iterations
        self.seed = seed
        self.noise = noise

    def _create_D_inverse(self, graph):
        """
        Creating a sparse inverse degree matrix.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to be embedded.

        Return types:
            * **D_inverse** *(Scipy array)* - Diagonal inverse degree matrix.
        """
        index = np.arange(graph.number_of_nodes())
        values = np.array(
            [1.0 / graph.degree[node] for node in range(graph.number_of_nodes())]
        )
        shape = (graph.number_of_nodes(), graph.number_of_nodes())
        D_inverse = sparse.coo_matrix((values, (index, index)), shape=shape)
        return D_inverse

    def _create_base_matrix(self, graph):
        """
        Creating the normalized adjacency matrix.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to be embedded.

        Return types:
            * **A_hat** *(Scipy array)* - Normalized (random-walk) adjacency matrix D^-1 A.
        """
        A = nx.adjacency_matrix(graph, nodelist=range(graph.number_of_nodes()))
        D_inverse = self._create_D_inverse(graph)
        A_hat = D_inverse.dot(A)
        return A_hat

    def _setup_embeddings(self, A):
        """
        Setup the node embedding matrices.

        Arg types:
            * **A** *(Scipy COO matrix)* - The normalized adjacency matrix
              (note: a matrix, not a NetworkX graph; `fit` passes A_hat here).
        """
        number_of_nodes = A.shape[0]
        # Non-negative random initialization of encoder (W) and decoder (Z) factors.
        self._W = np.abs(
            np.random.normal(0, 10, size=(number_of_nodes, self.dimensions))
        )
        self._Z = np.abs(
            np.random.normal(0, 10, size=(self.dimensions, number_of_nodes))
        )

    def _update_W(self, A):
        """
        Updating the vertical basis matrix with a multiplicative NMF-style rule.

        Arg types:
            * **A** *(Scipy COO matrix)* - The normalized adjacency matrix.
        """
        enum = A.dot(self._Z.T)
        denom_1 = self._W.dot(self._Z).dot(self._Z.T)
        denom_2 = (A.dot(A.transpose())).dot(self._W)
        # noise keeps the denominator strictly positive (avoids division by zero).
        denom = denom_1 + denom_2 + self.noise
        self._W = self._W * (enum / denom)

    def _update_Z(self, A):
        """
        Updating the horizontal basis matrix with a multiplicative NMF-style rule.

        Arg types:
            * **A** *(Scipy COO matrix)* - The normalized adjacency matrix.
        """
        enum = (A.dot(self._W)).transpose()
        # noise keeps the denominator strictly positive (avoids division by zero).
        denom = np.dot(np.dot(self._W.T, self._W), self._Z) + self._Z + self.noise
        self._Z = self._Z * (enum / denom)
[docs] def get_embedding(self) -> np.array: r"""Getting the bottleneck layer embedding. Return types: * **embedding** *(Numpy array)* - The embedding of nodes. """ embedding = self._Z.T return embedding
[docs] def get_memberships(self) -> Dict[int, int]: r"""Getting the cluster membership of nodes. Return types: * **memberships** *(dict)* - Node cluster memberships. """ index = np.argmax(self._Z, axis=0) memberships = {int(i): int(index[i]) for i in range(len(index))} return memberships
[docs] def fit(self, graph: nx.classes.graph.Graph): """ Fitting an NNSED clustering model. Arg types: * **graph** *(NetworkX graph)* - The graph to be clustered. """ self._set_seed() graph = self._check_graph(graph) A = self._create_base_matrix(graph) self._setup_embeddings(A) for _ in range(self.iterations): self._update_W(A) self._update_Z(A)