Source code for ml4co_kit.task.routing.tsp.spctsp

r"""
Stochastic Prize Collecting Traveling Salesman Problem (SPCTSP).

The SPCTSP is a variant of PCTSP where the actual prize values are revealed
only when nodes are visited, making it a stochastic optimization problem.
"""

# Copyright (c) 2024 Thinklab@SJTU
# ML4CO-Kit is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.


import pathlib
import numpy as np
from typing import Union
from ml4co_kit.task.base import TASK_TYPE
from ml4co_kit.task.routing.base import RoutingTaskBase, DISTANCE_TYPE, ROUND_TYPE


[docs]class SPCTSPTask(RoutingTaskBase): def __init__( self, distance_type: DISTANCE_TYPE = DISTANCE_TYPE.EUC_2D, round_type: ROUND_TYPE = ROUND_TYPE.NO, precision: Union[np.float32, np.float64] = np.float32, threshold: float = 1e-5 ): # Super Initialization super(SPCTSPTask, self).__init__( task_type=TASK_TYPE.SPCTSP, minimize=True, # SPCTSP is a minimization problem distance_type=distance_type, round_type=round_type, precision=precision ) # Initialize Attributes self.nodes_num = None # Number of nodes self.depots = None # Coordinates of depots self.points = None # Coordinates of points self.coords = None # All coordinates (including depots and points) self.expected_prizes = None # Expected prize values for each node self.norm_expected_prizes = None # Normalized expected prize values for each node self.actual_prizes = None # Actual revealed prize values (for simulation) self.norm_actual_prizes = None # Actual revealed prize values (for simulation) self.penalties = None # Penalty values for unvisited nodes self.required_prize = None # Minimum required prize to collect self.dists = None # Distance matrix self.threshold = threshold # Threshold for floating point precision def _normalize_depots_and_points_with_penalties(self): """ Normalize depots, points to [0, 1] range. Since the objective function includes the penalty, we need to normalize it together. """ if self.dist_eval.distance_type != DISTANCE_TYPE.EUC_2D: raise ValueError("Normalization is only supported for EUC_2D distance type.") depots = self.depots points = self.points penalties = self.penalties min_vals = min(np.min(points), np.min(self.depots)) max_vals = max(np.max(points), np.max(self.depots)) normalized_points = (points - min_vals) / (max_vals - min_vals) normalized_depots = (depots - min_vals) / (max_vals - min_vals) normalized_penalties = (penalties - min_vals) / (max_vals - min_vals) self.points = normalized_points self.depots = normalized_depots self.penalties = normalized_penalties def _check_depots_dim(self): """Check if depots are 1D or 2D.""" if self.depots.ndim != 1 or self.depots.shape[0] not in [2, 3]: raise ValueError( "Depots should be a 1D array with shape (2,) or (3,)." ) def _check_depots_not_none(self): """Check if depots are not None.""" if self.depots is None: raise ValueError("``depots`` cannot be None!") def _check_points_dim(self): """Check if points are 2D or 3D.""" if self.points.ndim != 2 or self.points.shape[1] not in [2, 3]: raise ValueError( "Points should be a 2D array with shape (num_points, 2) or (num_points, 3)." ) def _check_points_not_none(self): """Check if points are not None.""" if self.points is None: raise ValueError("``points`` cannot be None!") def _check_coords_not_none(self): """Check if coords are not None.""" if self.coords is None: raise ValueError( "``coords`` cannot be None! This attribute is generated " "automatically when ``depots`` and ``points`` are provided." ) def _check_expected_prizes_dim(self): """Ensure expected_prizes is a 1D array.""" if self.expected_prizes.ndim != 1: raise ValueError("Expected prizes should be a 1D array.") def _check_expected_prizes_not_none(self): """Check if expected_prizes are not None.""" if self.expected_prizes is None: raise ValueError("``expected_prizes`` cannot be None!") def _check_norm_expected_prizes_not_none(self): """Check if norm_expected_prizes are not None.""" if self.norm_expected_prizes is None: raise ValueError( "``norm_expected_prizes`` cannot be None! This attribute is generated " "automatically when ``expected_prizes`` and ``required_prize`` are provided." ) def _check_actual_prizes_dim(self): """Ensure actual_prizes is a 1D array.""" if self.actual_prizes.ndim != 1: raise ValueError("Actual prizes should be a 1D array.") def _check_actual_prizes_not_none(self): """Check if actual_prizes are not None.""" if self.actual_prizes is None: raise ValueError("``actual_prizes`` cannot be None!") def _check_norm_actual_prizes_not_none(self): """Check if norm_actual_prizes are not None.""" if self.norm_actual_prizes is None: raise ValueError( "``norm_actual_prizes`` cannot be None! This attribute is generated " "automatically when ``actual_prizes`` are provided." ) def _check_penalties_dim(self): """Ensure penalties is a 1D array.""" if self.penalties.ndim != 1: raise ValueError("Penalties should be a 1D array.") def _check_penalties_not_none(self): """Check if penalties are not None.""" if self.penalties is None: raise ValueError("``penalties`` cannot be None!") def _check_required_prize_not_none(self): """Check if required_prize is not None.""" if self.required_prize is None: raise ValueError("``required_prize`` cannot be None!") def _check_sol_dim(self): """Ensure solution is a 1D array.""" if self.sol.ndim != 1: raise ValueError("Solution should be a 1D array.") def _check_ref_sol_dim(self): """Ensure reference solution is a 1D array.""" if self.ref_sol.ndim != 1: raise ValueError("Reference solution should be a 1D array.") def _get_dists(self) -> np.ndarray: """Get distance matrix.""" if self.dists is None: dists = self.dist_eval.cal_dist_matrix(self.coords) self.dists = dists.astype(self.precision) return self.dists
[docs] def from_data( self, depots: np.ndarray = None, points: np.ndarray = None, penalties: np.ndarray = None, expected_prizes: np.ndarray = None, actual_prizes: np.ndarray = None, required_prize: float = None, sol: np.ndarray = None, ref: bool = False, normalize: bool = False, name: str = None ): # Set Attributes and Check Dimensions if depots is not None: self.depots = depots.astype(self.precision) self._check_depots_dim() if points is not None: self.points = points.astype(self.precision) self._check_points_dim() if penalties is not None: self.penalties = penalties.astype(self.precision) self._check_penalties_dim() if expected_prizes is not None: self.expected_prizes = expected_prizes.astype(self.precision) self._check_expected_prizes_dim() if actual_prizes is not None: self.actual_prizes = actual_prizes.astype(self.precision) self._check_actual_prizes_dim() if required_prize is not None: self.required_prize = required_prize # Merge depots and points if self.depots is not None and self.points is not None: self.coords = np.concatenate( [np.expand_dims(self.depots, axis=0), self.points], axis=0 ) # Normalize expected prizes according to required prize if self.expected_prizes is not None and self.required_prize is not None: self.norm_expected_prizes = self.expected_prizes / self.required_prize # Normalize actual prizes according to required prize if self.actual_prizes is not None: self.norm_actual_prizes = self.actual_prizes / self.required_prize # Set Solution if Provided if sol is not None: if ref: self.ref_sol = sol self._check_ref_sol_dim() else: self.sol = sol self._check_sol_dim() # Normalize Depots and Points if Required if normalize: self._normalize_depots_and_points_with_penalties() # Set Number of Nodes if Provided if self.points is not None: self.nodes_num = self.points.shape[0] # Set Name if Provided if name is not None: self.name = name
[docs] def check_constraints(self, sol: np.ndarray) -> bool: """Check if the solution is valid.""" # Every tour starts and ends with the depot if sol[0] != 0 or sol[-1] != 0: return False # The total collected prize should be at least the required prize collected_norm_prizes = np.sum(self.norm_actual_prizes[sol[1:-1]-1]) if collected_norm_prizes < 1 - self.threshold: # for floating point precision return False return True
[docs] def evaluate(self, sol: np.ndarray, check_constr: bool = True) -> np.floating: """Evaluate the total cost of the SPCTSP solution.""" # Check Constraints if check_constr and not self.check_constraints(sol): raise ValueError("Invalid solution!") # Calculate total travel distance total_distance = 0 for i in range(len(sol) - 1): total_distance += self.dist_eval.cal_distance( self.coords[sol[i]], self.coords[sol[i + 1]] ) # Calculate total penalty for unvisited nodes mask = np.ones(self.nodes_num, dtype=np.bool_) mask[sol[1:-1] - 1] = False total_penalty = np.sum(self.penalties[mask]) return total_distance + total_penalty
[docs] def render( self, save_path: pathlib.Path, with_sol: bool = True, figsize: tuple = (5, 5), node_color: str = "darkblue", edge_color: str = "darkblue", node_size: int = 50, ): """Render the SPCTSP problem instance with or without solution.""" raise NotImplementedError("Render is not implemented for SPCTSP.")