# Source code for karateclub.graph_embedding.feathergraph

```import math
from functools import partial
from typing import List, Callable

import numpy as np
import networkx as nx
import scipy.sparse as sparse

from karateclub.estimator import Estimator

def _weighted_directed_degree(node: int, graph: nx.classes.graph.Graph) -> float:
out = graph.degree(node, weight="weight")

return out

def _unweighted_undirected_degree(node: int, graph: nx.classes.graph.Graph) -> float:
out = graph.degree[node]

return float(out)

def _get_degree_fn(graph) -> Callable:
"""Gets the function to calculate the graph node degree"""
fn = (
_weighted_directed_degree
if nx.classes.function.is_weighted(graph)
else _unweighted_undirected_degree
)
fn = partial(fn, graph=graph)

return fn

[docs]class FeatherGraph(Estimator):
r"""An implementation of `"FEATHER-G" <https://arxiv.org/abs/2005.07959>`_
from the CIKM '20 paper "Characteristic Functions on Graphs: Birds of a Feather,
from Statistical Descriptors to Parametric Models". The procedure
uses characteristic functions of node features with random walk weights to describe
node neighborhoods. These node level features are pooled by mean pooling to
create graph level statistics.

Args:
order (int): Adjacency matrix powers. Default is 5.
eval_points (int): Number of evaluation points. Default is 25.
theta_max (int): Maximal evaluation point value. Default is 2.5.
seed (int): Random seed value. Default is 42.
pooling (str): Permutation invariant pooling function, one of:
(:obj:`"mean"`, :obj:`"max"`, :obj:`"min"`). Default is "mean."
"""
n_nodes: int
degree_fn: Callable
_embedding: List[np.ndarray]

def __init__(
self,
order: int = 5,
eval_points: int = 25,
theta_max: float = 2.5,
seed: int = 42,
pooling: str = "mean",
):
super(FeatherGraph, self).__init__()

self.order = order
self.eval_points = eval_points
self.theta_max = theta_max
self.seed = seed

try:
pool_fn = getattr(np, pooling)
except AttributeError:
raise ValueError(f"{pooling.__repr__()} is not a valid pooling function")

self.pooling = pooling
self.pool_fn = partial(pool_fn, axis=0)

def _create_d_inverse(self) -> sparse.coo_matrix:
"""
Creating a sparse inverse degree matrix.

Arg types:
* **graph** *(NetworkX graph)* - The graph to be embedded.

Return types:
* **D_inverse** *(Scipy array)* - Diagonal inverse degree matrix.
"""
index = np.arange(self.n_nodes)
values = np.array(
[1.0 / self.degree_fn(node) for node in range(self.n_nodes)]
)  # <- ?

shape = (self.n_nodes, self.n_nodes)
D_inverse = sparse.coo_matrix((values, (index, index)), shape=shape)
return D_inverse

self, graph: nx.classes.graph.Graph
) -> sparse.coo_matrix:
"""

Arg types:
* **graph** *(NetworkX graph)* - The graph of interest.

Return types:
* **A_hat** *(SciPy array)* - The scattering matrix of the graph.
"""
D_inverse = self._create_d_inverse()

A_hat = D_inverse.dot(A)
return A_hat

def _create_node_feature_matrix(self, graph: nx.classes.graph.Graph) -> np.ndarray:
"""
Calculating the node features.

Arg types:
* **graph** *(NetworkX graph)* - The graph of interest.

Return types:
* **X** *(NumPy array)* - The node features.
"""
log_degree = np.array(
[math.log(self.degree_fn(node) + 1) for node in range(self.n_nodes)]
)
log_degree = log_degree.reshape(-1, 1)

clustering_coefficient = np.array(
[nx.clustering(graph, node) for node in range(self.n_nodes)]
)
clustering_coefficient = clustering_coefficient.reshape(-1, 1)

X = np.concatenate([log_degree, clustering_coefficient], axis=1)
return X

def _calculate_feather(self, graph: nx.classes.graph.Graph) -> np.ndarray:
"""
Calculating the characteristic function features of a graph.

Arg types:
* **graph** *(NetworkX graph)* - A graph to be embedded.

Return types:
* **features** *(Numpy vector)* - The embedding of a single graph.
"""
self.n_nodes = graph.number_of_nodes()
self.degree_fn = _get_degree_fn(graph)

X = self._create_node_feature_matrix(graph)
theta = np.linspace(0.01, self.theta_max, self.eval_points)

X = np.outer(X, theta)
X = X.reshape(graph.number_of_nodes(), -1)
X = np.concatenate([np.cos(X), np.sin(X)], axis=1)

feature_blocks = []
for _ in range(self.order):
X = A_tilde.dot(X)
feature_blocks.append(X)

feature_blocks = np.concatenate(feature_blocks, axis=1)
feature_blocks = self.pool_fn(feature_blocks)

return feature_blocks

[docs]    def fit(self, graphs: List[nx.classes.graph.Graph]) -> None:
"""
Fitting a graph level FEATHER model.

Arg types:
* **graphs** *(List of NetworkX graphs)* - The graphs to be embedded.
"""
self._set_seed()
graphs = self._check_graphs(graphs)
self._embedding = [self._calculate_feather(graph) for graph in graphs]

[docs]    def get_embedding(self) -> np.array:
r"""Getting the embedding of graphs.

Return types:
* **embedding** *(Numpy array)* - The embedding of graphs.
"""
return np.array(self._embedding)
```