Source code for ml4co_kit.task.routing.vrp.cvrp

r"""
Capacitated Vehicle Routing Problem (CVRP).

Find minimum-cost vehicle routes from a depot that serve all customers exactly once
subject to vehicle capacity. :class:`CVRPTask` holds one instance; use
:class:`~ml4co_kit.wrapper.routing.vrp.cvrp.CVRPWrapper` for batch operations.
"""

# Copyright (c) 2024 Thinklab@SJTU
# ML4CO-Kit is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.


import vrplib
import pathlib
import numpy as np
import matplotlib.pyplot as plt
from typing import Union
from ml4co_kit.task.base import TASK_TYPE
from ml4co_kit.utils.file_utils import check_file_path
from ml4co_kit.task.routing.base import DistanceEvaluator, RoutingTaskBase, DISTANCE_TYPE, ROUND_TYPE


[docs]class CVRPTask(RoutingTaskBase): """Single-instance CVRP task. Parameters ---------- cvrp_open : bool, optional If ``True``, routes are open (OVRP): no return leg to the depot. mixed_backhaul : bool, optional Reserved for backhaul variants (unused in plain CVRP). distance_type : DISTANCE_TYPE, optional Edge metric. Default ``EUC_2D``. round_type : ROUND_TYPE, optional Distance rounding rule. Default ``NO``. precision : np.float32 or np.float64, optional Coordinate / cost dtype. threshold : float, optional Numerical tolerance for constraint checking. Notes ----- Solution format: 1D array with depot index ``0`` as route separators, e.g. ``[0, 3, 5, 0, 2, 1, 0]`` for two routes. Examples -------- >>> import pathlib >>> from ml4co_kit import CVRPTask >>> task = CVRPTask() >>> task.from_pickle( ... pathlib.Path("test_dataset/routing/vrp/cvrp/task/cvrp50_uniform_task.pkl") ... ) >>> float(task.evaluate(task.ref_sol)) 10.973... """ def __init__( self, cvrp_open: bool = False, mixed_backhaul: bool = False, distance_type: DISTANCE_TYPE = DISTANCE_TYPE.EUC_2D, round_type: ROUND_TYPE = ROUND_TYPE.NO, precision: Union[np.float32, np.float64] = np.float32, threshold: float = 1e-4 ): # Super Initialization super(CVRPTask, self).__init__( task_type=TASK_TYPE.CVRP, minimize=True, distance_type=distance_type, round_type=round_type, precision=precision ) # Initialize Attributes self.cvrp_open = cvrp_open # Open Route self.nodes_num = None # Number of nodes (besides depots) self.depots = None # Coordinates of depots (1, 2) self.points = None # Coordinates of points (V, 2) self.coords = None # All coordinates (V+1, 2) self.demands = None # Demands of points (V,) self.norm_demands = None # Normalized demands of points (V,) self.capacity = None # Capacity of vehicles self.dists = None # Distance matrix (V+1, V+1) self.threshold = threshold # Threshold for floating point precision def _normalize_data(self): """Normalize depots and points to [0, 1] range.""" # Check if the distance type is EUC_2D if self.dist_eval.distance_type != DISTANCE_TYPE.EUC_2D: raise ValueError("Normalization is only supported for EUC_2D distance type.") # Save the original data in cache self.cache["raw_points"] = self.points self.cache["raw_depots"] = self.depots self.cache["raw_coords"] = self.coords # Get normalize scale min_vals = np.min(self.coords) max_vals = np.max(self.coords) norm_scale = max_vals - min_vals # Normalize the data self.coords = (self.coords - min_vals) / norm_scale self.depots = self.coords[0] self.points = self.coords[1:] # Clean the dists self.dists = None def _restore_raw_data(self): """Restore the original data from cache.""" self.points = self.cache.get("raw_points", self.points) self.depots = self.cache.get("raw_depots", self.depots) self.coords = self.cache.get("raw_coords", self.coords) def _check_depots_dim(self): """Check if depots are 1D or 2D.""" if self.depots.ndim != 1 or self.depots.shape[0] not in [2, 3]: raise ValueError( "Depots should be a 1D array with shape (2,) or (3,)." ) def _check_depots_not_none(self): """Check if depots are not None.""" if self.depots is None: raise ValueError("``depots`` cannot be None!") def _check_points_dim(self): """Check if points are 2D or 3D.""" if self.points.ndim != 2 or self.points.shape[1] not in [2, 3]: raise ValueError( "Points should be a 2D array with shape (num_points, 2) or (num_points, 3)." ) def _check_points_not_none(self): """Check if points are not None.""" if self.points is None: raise ValueError("``points`` cannot be None!") def _check_coords_not_none(self): """Check if coords are not None.""" if self.coords is None: raise ValueError( "``coords`` cannot be None! This attribute is generated " "automatically when ``depots`` and ``points`` are provided." ) def _check_demands_dim(self): """Ensure demands is a 1D array.""" if self.demands.ndim != 1: raise ValueError("Demands should be a 1D array.") def _check_demands_not_none(self): """Check if demands are not None.""" if self.demands is None: raise ValueError("``demands`` cannot be None!") def _check_norm_demands_not_none(self): """Check if norm_demands are not None.""" if self.norm_demands is None: raise ValueError( "``norm_demands`` cannot be None! This attribute is generated " "automatically when ``demands`` and ``capacity`` are provided." ) def _check_capacity_not_none(self): """Check if capacity is not None.""" if self.capacity is None: raise ValueError("``capacity`` cannot be None!") def _check_sol_dim(self): """Ensure solution is a 1D array.""" if self.sol.ndim != 1: raise ValueError("Solution should be a 1D array.") def _check_ref_sol_dim(self): """Ensure reference solution is a 1D array.""" if self.ref_sol.ndim != 1: raise ValueError("Reference solution should be a 1D array.") def _get_dists(self) -> np.ndarray: """Get distance matrix.""" if self.dists is None: dists = self.dist_eval.cal_dist_matrix(self.coords) self.dists = dists.astype(self.precision) return self.dists def _split_tours(self, sol: np.ndarray) -> list: """Split tours based on depot delimiters.""" return np.split(sol, np.where(sol == 0)[0])[1: -1] @staticmethod def _check_route_c( route: np.ndarray, demands: np.ndarray, capacity: float, threshold: float, ) -> bool: # Get the demands on the route route_demands = demands[route.astype(int) - 1] split_demand_need = np.sum(route_demands) # Check if the demand is within the capacity if split_demand_need > capacity + threshold: return False else: return True @staticmethod def _check_route_l( dist_eval: DistanceEvaluator, coords: np.ndarray, route: np.ndarray, max_route_length: float, threshold: float, cvrp_open: bool = False, ) -> bool: """ Define a helper function to check the constraint L """ # Initialize the route length route_length = 0 # Depot -> First Customer route_length += dist_eval.cal_distance( start=coords[0], end=coords[route[0]] ) # Customers in the route for idx in range(len(route) - 1): route_length += dist_eval.cal_distance( start=coords[route[idx]], end=coords[route[idx + 1]] ) # If not cvrp_open, add the distance from the last customer to the depot if not cvrp_open: route_length += dist_eval.cal_distance( start=coords[route[-1]], end=coords[0] ) # Check if the route length is within the maximum route length if route_length > max_route_length + threshold: return False else: return True @staticmethod def _check_route_b( route: np.ndarray, demands: np.ndarray, capacity: float, threshold: float, ) -> bool: """ Define a helper function to check the constraint B """ # 1. Linehaul before backhaul route_demands = demands[route.astype(int) - 1] is_linehaul = route_demands >= 0 is_backhaul = route_demands < 0 if np.any(is_backhaul): first_backhaul_idx = np.flatnonzero(is_backhaul)[0] if np.any(is_linehaul[first_backhaul_idx + 1:]): return False # 2. Linehaul load <= vehicle capacity linehaul_load = np.sum(route_demands[is_linehaul]) if linehaul_load > capacity + threshold: return False # 3. Backhaul load <= vehicle capacity backhaul_load = np.sum(-route_demands[is_backhaul]) if backhaul_load > capacity + threshold: return False # Return True if the route constraints are satisfied return True @staticmethod def _check_route_mb( route: np.ndarray, demands: np.ndarray, capacity: float, threshold: float, ) -> bool: """ Define a helper function to check constraint MB: """ # 1. Linehaul load <= vehicle capacity route = route.astype(int) - 1 route_demands = demands[route] is_linehaul = route_demands >= 0 linehaul_load = np.sum(route_demands[is_linehaul]) if linehaul_load > capacity + threshold: return False # 2. Check if the backhaul load is within the capacity left_capacity = capacity - linehaul_load for node in route: # Update the left capacity left_capacity += demands[node] # Check if the left capacity is greater than zero if left_capacity < -threshold: return False # Return True if the route constraints are satisfied return True @staticmethod def _check_route_tw( dist_eval: DistanceEvaluator, coords: np.ndarray, route: np.ndarray, tw: np.ndarray, service: np.ndarray, threshold: float, cvrp_open: bool = False ) -> bool: """ Define a helper function to check the constraint TW """ # Initialize the current time cur_time: float = 0 last_node = 0 # Sequentially check each node in the route for node in route.astype(int): # Get the time window and service time cur_tw = tw[node] cur_tw_0 = float(cur_tw[0]) cur_tw_1 = float(cur_tw[1]) cur_st = service[node] # Update the current time travel_time = dist_eval.cal_distance( start=coords[last_node], end=coords[node] ) cur_time += float(travel_time) cur_time = cur_tw_0 if cur_time < cur_tw_0 else cur_time # Check if the current time is within the time window if cur_time > cur_tw_1 + threshold: return False # Update the current time with the service time cur_time += cur_st # Update the last node last_node = node # If not open, final check (back to depot) if not cvrp_open: # Get the time window and service time cur_tw: np.ndarray = tw[0] cur_tw_1: float = float(cur_tw[1]) # Update the current time travel_time = dist_eval.cal_distance( start=coords[last_node], end=coords[0] ) cur_time += float(travel_time) # Check if the current time is within the time window if cur_time > cur_tw_1 + threshold: return False # Return True if the route time windows are satisfied return True
[docs] def from_data( self, depots: np.ndarray = None, points: np.ndarray = None, demands: np.ndarray = None, capacity: float = None, sol: np.ndarray = None, ref: bool = False, normalize: bool = False, name: str = None ): """Populate the task from numpy arrays. Parameters ---------- depots : np.ndarray, optional Depot coordinates, shape ``(2,)`` or ``(3,)``. points : np.ndarray, optional Customer coordinates, shape ``(V, 2)`` or ``(V, 3)``. demands : np.ndarray, optional Customer demands, shape ``(V,)``. capacity : float, optional Vehicle capacity. sol : np.ndarray, optional Tour encoding with ``0`` depot delimiters. ref : bool, optional If ``True``, store ``sol`` in ``ref_sol`` instead of ``sol``. normalize : bool, optional Scale coordinates to ``[0, 1]`` (``EUC_2D`` only). name : str, optional Instance name override. """ # Set Attributes and Check Dimensions if depots is not None: self.depots = depots.astype(self.precision) self._check_depots_dim() if points is not None: self.points = points.astype(self.precision) self._check_points_dim() if demands is not None: self.demands = demands.astype(self.precision) self._check_demands_dim() if capacity is not None: self.capacity = capacity # Merge depots and points if self.depots is not None and self.points is not None: self.coords = np.concatenate( [np.expand_dims(self.depots, axis=0), self.points], axis=0 ) # Normalize demands accroding to capacity if self.demands is not None and self.capacity is not None: self.norm_demands = self.demands / self.capacity # Set Solution if Provided if sol is not None: if ref: self.ref_sol = sol self._check_ref_sol_dim() else: self.sol = sol self._check_sol_dim() # Normalize Depots and Points if Required if normalize: self._normalize_data() # Set Number of Nodes if Provided if self.points is not None: self.nodes_num = self.points.shape[0] # Set Name if Provided if name is not None: self.name = name
[docs] def from_vrplib( self, vrp_file_path: pathlib.Path = None, sol_file_path: pathlib.Path = None, ref: bool = False, normalize: bool = False ): """Load CVRP data from a VRPLIB file.""" # Read data from TSPLIB file if provided points = depots = demands = capacity = name =None if vrp_file_path is not None: # Load CVRP data from VRPLIB file instance = vrplib.read_instance( path=vrp_file_path, compute_edge_weights=False ) name = instance["name"] coords = instance["node_coord"] depots = coords[0] points = coords[1:] capacity = instance["capacity"] demands = instance["demand"][1:] # Read solution from sol file if provided sol = None if sol_file_path is not None: route_flag = None with open(sol_file_path, "r") as file: first_line = file.readline() if "Route" in first_line: # Like this # Route #1: 15 17 9 3 16 29 # Route #2: 12 5 26 7 8 13 32 2 route_flag = True else: # Like this # 36395 # 37 # 1893900 # 1133620 # 0 1 1 1144 12 14 0 217 236 105 2 169 8 311 434 362 187 136 59 0 # 0 1 1 1182 12 14 0 3 370 133 425 349 223 299 386 267 410 411 348 0 route_flag = False # Deal with different cases if route_flag == True: with open(sol_file_path, "r") as file: tour = [0] for line in file: if line.startswith("Route"): split_line = line.split(" ")[2:] for node in split_line: if node != "\n": tour.append(int(node)) tour.append(0) elif route_flag == False: with open(sol_file_path, "r") as file: line_idx = 0 tour = list() for line in file: line_idx += 1 if line_idx < 5: continue split_line = line.split(" ")[7:-1][1:] for node in split_line: tour.append(int(node)) tour.append(0) else: raise ValueError( f"Unable to read route information from {sol_file_path}." ) sol = np.array(tour) # Use ``from_data`` self.from_data( depots=depots, points=points, demands=demands, capacity=capacity, sol=sol, ref=ref, normalize=normalize, name=name )
[docs] def to_vrplib( self, vrp_file_path: pathlib.Path = None, sol_file_path: pathlib.Path = None ): """Save CVRP data to a VRPLIB file.""" # Save CVRP data to a VRPLIB file if vrp_file_path is not None: # Check data self._check_depots_not_none() self._check_points_not_none() self._check_demands_not_none() self._check_capacity_not_none() depots = self.depots points = self.points demands = self.demands capacity = self.capacity # Check file path check_file_path(vrp_file_path) # Write CVRP data to a VRPLIB file with open(vrp_file_path, "w") as f: f.write(f"NAME : {vrp_file_path}\n") f.write(f"COMMENT : Generated by ML4CO-Kit\n") f.write("TYPE : CVRP\n") f.write(f"DIMENSION : {self.nodes_num + 1}\n") f.write(f"EDGE_WEIGHT_TYPE : {self.distance_type.value}\n") f.write(f"CAPACITY : {capacity}\n") f.write("NODE_COORD_SECTION\n") x, y = depots f.write(f"1 {x} {y}\n") for i in range(self.nodes_num): x, y = points[i] f.write(f"{i+2} {x} {y}\n") f.write("DEMAND_SECTION \n") f.write(f"1 0\n") for i in range(self.nodes_num): f.write(f"{i+2} {demands[i]}\n") f.write("DEPOT_SECTION \n") f.write(" 1\n") f.write(" -1\n") f.write("EOF\n") # Save Solution to a sol file if sol_file_path is not None: # Check data self._check_sol_not_none() sol = self.sol # Check file path check_file_path(sol_file_path) # Use ``evaluate`` to get cost cost = self.evaluate(sol) # Write Solution to a sol file split_tours = self._split_tours(sol) with open(sol_file_path, "w") as f: for i in range(len(split_tours)): part_tour = split_tours[i][1:] f.write(f"Route #{i+1}: ") f.write(f" ".join(str(int(node)) for node in part_tour)) f.write("\n") f.write(f"Cost {cost}\n")
[docs] def check_constraints(self, sol: np.ndarray) -> bool: """Return ``True`` if ``sol`` is a feasible CVRP tour.""" # Every tour starts and ends with the depot if sol[0] != 0 or sol[-1] != 0: return False # Every node is visited exactly once ordered_sol = np.sort(sol)[-self.nodes_num:] if not np.all(ordered_sol == (np.arange(self.nodes_num) + 1)): return False # Demands Constraint demands = self.demands capacity = self.capacity split_tours = self._split_tours(sol) # For each split tour, check: # if the demand is within the capacity for split_idx in range(len(split_tours)): # Get the split tour and the demands on the split tour split_tour: np.ndarray = split_tours[split_idx][1:] # check if the demand is within the capacity if not self._check_route_c( route=split_tour, demands=demands, capacity=capacity, threshold=self.threshold ): return False # If all constraints are satisfied, return True return True
[docs] def evaluate(self, sol: np.ndarray, check_constr: bool = True) -> np.floating: """Return total route length of ``sol``. Parameters ---------- sol : np.ndarray Tour with depot index ``0`` as route separators. check_constr : bool, optional Raise ``ValueError`` if constraints are violated. Returns ------- np.floating Sum of edge lengths (respects ``cvrp_open``). """ # Check Constraints if check_constr and not self.check_constraints(sol): raise ValueError("Invalid solution!") # Evaluate total_distance = 0 for i in range(len(sol) - 1): if self.cvrp_open: # Skip the closing edge from the last customer # back to the depot in the sub route if sol[i] != 0 and sol[i + 1] == 0: continue cost = self.dist_eval.cal_distance( self.coords[sol[i]], self.coords[sol[i + 1]] ) total_distance += np.array(cost).astype(self.precision) return total_distance
[docs] def render( self, save_path: pathlib.Path, with_sol: bool = True, figsize: tuple = (5, 5), node_color: str = "darkblue", edge_color: str = "darkblue", node_size: int = 50, ): """Save a matplotlib figure of the instance (and optional solution). Parameters ---------- save_path : pathlib.Path Output image path (``.png``, etc.). with_sol : bool, optional Draw ``self.sol`` routes when ``True``; instance only when ``False``. figsize : tuple, optional Figure size in inches. node_color, edge_color : str, optional Plot colors (used when applicable). node_size : int, optional Scatter marker size for customers. """ # Check ``save_path`` check_file_path(save_path) # Get Attributes depots = self.depots points = self.points sol = self.sol # Draw Graph if with_sol: # Draw Depots _, ax = plt.subplots(figsize=figsize) kwargs = dict(c="tab:red", marker="*", zorder=3, s=500) ax.scatter(depots[0], depots[1], label="Depot", **kwargs) # Draw Points coords = np.concatenate([np.expand_dims(depots, axis=0), points], axis=0) x_coords = coords[:, 0] y_coords = coords[:, 1] # Draw Tours split_tours = self._split_tours(sol) idx = 0 for part_tour in split_tours: route = part_tour[1:] x = x_coords[route] y = y_coords[route] # Coordinates of clients served by this route. if len(route) == 1: ax.scatter(x, y, label=f"Route {idx}", zorder=3, s=node_size) ax.plot(x, y) arrowprops = dict(arrowstyle='->', linewidth=0.25, color='grey') ax.annotate( text='', xy=(x_coords[0], y_coords[0]), xytext=(x[0], y[0]), arrowprops=arrowprops ) ax.annotate( text='', xy=(x[-1], y[-1]), xytext=(x_coords[0], y_coords[0]), arrowprops=arrowprops ) ax.set_aspect("equal", "datalim") ax.legend(frameon=False, ncol=2) else: _, ax = plt.subplots(figsize=figsize) kwargs = dict(c="tab:red", marker="*", zorder=3, s=500) ax.scatter(depots[0], depots[1], label="Depot", **kwargs) ax.scatter(points[:, 0], points[:, 1], s=node_size, label="Clients") ax.grid(color="grey", linestyle="solid", linewidth=0.2) ax.set_aspect("equal", "datalim") ax.legend(frameon=False, ncol=2) # Save Figure plt.savefig(save_path)