import random
import numpy as np
import networkx as nx
from typing import Dict
from karateclub.estimator import Estimator
from karateclub.utils.walker import RandomWalker

[docs]class GEMSEC(Estimator): r"""An implementation of `"GEMSEC" <>`_ from the ASONAM '19 paper "GEMSEC: Graph Embedding with Self Clustering". The procedure uses random walks to approximate the pointwise mutual information matrix obtained by pooling normalized adjacency matrix powers. This matrix is decomposed by an approximate factorization technique which is combined with a k-means like clustering cost. A node embedding and clustering are learned jointly. Args: walk_number (int): Number of random walks. Default is 5. walk_length (int): Length of random walks. Default is 80. dimensions (int): Dimensionality of embedding. Default is 32. negative_samples (int): Number of negative samples. Default is 5. window_size (int): Matrix power order. Default is 5. learning_rate (float): Gradient descent learning rate. Default is 0.1. clusters (int): Number of cluster centers. Default is 10. gamma (float): Clustering cost weight coefficient. Default is 0.1. seed (int): Random seed value. Default is 42. """ def __init__( self, walk_number: int = 5, walk_length: int = 80, dimensions: int = 32, negative_samples: int = 5, window_size: int = 5, learning_rate: float = 0.1, clusters: int = 10, gamma: float = 0.1, seed: int = 42, ): self.walk_number = walk_number self.walk_length = walk_length self.dimensions = dimensions self.negative_samples = negative_samples self.window_size = window_size self.learning_rate = learning_rate self.clusters = clusters self.gamma = gamma self.seed = seed def _setup_sampling_weights(self, graph): """ Creating a negative sampling table. Arg types: * **graph** *(NetworkX graph)* - The graph for negative sampling. """ self._sampler = {} index = 0 for node in graph.nodes(): for _ in range( self._sampler[index] = node index = index + 1 self._global_index = index - 1 def _initialize_node_embeddings(self, graph): """ Creating a node embedding array. Arg types: * **graph** *(NetworkX graph)* - The graph for negative sampling. """ shape = (graph.number_of_nodes(), self.dimensions) self._base_embedding = np.random.normal(0, 1.0 / self.dimensions, shape) def _initialize_cluster_centers(self, graph): """ Creating a cluster center array. Arg types: * **graph** *(NetworkX graph)* - The graph for negative sampling. """ shape = (self.dimensions, self.clusters) self._cluster_centers = np.random.normal(0, 1.0 / self.dimensions, shape) def _sample_negative_samples(self): """ Sampling a batch of nodes as negative samples. Return types: * **negative_samples** *(list)*: List of negative sampled nodes. """ negative_samples = [ self._sampler[random.randint(0, self._global_index)] for _ in range(self.negative_samples) ] return negative_samples def _calculcate_noise_vector(self, negative_samples, source_node): """ Getting the noise vector for the weight update. Arg types: * **negative_samples** *(list)*: List of negative sampled nodes. * **source_node** *(int)* - Source node in the walk. Return types: * **noise_vector** *(NumPy array) - Noise update vector. """ noise_vectors = self._base_embedding[negative_samples, :] source_vector = self._base_embedding[int(source_node), :] raw_scores = raw_scores = np.exp(np.clip(raw_scores, -15, 15)) scores = raw_scores / np.sum(raw_scores) scores = scores.reshape(-1, 1) noise_vector = np.sum(scores * noise_vectors, axis=0) return noise_vector def _calculate_cluster_vector(self, source_node): """ Getting the cluster vector for the weight update. Arg types: * **source_node** *(int)* - Source node in the walk. Return types: * **cluster_vector** *(NumPy array) - Cluster update vector. * **cluster_index** *(int)*: Node cluster membership index. """ distances = ( self._base_embedding[int(source_node), :].reshape(-1, 1) - self._cluster_centers ) scores = np.power(np.sum(np.power(distances, 2), axis=0), 0.5) cluster_index = np.argmin(scores) cluster_vector = distances[:, cluster_index] / scores[cluster_index] return cluster_vector, cluster_index def _do_descent_for_pair(self, negative_samples, source_node, target_node): """ Updating the cluster center and the node embedding. Arg types: * **negative_samples** *(list)* - Negative samples. * **source_node** *(int)* - Source node in the walk. * **target_node** *(int)* - Target node in the walk. """ noise_vector = self._calculcate_noise_vector(negative_samples, source_node) target_vector = self._base_embedding[int(target_node), :] cluster_vector, cluster_index = self._calculate_cluster_vector(source_node) node_gradient = noise_vector - target_vector + self.gamma * cluster_vector node_gradient = node_gradient / np.linalg.norm(node_gradient) self._base_embedding[int(source_node), :] += -self.learning_rate * node_gradient self._cluster_centers[:, cluster_index] += ( self.learning_rate * self.gamma * cluster_vector ) def _update_a_weight(self, source_node, target_node): """ Updating the weights for a pair of nodes. Arg types: * **source_node** *(int)* - Source node in the walk. * **target_node** *(int)* - Target node in the walk. """ negative_samples = self._sample_negative_samples() self._do_descent_for_pair(negative_samples, source_node, target_node) self._do_descent_for_pair(negative_samples, target_node, source_node) def _do_gradient_descent(self): """ Updating the embedding weights and cluster centers with gradient descent. """ random.shuffle(self._walker.walks) for walk in self._walker.walks: for i, source_node in enumerate( walk[: self.walk_length - self.window_size] ): for step in range(1, self.window_size + 1): target_node = walk[i + step] self._update_a_weight(source_node, target_node)
[docs] def fit(self, graph: nx.classes.graph.Graph): """ Fitting a GEMSEC model. Arg types: * **graph** *(NetworkX graph)* - The graph to be embedded. """ self._set_seed() graph = self._check_graph(graph) self._setup_sampling_weights(graph) self._walker = RandomWalker(self.walk_length, self.walk_number) self._walker.do_walks(graph) self._initialize_node_embeddings(graph) self._initialize_cluster_centers(graph) self._do_gradient_descent()
[docs] def get_embedding(self) -> np.array: r"""Getting the node embedding. Return types: * **embedding** *(Numpy array)*: The embedding of nodes. """ return np.array(self._base_embedding)
def _get_membership(self, node): """Getting the cluster membership of a node. Arg types: * **node** *(int)* - The graph to be clustered. Return types: * **cluster_index** *(int)*: Node cluster membership index. """ distances = self._base_embedding[node, :].reshape(-1, 1) - self._cluster_centers scores = np.power(np.sum(np.power(distances, 2), axis=0), 0.5) cluster_index = np.argmin(scores) return cluster_index
[docs] def get_memberships(self) -> Dict[int, int]: r"""Getting the cluster membership of nodes. Return types: * **memberships** *(dict)*: Node cluster memberships. """ memberships = { node: self._get_membership(node) for node in range(self._base_embedding.shape[0]) } return memberships