r"""
Base Task Class for Graph Problems.
"""
# Copyright (c) 2024 Thinklab@SJTU
# ML4CO-Kit is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
import pickle
import pathlib
import numpy as np
import scipy.sparse
import networkx as nx
from typing import Union, Tuple
from chszlablib import Graph as CHSZLabLibGraph
from ml4co_kit.task.base import TaskBase, TASK_TYPE
from ml4co_kit.utils.file_utils import check_file_path
[docs]class GraphTaskBase(TaskBase):
r"""
Base class for all undirected graph problems in the ML4CO kit.
"""
def __init__(
self,
task_type: TASK_TYPE,
minimize: bool,
node_weighted: bool = False,
edge_weighted: bool = False,
precision: Union[np.float32, np.float64] = np.float32
):
# Super Initialization
super(GraphTaskBase, self).__init__(
task_type=task_type,
minimize=minimize,
precision=precision
)
# Initialize Attributes (basic)
self.node_weighted = node_weighted
self.edge_weighted = edge_weighted
self.nodes_num: int = None
self.edges_num: int = None
self.nodes_weight: np.ndarray = None
self.edges_weight: np.ndarray = None
self.edge_index: np.ndarray = None # [Method 1] Edge Index in shape (2, num_edges)
# Symmetric
self.already_symmetric = False
# Self-loop
self.self_loop = None
# Initialize Attributes (other structure)
self.adj_matrix: np.ndarray = None # [Method 2] Adjacency Matrix
self.adj_matrix_weighted: np.ndarray = None # [Method 3] Adjacency Matrix with edges_weight
self.xadj: np.ndarray = None # [Method 4] Compressed Sparse Row (CSR) representation
self.adjncy: np.ndarray = None # [Method 4] Compressed Sparse Row (CSR) representation
def _check_nodes_weight_dim(self):
"""Ensure node weights is a 1D array."""
if self.nodes_weight.ndim != 1:
raise ValueError("Node weights should be a 1D array with shape (num_nodes,).")
def _check_edges_weight_dim(self):
"""Ensure edge weights is a 1D array."""
if self.edges_weight.ndim != 1:
raise ValueError("Edge weights should be a 1D array with shape (num_edges,).")
def _check_edges_index_dim(self):
"""Ensure edge index is a 2D array with shape (2, num_edges)."""
if self.edge_index.ndim != 2 or self.edge_index.shape[0] != 2:
raise ValueError("Edge index should be a 2D array with shape (2, num_edges).")
if self.edges_num is not None and self.edge_index.shape[1] != self.edges_num:
raise ValueError("Edge index second dimension should match number of edges.")
def _check_edges_index_not_none(self):
"""Ensure edge index is not None."""
if self.edge_index is None:
raise ValueError("Edge index cannot be None!")
def _check_sol_dim(self):
"""Ensure solution is a 1D array."""
if self.sol.ndim != 1:
raise ValueError("Solution should be a 1D array.")
def _check_ref_sol_dim(self):
"""Ensure reference solution is a 1D array."""
if self.ref_sol.ndim != 1:
raise ValueError("Reference solution should be a 1D array.")
def _invalidate_cached_structures(self):
"""Invalidate cached structures."""
self.adj_matrix = None
self.adj_matrix_weighted = None
self.xadj = None
self.adjncy = None
def _deal_with_self_loop(self):
raise NotImplementedError("Subclasses should implement this method.")
[docs] def from_data(
self,
edge_index: np.ndarray = None,
nodes_weight: np.ndarray = None,
edges_weight: np.ndarray = None,
sol: np.ndarray = None,
ref: bool = False,
re_check_symmetric: bool = False,
):
# Set Attributes and Check Dimensions
if nodes_weight is not None:
if self.node_weighted == False:
raise ValueError(
"The graph is not defined as node-weighted, but node weights are provided."
)
self.node_weighted = True
self.nodes_weight = nodes_weight.astype(self.precision)
self._check_nodes_weight_dim()
self.nodes_num = int(nodes_weight.shape[0])
if edges_weight is not None:
if self.edge_weighted == False:
raise ValueError(
"The graph is not defined as edge-weighted, but edge weights are provided."
)
self.edge_weighted = True
self.edges_weight = edges_weight.astype(self.precision)
self._check_edges_weight_dim()
self.edges_num = int(edges_weight.shape[0])
if edge_index is not None:
self.edge_index = edge_index
self._check_edges_index_dim()
# Infer nodes_num and edges_num if not provided
if self.nodes_num is None:
self.nodes_num = int(np.max(edge_index) + 1)
if self.edges_num is None:
self.edges_num = int(edge_index.shape[1])
if sol is not None:
if ref:
self.ref_sol = sol
self._check_ref_sol_dim()
else:
self.sol = sol
self._check_sol_dim()
# Default Initialization if not provided
if self.nodes_weight is None and self.nodes_num is not None:
self.nodes_weight = np.ones(self.nodes_num, dtype=self.precision)
if self.edges_weight is None and self.edges_num is not None:
self.edges_weight = np.ones(self.edges_num, dtype=self.precision)
# Make the graph symmetric
if self.already_symmetric == False or re_check_symmetric:
self.make_symmetric()
# Deal with self-loop
if self.self_loop is None:
self._deal_with_self_loop()
[docs] def from_adj_matrix(
self,
adj_matrix: np.ndarray,
nodes_weight: np.ndarray = None,
edges_weight: np.ndarray = None,
):
"""Load graph data from an adjacency matrix."""
# Check if the adjacency matrix is square
if adj_matrix.ndim != 2:
raise ValueError("Adjacency matrix should be a 2D array.")
# Check if the adjacency matrix is all-ones
if not np.all(np.isin(adj_matrix, [0, 1])):
raise ValueError("Adjacency matrix should contain only 0s and 1s.")
# Convert adjacency matrix to edge_index and edges_weight
coo = scipy.sparse.coo_matrix(adj_matrix)
edge_index = np.vstack((coo.row, coo.col)).astype(np.int32)
# Use ``from_data``
self.from_data(
edge_index=edge_index,
nodes_weight=nodes_weight,
edges_weight=edges_weight
)
[docs] def from_adj_matrix_weighted(
self,
adj_matrix_weighted: np.ndarray,
nodes_weight: np.ndarray = None,
):
"""Load graph data from an adjacency matrix."""
# Check if the adjacency matrix is square
if adj_matrix_weighted.ndim != 2:
raise ValueError("Adjacency matrix should be a 2D array.")
# Convert adjacency matrix to edge_index and edges_weight
coo = scipy.sparse.coo_matrix(adj_matrix_weighted)
edge_index = np.vstack((coo.row, coo.col)).astype(np.int32)
if self.edge_weighted != False:
edges_weight = edges_weight=coo.data.astype(self.precision)
else:
edges_weight = None
# Use ``from_data``
self.from_data(
edge_index=edge_index,
nodes_weight=nodes_weight,
edges_weight=edges_weight
)
[docs] def to_adj_matrix(self, with_edge_weights: bool = False) -> np.ndarray:
"""Convert edge_index and edges_weight to adjacency matrix."""
if with_edge_weights:
if self.adj_matrix_weighted is None:
self.adj_matrix_weighted = scipy.sparse.coo_matrix(
arg1=(
self.edges_weight,
(self.edge_index[0], self.edge_index[1])
),
shape=(self.nodes_num, self.nodes_num)
).toarray().astype(self.precision)
return self.adj_matrix_weighted
else:
if self.adj_matrix is None:
self.adj_matrix = scipy.sparse.coo_matrix(
arg1=(
np.ones_like(self.edges_weight),
(self.edge_index[0], self.edge_index[1])
),
shape=(self.nodes_num, self.nodes_num)
).toarray().astype(self.precision)
return self.adj_matrix
[docs] def from_gpickle_result(
self,
gpickle_file_path: pathlib.Path = None,
result_file_path: pathlib.Path = None,
ref: bool = False,
):
"""Load graph data from a gpickle file."""
# Read graph data from .gpickle
if gpickle_file_path is not None:
with open(gpickle_file_path, "rb") as f:
nx_graph: nx.Graph = pickle.load(f)
# Use ``from_nx_graph``
self.from_networkx(nx_graph)
if result_file_path is not None:
with open(result_file_path, "r") as f:
sol = [int(_) for _ in f.read().splitlines()]
# Use ``from_data``
self.from_data(sol=np.array(sol, dtype=np.int32), ref=ref)
[docs] def to_gpickle_result(
self,
gpickle_file_path: pathlib.Path = None,
result_file_path: pathlib.Path = None,
):
"""Save graph data to a ``.gpickle`` or ``.result`` file."""
# Save graph data to a .gpickle file
if gpickle_file_path is not None:
# Check file path
check_file_path(gpickle_file_path)
# Transfer to NetworkX graph
nx_graph = self.to_networkx()
# Save to .gpickle file
with open(gpickle_file_path, "wb") as f:
pickle.dump(nx_graph, f, pickle.HIGHEST_PROTOCOL)
# Save graph data to a .result file
if result_file_path is not None:
# Check file path
check_file_path(result_file_path)
# Save to .result file
with open(result_file_path, "w") as f:
for node_label in self.sol:
f.write(f"{node_label}\n")
[docs] def from_networkx(self, nx_graph: nx.Graph):
"""Load graph data from a NetworkX graph object."""
# Extract nodes and edges information
self.nodes_num = int(nx_graph.number_of_nodes())
self.edges_num = int(nx_graph.number_of_edges())
# Extract node weights if available
nodes_weight = None
if self.node_weighted != False and \
all("weight" in nx_graph.nodes[n] for n in nx_graph.nodes):
nodes_weight = np.array(
[nx_graph.nodes[n]["weight"] for n in nx_graph.nodes],
dtype=self.precision
)
self.node_weighted = True
else:
nodes_weight = None
# Extract edge weights if available
edges_weight = None
if self.edge_weighted != False and \
all("weight" in nx_graph.edges[e] for e in nx_graph.edges):
edges_weight = np.array(
[nx_graph.edges[e]["weight"] for e in nx_graph.edges],
dtype=self.precision
)
self.edge_weighted = True
else:
edges_weight = None
# Extract edge index
edges = list(nx_graph.edges)
edge_index = np.array(edges, dtype=np.int32).T
# Use ``from_data``
self.from_data(
edge_index=edge_index,
nodes_weight=nodes_weight,
edges_weight=edges_weight
)
[docs] def to_networkx(self) -> nx.Graph:
"""Convert current graph data to a NetworkX graph object."""
nx_graph = nx.Graph()
# Add nodes with weights if available
if self.nodes_weight is not None:
for i in range(self.nodes_num):
nx_graph.add_node(i, weight=self.nodes_weight[i])
else:
nx_graph.add_nodes_from(range(self.nodes_num))
# Add edges with weights if available
if self.edges_weight is not None:
for i in range(self.edges_num):
u = self.edge_index[0, i]
v = self.edge_index[1, i]
nx_graph.add_edge(u, v, weight=self.edges_weight[i])
else:
edges = list(zip(self.edge_index[0], self.edge_index[1]))
nx_graph.add_edges_from(edges)
return nx_graph
[docs] def from_csr(
self,
xadj: np.ndarray,
adjncy: np.ndarray,
nodes_weight: np.ndarray = None,
edges_weight: np.ndarray = None
):
"""Load graph data from a CSR representation."""
# Store CSR representation
self.xadj = xadj
self.adjncy = adjncy
# Get edge_index from csr representation
edge_index = [
[src, dst]
for src in range(len(xadj) - 1)
for dst in adjncy[xadj[src]:xadj[src + 1]]
]
edge_index = np.array(edge_index).T
# Use ``from_data``
self.from_data(
edge_index=edge_index,
nodes_weight=nodes_weight,
edges_weight=edges_weight
)
[docs] def to_csr(self) -> Tuple[np.ndarray, np.ndarray]:
"""Convert edge_index to CSR representation (xadj, adjncy)."""
if self.xadj is None or self.adjncy is None:
csr = scipy.sparse.csr_matrix(
(np.ones(self.edges_num), (self.edge_index[0], self.edge_index[1])),
shape=(self.nodes_num, self.nodes_num)
)
self.xadj = csr.indptr.astype(np.int32)
self.adjncy = csr.indices.astype(np.int32)
return self.xadj, self.adjncy
[docs] def to_chszlablib(self, scale: float = 1e5) -> CHSZLabLibGraph:
"""Convert current graph data to a chszlablib graph object."""
# Convert to CSR representation
xadj, adjncy = self.to_csr()
node_weights = self.nodes_weight * scale if self.node_weighted else None
edge_weights = self.edges_weight * scale if self.edge_weighted else None
# Create chszlablib graph
chszlablib_graph = CHSZLabLibGraph.from_csr(
xadj=xadj,
adjncy=adjncy,
node_weights=node_weights,
edge_weights=edge_weights
)
# Return chszlablib graph
return chszlablib_graph
[docs] def make_complement(self):
"""Convert the graph to its complement."""
# Ensure the graph is not edge-weighted
if self.edge_weighted:
raise NotImplementedError("Complement of edge-weighted graphs is not supported.")
# Get current adjacency matrix
adj_matrix = self.to_adj_matrix()
# Create complement adjacency matrix
comp_adj = np.logical_not(adj_matrix).astype(int)
# Invalidate cached structures
self.edges_num = None
self.edges_weight = None
self._invalidate_cached_structures()
# Use ``from_adj_matrix`` to update graph
self.from_adj_matrix(adj_matrix=comp_adj)
[docs] def make_symmetric(self):
"""Convert the graph to its symmetric."""
# Step 1: Check if already symmetric
adj_matrix_weighted = self.to_adj_matrix(with_edge_weights=True)
# Step 2: Check for conflicting edges (both (i,j) and (j,i) exist)
# Check if there are asymmetric edges where both directions exist
asymmetric_mask = (adj_matrix_weighted != 0) & (adj_matrix_weighted.T != 0) \
& (adj_matrix_weighted != adj_matrix_weighted.T)
if np.any(asymmetric_mask):
raise ValueError(
"Cannot symmetrize graph: both (i,j) and (j,i) edges "
"exist with different weights for some i!=j"
)
# Step 3: Perform symmetrization (both structural and weight)
# Create symmetric adjacency matrix by taking maximum of original and transpose
symmetric_adj = np.maximum(adj_matrix_weighted, adj_matrix_weighted.T)
# Convert symmetric adjacency matrix back to edge_index and edges_weight
coo = scipy.sparse.coo_matrix(symmetric_adj)
edge_index = np.vstack((coo.row, coo.col)).astype(np.int32)
if self.edge_weighted:
edges_weight = coo.data.astype(self.precision)
else:
edges_weight = None
# Invalidate cached structures
self.edges_num = None
self.edges_weight = None
self._invalidate_cached_structures()
# Using ``from_data``
self.already_symmetric = True
self.from_data(edge_index=edge_index, edges_weight=edges_weight)
[docs] def add_self_loop(self):
"""Add self-loops to the graph."""
# Remove existing self-loops
self.remove_self_loop()
# Create self-loop edges
self_loops = np.arange(self.nodes_num, dtype=np.int32)
self_loop_edges = np.vstack((self_loops, self_loops))
self_loop_weights = np.ones(self.nodes_num, dtype=self.precision)
# Combine with existing edges
self.edge_index = np.hstack((self.edge_index, self_loop_edges))
self.edges_weight = np.hstack((self.edges_weight, self_loop_weights))
self.edges_num = int(self.edge_index.shape[1])
# Invalidate cached structures
self._invalidate_cached_structures()
[docs] def remove_self_loop(self):
"""Remove self-loops from the graph."""
# Identify non-self-loop edges
mask = self.edge_index[0] != self.edge_index[1]
self.edge_index = self.edge_index[:, mask]
self.edges_weight = self.edges_weight[mask]
self.edges_num = int(self.edge_index.shape[1])
# Invalidate cached structures
self._invalidate_cached_structures()
# NetworkX Layout
SUPPORT_POS_TYPE_DICT = {
"bipartite_layout": nx.bipartite_layout,
"circular_layout": nx.circular_layout,
"kamada_kawai_layout": nx.kamada_kawai_layout,
"random_layout": nx.random_layout,
"rescale_layout": nx.rescale_layout,
"rescale_layout_dict": nx.rescale_layout_dict,
"shell_layout": nx.shell_layout,
"spring_layout": nx.spring_layout,
"spectral_layout": nx.spectral_layout,
"planar_layout": nx.planar_layout,
"fruchterman_reingold_layout": nx.fruchterman_reingold_layout,
"spiral_layout": nx.spiral_layout,
"multipartite_layout": nx.multipartite_layout,
}
# Supported Pos Types
SUPPORT_POS_TYPE = [
"bipartite_layout",
"circular_layout",
"kamada_kawai_layout",
"random_layout",
"rescale_layout",
"rescale_layout_dict",
"shell_layout",
"spring_layout",
"spectral_layout",
"planar_layout",
"fruchterman_reingold_layout",
"spiral_layout",
"multipartite_layout",
]
# Get Position Layer
[docs]def get_pos_layer(pos_type: str):
if pos_type not in SUPPORT_POS_TYPE:
raise ValueError(f"unvalid pos type, only supports {SUPPORT_POS_TYPE}")
return SUPPORT_POS_TYPE_DICT[pos_type]