r"""
Base classes for all graph problem generators.
"""
# Copyright (c) 2024 Thinklab@SJTU
# ML4CO-Kit is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
import random
import itertools
import numpy as np
import networkx as nx
from enum import Enum
from typing import Union
from ml4co_kit.task.base import TASK_TYPE
from ml4co_kit.generator.base import GeneratorBase
from ml4co_kit.task.graph.base import GraphTaskBase
[docs]class GRAPH_TYPE(str, Enum):
"""Define the graph types as an enumeration."""
ER = "er" # Erdos-Renyi Graph
BA = "ba" # Barabasi-Albert Graph
HK = "hk" # Holme-Kim Graph
WS = "ws" # Watts-Strogatz Graph
RB = "rb" # RB Graph
[docs]class GRAPH_WEIGHT_TYPE(str, Enum):
"""Define the weight types as an enumeration."""
UNIFORM = "uniform" # Uniform Weight
GAUSSIAN = "gaussian" # Gaussian Weight
POISSON = "poisson" # Poisson Weight
EXPONENTIAL = "exponential" # Exponential Weight
LOGNORMAL = "lognormal" # Lognormal Weight
POWERLAW = "powerlaw" # Powerlaw Weight
BINORMIAL = "binomial" # Binomial Weight
[docs]class GraphWeightGenerator(object):
def __init__(
self,
weighted_type: GRAPH_WEIGHT_TYPE,
precision: Union[np.float32, np.float64] = np.float32,
# gaussian
gaussian_mean: float = 0.0,
gaussian_std: float = 1.0,
# poisson
poisson_lambda: float = 1.0,
# exponential
exponential_scale: float = 1.0,
# lognormal
lognormal_mean: float = 0.0,
lognormal_sigma: float = 1.0,
# powerlaw
powerlaw_a: float = 1.0,
powerlaw_b: float = 10.0,
powerlaw_sigma: float = 1.0,
# binomial
binomial_n: int = 10,
binomial_p: float = 0.5,
) -> None:
# Initialize Attributes
self.weighted_type = weighted_type
self.precision = precision
# Special Args for Gaussian
self.gaussian_mean = gaussian_mean
self.gaussian_std = gaussian_std
# Special Args for Poisson
self.poisson_lambda = poisson_lambda
# Special Args for Exponential
self.exponential_scale = exponential_scale
# Special Args for Lognormal
self.lognormal_mean = lognormal_mean
self.lognormal_sigma = lognormal_sigma
# Special Args for Powerlaw
self.powerlaw_a = powerlaw_a
self.powerlaw_b = powerlaw_b
self.powerlaw_sigma = powerlaw_sigma
# Special Args for Binomial
self.binomial_n = binomial_n
self.binomial_p = binomial_p
[docs] def gaussian_gen(self, size: int) -> np.ndarray:
return np.random.normal(
loc=self.gaussian_mean,
scale=self.gaussian_std,
size=(size,)
)
[docs] def poisson_gen(self, size: int) -> np.ndarray:
return np.random.poisson(
lam=self.poisson_lambda,
size=(size,)
)
[docs] def exponential_gen(self, size: int) -> np.ndarray:
return np.random.exponential(
scale=self.exponential_scale,
size=(size,)
)
[docs] def lognormal_gen(self, size: int) -> np.ndarray:
return np.random.lognormal(
mean=self.lognormal_mean,
sigma=self.lognormal_sigma,
size=(size,)
)
[docs] def powerlaw_gen(self, size: int) -> np.ndarray:
weights = (np.random.pareto(a=self.powerlaw_a, size=(size,)) + 1) * self.powerlaw_b
noise = np.random.normal(loc=0.0, scale=self.powerlaw_sigma, size=(size,))
weights += noise
return weights
[docs] def binormal_gen(self, size: int) -> np.ndarray:
return np.random.binomial(
n=self.binomial_n,
p=self.binomial_p,
size=(size,)
)
[docs] def generate(self, size: int) -> np.ndarray:
# Generate weights based on the specified type
if self.weighted_type == GRAPH_WEIGHT_TYPE.UNIFORM:
weights = self.uniform_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.GAUSSIAN:
weights = self.gaussian_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.POISSON:
weights = self.poisson_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.EXPONENTIAL:
weights = self.exponential_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.LOGNORMAL:
weights = self.lognormal_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.POWERLAW:
weights = self.powerlaw_gen(size)
elif self.weighted_type == GRAPH_WEIGHT_TYPE.BINORMIAL:
weights = self.binormal_gen(size)
else:
raise NotImplementedError(
f"The weight type {self.weighted_type} is not supported."
)
return weights.astype(self.precision)
[docs]class GraphGeneratorBase(GeneratorBase):
"""Base class for all graph problem generators."""
def __init__(
self,
task_type: TASK_TYPE,
distribution_type: GRAPH_TYPE = GRAPH_TYPE.ER,
precision: Union[np.float32, np.float64] = np.float32,
nodes_num_scale: tuple = (200, 300),
# special args for different distributions (structural)
er_prob: float = 0.15,
ba_conn_degree: int = 10,
hk_prob: float = 0.3,
hk_conn_degree: int = 10,
ws_prob: float = 0.3,
ws_ring_neighbors: int = 2,
rb_n_scale: tuple = (20, 25),
rb_k_scale: tuple = (5, 12),
rb_p_scale: tuple = (0.3, 1.0),
# special args for weighted graph (node/edge weights)
node_weighted: bool = False,
node_weighted_gen: GraphWeightGenerator = GraphWeightGenerator(
weighted_type=GRAPH_WEIGHT_TYPE.UNIFORM),
edge_weighted: bool = False,
edge_weighted_gen: GraphWeightGenerator = GraphWeightGenerator(
weighted_type=GRAPH_WEIGHT_TYPE.UNIFORM),
):
# Super Initialization
super(GraphGeneratorBase, self).__init__(
task_type=task_type,
distribution_type=distribution_type,
precision=precision
)
# Initialize Attributes
self.nodes_num_min, self.nodes_num_max = nodes_num_scale
# Special args for different distributions (structural)
self.er_prob = er_prob
self.ba_conn_degree = ba_conn_degree
self.hk_prob = hk_prob
self.hk_conn_degree = hk_conn_degree
self.ws_prob = ws_prob
self.ws_ring_neighbors = ws_ring_neighbors
self.rb_n_min, self.rb_n_max = rb_n_scale
self.rb_k_min, self.rb_k_max = rb_k_scale
self.rb_p_min, self.rb_p_max = rb_p_scale
# Special args for weighted graph (node/edge weights)
self.node_weighted = node_weighted
self.edge_weighted = edge_weighted
self.node_weighted_gen = node_weighted_gen
self.edge_weighted_gen = edge_weighted_gen
# Generation Function Dictionary
self.generate_func_dict = {
GRAPH_TYPE.BA: self._generate_barabasi_albert,
GRAPH_TYPE.ER: self._generate_erdos_renyi,
GRAPH_TYPE.HK: self._generate_holme_kim,
GRAPH_TYPE.RB: self._generate_rb,
GRAPH_TYPE.WS: self._generate_watts_strogatz,
}
def _generate_barabasi_albert(self) -> GraphTaskBase:
"""
@article{
barabasi1999emergence,
title={Emergence of scaling in random networks},
author={Barab{\'a}si, Albert-L{\'a}szl{\'o} and Albert, R{\'e}ka},
journal={science},
volume={286},
number={5439},
pages={509--512},
year={1999},
publisher={American Association for the Advancement of Science}
}
"""
# Generate Barabasi-Albert graph
nodes_num = np.random.randint(self.nodes_num_min, self.nodes_num_max+1)
nx_graph: nx.Graph = nx.barabasi_albert_graph(
n=nodes_num, m=min(self.ba_conn_degree, nodes_num)
)
# Add weights to nodes and edges if required
nx_graph = self._if_need_weighted(nx_graph, nodes_num)
# Create instance from nx.Graph
return self._create_instance(nx_graph)
def _generate_erdos_renyi(self) -> GraphTaskBase:
"""
@article{
erd6s1960evolution,
title={On the evolution of random graphs},
author={Erd6s, Paul and R{\'e}nyi, Alfr{\'e}d},
journal={Publ. Math. Inst. Hungar. Acad. Sci},
volume={5},
pages={17--61},
year={1960}
}
"""
# Generate Erdos-Renyi graph
nodes_num = np.random.randint(self.nodes_num_min, self.nodes_num_max+1)
nx_graph: nx.Graph = nx.erdos_renyi_graph(nodes_num, self.er_prob)
# Add weights to nodes and edges if required
nx_graph = self._if_need_weighted(nx_graph, nodes_num)
# Create instance from nx.Graph
return self._create_instance(nx_graph)
def _generate_holme_kim(self) -> GraphTaskBase:
"""
@article{
holme2002growing,
title={Growing scale-free networks with tunable clustering},
author={Holme, Petter and Kim, Beom Jun},
journal={Physical review E},
volume={65},
number={2},
pages={026107},
year={2002},
publisher={APS}
}
"""
# Generate Holme-Kim graph
nodes_num = np.random.randint(self.nodes_num_min, self.nodes_num_max+1)
nx_graph: nx.Graph = nx.powerlaw_cluster_graph(
n=nodes_num,
m=min(self.hk_conn_degree, nodes_num),
p=self.hk_prob
)
# Add weights to nodes and edges if required
nx_graph = self._if_need_weighted(nx_graph, nodes_num)
# Create instance from nx.Graph
return self._create_instance(nx_graph)
def _generate_watts_strogatz(self) -> GraphTaskBase:
"""
@article{
watts1998collective,
title={Collective dynamics of small-world networks},
author={Watts, Duncan J and Strogatz, Steven H},
journal={nature},
volume={393},
number={6684},
pages={440--442},
year={1998},
publisher={Nature Publishing Group}
}
"""
# Generate Watts-Strogatz graph
nodes_num = np.random.randint(self.nodes_num_min, self.nodes_num_max+1)
nx_graph: nx.Graph = nx.watts_strogatz_graph(
n=nodes_num, k=self.ws_ring_neighbors, p=self.ws_prob
)
# Add weights to nodes and edges if required
nx_graph = self._if_need_weighted(nx_graph, nodes_num)
# Create instance from nx.Graph
return self._create_instance(nx_graph)
def _generate_rb(self) -> GraphTaskBase:
"""
@article{
xu2005simple,
title={A simple model to generate hard satisfiable instances},
author={Xu, Ke and Boussemart, Fr{\'e}d{\'e}ric and Hemery, Fred and Lecoutre, Christophe},
journal={arXiv preprint cs/0509032},
year={2005}
}
"""
# Get params for RB model (n, k, a)
while True:
rb_n = np.random.randint(self.rb_n_min, self.rb_n_max)
rb_k = np.random.randint(self.rb_k_min, self.rb_k_max)
rb_v = rb_n * rb_k
if self.nodes_num_min <= rb_v and self.nodes_num_max >= rb_v:
break
rb_a = np.log(rb_k) / np.log(rb_n)
# Get params for RB model (p, r, s, iterations)
rb_p = np.random.uniform(self.rb_p_min, self.rb_p_max)
rb_r = - rb_a / np.log(1 - rb_p)
rb_s = int(rb_p * (rb_n ** (2 * rb_a)))
iterations = int(rb_r * rb_n * np.log(rb_n) - 1)
# Generate RB instance
parts = np.reshape(np.int64(range(rb_v)), (rb_n, rb_k))
nand_clauses = []
for i in parts:
nand_clauses += itertools.combinations(i, 2)
edges = set()
for _ in range(iterations):
i, j = np.random.choice(rb_n, 2, replace=False)
all = set(itertools.product(parts[i, :], parts[j, :]))
all -= edges
edges |= set(random.sample(tuple(all), k=min(rb_s, len(all))))
nand_clauses += list(edges)
clauses = {'NAND': nand_clauses}
# Convert to numpy array
clauses = {relation: np.int32(clause_list) for relation, clause_list in clauses.items()}
# Convert to nx.Graph
nx_graph = nx.Graph()
nx_graph.add_edges_from(clauses['NAND'])
# Add weights to nodes and edges if required
nx_graph = self._if_need_weighted(nx_graph, rb_v)
# Create instance from nx.Graph
return self._create_instance(nx_graph)
def _if_need_weighted(self, nx_graph: nx.Graph, nodes_num: int) -> nx.Graph:
"""Assign weights to nodes and/or edges if required."""
# Add weights to nodes if specified
if self.node_weighted:
node_weights = self.node_weighted_gen.generate(nodes_num)
for i, node in enumerate(nx_graph.nodes):
nx_graph.nodes[node]['weight'] = node_weights[i]
# Add weights to edges if specified
if self.edge_weighted:
edge_weights = self.edge_weighted_gen.generate(nx_graph.number_of_edges())
for i, edge in enumerate(nx_graph.edges):
nx_graph.edges[edge]['weight'] = edge_weights[i]
return nx_graph
def _create_instance(self, nx_graph: nx.Graph) -> GraphTaskBase:
"""Create instance from nx.Graph."""
raise NotImplementedError(
"Subclasses of GraphGeneratorBase must implement this method."
)