# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2015-2023 Tanguy Fardet
# SPDX-License-Identifier: GPL-3.0-or-later
# nngt/analysis/nx_functions.py
""" Tools to analyze graphs with the networkx backend """
import numpy as np
import scipy.sparse as ssp
from ..lib.test_functions import nonstring_container, is_integer
from ..lib.graph_helpers import _get_nx_weights, _get_nx_graph
import networkx as nx
[docs]def global_clustering_binary_undirected(g):
'''
Returns the undirected global clustering coefficient.
This corresponds to the ratio of undirected triangles to the number of
undirected triads.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
References
----------
.. [nx-global-clustering] :nxdoc:`algorithms.cluster.transitivity`
'''
return nx.transitivity(g.graph.to_undirected(as_view=True))
[docs]def local_clustering_binary_undirected(g, nodes=None):
'''
Returns the undirected local clustering coefficient of some `nodes`.
If `g` is directed, then it is converted to a simple undirected graph
(no parallel edges).
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
nodes : list, optional (default: all nodes)
The list of nodes for which the clustering will be returned
Returns
-------
lc : :class:`numpy.ndarray`
The list of clustering coefficients, on per node.
References
----------
.. [nx-local-clustering] :nxdoc:`algorithms.cluster.clustering`
'''
num_nodes = g.node_nb()
if nonstring_container(nodes):
num_nodes = len(nodes)
elif nodes is not None:
num_nodes = 1
lc = nx.clustering(g.graph.to_undirected(as_view=True), nodes=nodes,
weight=None)
if num_nodes == 1:
return lc
if nodes is None:
nodes = list(range(num_nodes))
return np.array([lc[n] for n in nodes], dtype=float)
[docs]def assortativity(g, degree, weights=None):
'''
Returns the assortativity of the graph.
This tells whether nodes are preferentially connected together depending
on their degree.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
degree : str
The type of degree that should be considered.
weights : bool or str, optional (default: binary edges)
Whether edge weights should be considered; if ``None`` or ``False``
then use binary edges; if ``True``, uses the 'weight' edge attribute,
otherwise uses any valid edge attribute required.
References
----------
.. [nx-assortativity]
:nxdoc:`algorithms.assortativity.degree_assortativity_coefficient`
'''
w = _get_nx_weights(g, weights)
return nx.degree_pearson_correlation_coefficient(
g.graph, x=degree, y=degree, weight=w)
[docs]def reciprocity(g):
'''
Calculate the edge reciprocity of the graph.
The reciprocity is defined as the number of edges that have a reciprocal
edge (an edge between the same nodes but in the opposite direction)
divided by the total number of edges.
This is also the probability for any given edge, that its reciprocal edge
exists.
By definition, the reciprocity of undirected graphs is 1.
@todo: check whether we can get this for single nodes for all libraries.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
References
----------
.. [nx-reciprocity] :nxdoc:`algorithms.reciprocity.overall_reciprocity`
'''
if not g.is_directed():
return 1.
return nx.overall_reciprocity(g.graph)
[docs]def closeness(g, weights=None, nodes=None, mode="out", harmonic=True,
default=np.NaN):
r'''
Returns the closeness centrality of some `nodes`.
Closeness centrality of a node `u` is defined, for the harmonic version,
as the sum of the reciprocal of the shortest path distance :math:`d_{uv}`
from `u` to the N - 1 other nodes in the graph (if `mode` is "out",
reciprocally :math:`d_{vu}`, the distance to `u` from another node v,
if `mode` is "in"):
.. math::
C(u) = \frac{1}{N - 1} \sum_{v \neq u} \frac{1}{d_{uv}},
or, using the arithmetic definition, as the reciprocal of the
average shortest path distance to/from `u` over to all other nodes:
.. math::
C(u) = \frac{n - 1}{\sum_{v \neq u} d_{uv}},
where `d_{uv}` is the shortest-path distance from `u` to `v`,
and `n` is the number of nodes in the component.
By definition, the distance is infinite when nodes are not connected by
a path in the harmonic case (such that :math:`\frac{1}{d(v, u)} = 0`),
while the distance itself is taken as zero for unconnected nodes in the
first equation.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
weights : bool or str, optional (default: binary edges)
Whether edge weights should be considered; if ``None`` or ``False``
then use binary edges; if ``True``, uses the 'weight' edge attribute,
otherwise uses any valid edge attribute required.
nodes : list, optional (default: all nodes)
The list of nodes for which the clutering will be returned
mode : str, optional (default: "out")
For directed graphs, whether the distances are computed from ("out") or
to ("in") each of the nodes.
harmonic : bool, optional (default: True)
Whether the arithmetic or the harmonic (recommended) version
of the closeness should be used.
Returns
-------
c : :class:`numpy.ndarray`
The list of closeness centralities, on per node.
References
----------
.. [nx-harmonic] :nxdoc:`algorithms.centrality.harmonic_centrality`
.. [nx-closeness] :nxdoc:`algorithms.centrality.closeness_centrality`
'''
graph = g.graph
if graph.is_directed() and mode == "out":
graph = g.graph.reverse(copy=False)
w = _get_nx_weights(g, weights, reverse=True)
c = None
if harmonic:
c = nx.harmonic_centrality(graph, distance=w)
else:
c = nx.closeness_centrality(graph, distance=w, wf_improved=False)
c = np.array([v for _, v in c.items()])
# normalize
if harmonic:
c *= 1 / (len(graph) - 1)
elif default != 0:
c[c == 0.] = default
if nodes is None:
return c
return c[nodes]
[docs]def betweenness(g, btype="both", weights=None):
'''
Returns the normalized betweenness centrality of the nodes and edges.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
btype : str, optional (default 'both')
The centrality that should be returned (either 'node', 'edge', or
'both'). By default, both betweenness centralities are computed.
weights : bool or str, optional (default: binary edges)
Whether edge weights should be considered; if ``None`` or ``False``
then use binary edges; if ``True``, uses the 'weight' edge attribute,
otherwise uses any valid edge attribute required.
Returns
-------
nb : :class:`numpy.ndarray`
The nodes' betweenness if `btype` is 'node' or 'both'
eb : :class:`numpy.ndarray`
The edges' betweenness if `btype` is 'edge' or 'both'
References
----------
.. [nx-ebetw] :nxdoc:`algorithms.centrality.edge_betweenness_centrality`
.. [nx-nbetw] :nxdoc:`networkx.algorithms.centrality.betweenness_centrality`
'''
w = _get_nx_weights(g, weights)
nb, eb = None, None
if btype in ("both", "node"):
di_nb = nx.betweenness_centrality(g.graph, weight=w)
nb = np.array([di_nb[i] for i in g.get_nodes()])
if btype in ("both", "edge"):
di_eb = nx.edge_betweenness_centrality(g.graph, weight=w)
eb = np.array([di_eb[tuple(e)] for e in g.edges_array])
if btype == "node":
return nb
elif btype == "edge":
return eb
return nb, eb
[docs]def connected_components(g, ctype=None):
'''
Returns the connected component to which each node belongs.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
ctype : str, optional (default 'scc')
Type of component that will be searched: either strongly connected
('scc', by default) or weakly connected ('wcc').
Returns
-------
cc, hist : :class:`numpy.ndarray`
The component associated to each node (`cc`) and the number of nodes in
each of the component (`hist`).
References
----------
.. [nx-ucc] :nxdoc:`algorithms.components.connected_components`
.. [nx-scc] :nxdoc:`algorithms.components.strongly_connected_components`
.. [nx-wcc] :nxdoc:`algorithms.components.weakly_connected_components`
'''
if ctype is None:
ctype = "scc" if g.is_directed() else "wcc"
res = None
if not g.is_directed():
res = nx.connected_components(g.graph)
elif ctype == "scc":
res = nx.strongly_connected_components(g.graph)
elif ctype == "wcc":
res = nx.weakly_connected_components(g.graph)
else:
raise ValueError("Invalid `ctype`, only 'scc' and 'wcc' are allowed.")
cc = np.zeros(g.node_nb(), dtype=int)
hist = []
for i, nodes in enumerate(res):
cc[list(nodes)] = i
hist.append(len(nodes))
return cc, np.array(hist, dtype=int)
[docs]def shortest_path(g, source, target, directed=None, weights=None,
combine_weights="mean"):
'''
Returns a shortest path between `source`and `target`.
The algorithms returns an empty list if there is no path between the nodes.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
source : int
Node from which the path starts.
target : int
Node where the path ends.
directed : bool, optional (default: ``g.is_directed()``)
Whether the edges should be considered as directed or not
(automatically set to False if `g` is undirected).
weights : str or array, optional (default: binary)
Whether to use weighted edges to compute the distances. By default,
all edges are considered to have distance 1.
combine_weights : str, optional (default: 'mean')
How to combine the weights of reciprocal edges if the graph is directed
but `directed` is set to False. It can be:
* "sum": the sum of the edge attribute values will be used for the new
edge.
* "mean": the mean of the edge attribute values will be used for the
new edge.
* "min": the minimum of the edge attribute values will be used for the
new edge.
* "max": the maximum of the edge attribute values will be used for the
new edge.
Returns
-------
path : list of ints
Order of the nodes making up the path from `source` to `target`.
References
----------
.. [nx-sp] :nxdoc:`algorithms.shortest_paths.generic.shortest_path`
'''
g, graph, w = _get_nx_graph(g, directed, weights, combine_weights,
return_all=True)
w = _get_nx_weights(g, w)
try:
return nx.shortest_path(graph, source, target, weight=w)
except nx.NetworkXNoPath:
return []
[docs]def all_shortest_paths(g, source, target, directed=None, weights=None,
combine_weights="mean"):
'''
Yields all shortest paths from `source` to `target`.
The algorithms returns an empty generator if there is no path between the
nodes.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
source : int
Node from which the paths starts.
target : int, optional (default: all nodes)
Node where the paths ends.
directed : bool, optional (default: ``g.is_directed()``)
Whether the edges should be considered as directed or not
(automatically set to False if `g` is undirected).
weights : str or array, optional (default: binary)
Whether to use weighted edges to compute the distances. By default,
all edges are considered to have distance 1.
combine_weights : str, optional (default: 'mean')
How to combine the weights of reciprocal edges if the graph is directed
but `directed` is set to False. It can be:
* "sum": the sum of the edge attribute values will be used for the new
edge.
* "mean": the mean of the edge attribute values will be used for the
new edge.
* "min": the minimum of the edge attribute values will be used for the
new edge.
* "max": the maximum of the edge attribute values will be used for the
new edge.
Returns
-------
all_paths : generator
Generator yielding paths as lists of ints.
References
----------
.. [nx-sp] :nxdoc:`algorithms.shortest_paths.generic.all_shortest_paths`
'''
g, graph, w = _get_nx_graph(g, directed, weights, combine_weights,
return_all=True)
w = _get_nx_weights(g, w)
try:
return nx.all_shortest_paths(graph, source, target, weight=w)
except nx.NetworkXNoPath:
return (_ for _ in [])
[docs]def shortest_distance(g, sources=None, targets=None, directed=None,
weights=None, combine_weights="mean"):
'''
Returns the length of the shortest paths between `sources`and `targets`.
The algorithms return infinity if there are no paths between nodes.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
sources : list of nodes, optional (default: all)
Nodes from which the paths must be computed.
targets : list of nodes, optional (default: all)
Nodes to which the paths must be computed.
directed : bool, optional (default: ``g.is_directed()``)
Whether the edges should be considered as directed or not
(automatically set to False if `g` is undirected).
weights : str or array, optional (default: binary)
Whether to use weighted edges to compute the distances. By default,
all edges are considered to have distance 1.
combine_weights : str, optional (default: 'mean')
How to combine the weights of reciprocal edges if the graph is directed
but `directed` is set to False. It can be:
* "sum": the sum of the edge attribute values will be used for the new
edge.
* "mean": the mean of the edge attribute values will be used for the
new edge.
* "min": the minimum of the edge attribute values will be used for the
new edge.
* "max": the maximum of the edge attribute values will be used for the
new edge.
Returns
-------
distance : float, or 1d/2d numpy array of floats
Distance (if single source and single target) or distance array.
For multiple sources and targets, the shape of the matrix is (S, T),
with S the number of sources and T the number of targets; for a single
source or target, return a 1d-array of length T or S.
References
----------
.. [nx-sp] :nxdoc:`algorithms.shortest_paths.weighted.multi_source_dijkstra`
'''
num_nodes = g.node_nb()
# check consistency for weights and directed
g, graph, w = _get_nx_graph(g, directed, weights, combine_weights,
return_all=True)
w = _get_nx_weights(g, w)
# check for single source/target case and convert sources and targets
if is_integer(sources):
if is_integer(targets):
try:
return nx.shortest_path_length(graph, sources, targets,
weight=w)
except Exception as e:
return np.inf
sources = [sources]
elif sources is None:
sources = range(num_nodes)
if is_integer(targets):
targets = [targets]
# compute distances
data, ii, jj = [], [], []
def _nx_sp(nx_graph, s, weight):
if weight is None:
return nx.single_source_shortest_path_length(nx_graph, s)
dist, _ = nx.multi_source_dijkstra(graph, [s], weight=weight)
return dist
for s in sources:
dist = _nx_sp(graph, s, w)
if targets is None:
data.extend(dist.values())
ii.extend((s for _ in range(len(dist))))
jj.extend(dist.keys())
else:
for t in targets:
if t in dist:
data.append(dist[t])
ii.append(s)
jj.append(t)
num_sources = num_nodes if sources is None else len(sources)
num_targets = num_nodes if targets is None else len(targets)
mat_dist = np.full((num_sources, num_targets), np.inf)
mat_dist[ii, jj] = data
if num_sources == 1:
return mat_dist[0]
if num_targets == 1:
return mat_dist.T[0]
return mat_dist
[docs]def average_path_length(g, sources=None, targets=None, directed=None,
weights=None, combine_weights="mean",
unconnected=False):
r'''
Returns the average shortest path length between `sources` and `targets`.
The algorithms raises an error if all nodes are not connected unless
`unconnected` is set to True.
The average path length is defined as
.. math::
L = \frac{1}{N_p} \sum_{u,v} d(u, v),
where :math:`N_p` is the number of paths between `sources` and `targets`,
and :math:`d(u, v)` is the shortest path distance from u to v.
If `sources` and `targets` are both None, then the total number of paths is
:math:`N_p = N(N - 1)`, with :math:`N` the number of nodes in the graph.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
sources : list of nodes, optional (default: all)
Nodes from which the paths must be computed.
targets : list of nodes, optional (default: all)
Nodes to which the paths must be computed.
directed : bool, optional (default: ``g.is_directed()``)
Whether the edges should be considered as directed or not
(automatically set to False if `g` is undirected).
weights : str or array, optional (default: binary)
Whether to use weighted edges to compute the distances. By default,
all edges are considered to have distance 1.
combine_weights : str, optional (default: 'mean')
How to combine the weights of reciprocal edges if the graph is directed
but `directed` is set to False. It can be:
* "sum": the sum of the edge attribute values will be used for the new
edge.
* "mean": the mean of the edge attribute values will be used for the
new edge.
* "min": the minimum of the edge attribute values will be used for the
new edge.
* "max": the maximum of the edge attribute values will be used for the
new edge.
unconnected : bool, optional (default: False)
If set to true, ignores unconnected nodes and returns the average path
length of the existing paths.
References
----------
.. [nx-sp] :nxdoc:`algorithms.shortest_paths.generic.average_shortest_path_length`
'''
directed = g.is_directed() if directed is None else directed
if sources is None and targets is None and not unconnected:
g, graph, w = _get_nx_graph(g, directed, weights, combine_weights,
return_all=True)
w = _get_nx_weights(g, w)
return nx.average_shortest_path_length(graph, weight=w)
mat_dist = shortest_distance(g, sources=sources, targets=targets,
directed=directed, weights=weights)
if not unconnected and np.any(np.isinf(mat_dist)):
raise nx.NetworkXNoPath("`sources` and `target` do not belong to the "
"same connected component.")
# compute the number of path
num_paths = np.sum(mat_dist != 0)
# compute average path length
if unconnected:
num_paths -= np.sum(np.isinf(mat_dist))
return np.nansum(mat_dist) / num_paths
return np.sum(mat_dist) / num_paths
[docs]def diameter(g, directed=None, weights=None, combine_weights="mean",
is_connected=False):
'''
Returns the diameter of the graph.
.. versionchanged:: 2.3
Added `combine_weights` argument.
.. versionchanged:: 2.0
Added `directed` and `is_connected` arguments.
It returns infinity if the graph is not connected (strongly connected for
directed graphs) unless `is_connected` is True, in which case it returns
the longest existing shortest distance.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
directed : bool, optional (default: ``g.is_directed()``)
Whether to compute the directed diameter if the graph is directed.
If False, then the graph is treated as undirected. The option switches
to False automatically if `g` is undirected.
weights : bool or str, optional (default: binary edges)
Whether edge weights should be considered; if ``None`` or ``False``
then use binary edges; if ``True``, uses the 'weight' edge attribute,
otherwise uses any valid edge attribute required.
combine_weights : str, optional (default: 'mean')
How to combine the weights of reciprocal edges if the graph is directed
but `directed` is set to False. It can be:
* "sum": the sum of the edge attribute values will be used for the new
edge.
* "mean": the mean of the edge attribute values will be used for the
new edge.
* "min": the minimum of the edge attribute values will be used for the
new edge.
* "max": the maximum of the edge attribute values will be used for the
new edge.
is_connected : bool, optional (default: False)
If False, check whether the graph is connected or not and return
infinite diameter if graph is unconnected. If True, the graph is
assumed to be connected.
See also
--------
:func:`nngt.analysis.shortest_distance`
References
----------
.. [nx-diameter] :nxdoc:`algorithms.distance_measures.diameter`
.. [nx-dijkstra] :nxdoc:`algorithms.shortest_paths.weighted.all_pairs_dijkstra`
'''
w = _get_nx_weights(g, weights)
# weighted or "connected" cases
if w is not None or is_connected:
dist = shortest_distance(g, directed=directed, weights=weights,
combine_weights=combine_weights)
if is_connected:
return np.max(dist[~np.isinf(dist)])
return np.max(dist)
# unweighted case
graph = _get_nx_graph(g, directed, w, combine_weights)
try:
return nx.diameter(graph)
except nx.exception.NetworkXError:
return np.inf
def adj_mat(g, weights=None, mformat="csr"):
r'''
Returns the adjacency matrix :math:`A` of the graph.
With edge :math:`i \leftarrow j` corresponding to entry :math:`A_{ij}`.
Parameters
----------
g : :class:`~nngt.Graph`
Graph to analyze.
weights : bool or str, optional (default: binary edges)
Whether edge weights should be considered; if ``None`` or ``False``
then returns the binary adjacency matrix; if ``True``, returns the
weighted matrix, otherwise fills the matrix with any valid edge
attribute values.
mformat : str, optional (default: "csr")
Type of :mod:`scipy.sparse` matrix that will be returned, by
default :class:`scipy.sparse.csr_matrix`.
Returns
-------
The adjacency matrix as a :class:`scipy.sparse.csr_matrix`.
References
----------
.. [nx-adjacency] :nxdoc:`.convert_matrix.to_scipy_sparse_array`
'''
w = _get_nx_weights(g, weights)
if nx.__version__ >= "2.7":
# convert back to sparse matrix
mat = nx.to_scipy_sparse_array(
g.graph, nodelist=range(g.node_nb()), weight=w, format=mformat)
if mformat == "dense":
return mat
cls = getattr(ssp, f'{mat.format}_matrix')
return cls(mat)
return nx.to_scipy_sparse_matrix(
g.graph, nodelist=range(g.node_nb()), weight=w, format=mformat)
def get_edges(g):
'''
Returns the edges in the graph by order of creation.
'''
return g.edges_array