Source code for karateclub.community_detection.non_overlapping.edmot

import community
import networkx as nx
from typing import Dict
from karateclub.estimator import Estimator


[docs]class EdMot(Estimator): r"""An implementation of `"Edge Motif Clustering" <https://arxiv.org/abs/1906.04560>`_ from the KDD '19 paper "EdMot: An Edge Enhancement Approach for Motif-aware Community Detection". The tool first creates the graph of higher order motifs. This graph is clustered by the Louvain method. The resulting cluster memberships are stored as a dictionary. Args: component_count (int): Number of extracted motif hypergraph components. Default is 2. cutoff (int): Motif edge cut-off value. Default is 50. seed (int): Random seed value. Default is 42. """ def __init__(self, component_count: int = 2, cutoff: int = 50, seed: int = 42): self.component_count = component_count self.cutoff = cutoff self.seed = seed def _overlap(self, node_1, node_2): """ Calculating the neighbourhood overlap for a pair of nodes. Arg types: * **node_1** *(int)* - Source node 1. * **node_2** *(int)* - Source node 2. Return types: * **overlap** *(int)* - Neighbourhood overlap score. """ nodes_1 = self._graph.neighbors(node_1) nodes_2 = self._graph.neighbors(node_2) overlap = len(set(nodes_1).intersection(set(nodes_2))) return overlap def _calculate_motifs(self): """ Enumerating pairwise motif counts. """ edges = [ e for e in self._graph.edges() if self._overlap(e[0], e[1]) >= self.cutoff ] self._motif_graph = nx.from_edgelist(edges) def _extract_components(self): """ Extracting connected components from motif graph. """ components = [c for c in nx.connected_components(self._motif_graph)] components = [[len(c), c] for c in components] components.sort(key=lambda x: x[0], reverse=True) important_components = [ components[comp][1] for comp in range( self.component_count if len(components) >= self.component_count else len(components) ) ] self._blocks = [list(graph) for graph in important_components] def _fill_blocks(self): """ Filling the dense blocks of the adjacency matrix. """ new_edges = [ (n_1, n_2) for nodes in self._blocks for n_1 in nodes for n_2 in nodes if n_1 != n_2 ] self._graph.add_edges_from(new_edges)
[docs] def fit(self, graph: nx.classes.graph.Graph): """ Fitting an Edge Motif clustering model. Arg types: * **graph** *(NetworkX graph)* - The graph to be clustered. """ self._set_seed() graph = self._check_graph(graph) self._graph = graph self._calculate_motifs() self._extract_components() self._fill_blocks() self._partition = community.best_partition(self._graph, random_state=self.seed)
[docs] def get_memberships(self) -> Dict[int, int]: r"""Getting the cluster membership of nodes. Return types: * **memberships** *(dictionary of ints)* - Cluster memberships. """ return self._partition