Source code for optiwindnet.MILP.ortools

# SPDX-License-Identifier: MIT
# https://gitlab.windenergy.dtu.dk/TOPFARM/OptiWindNet/

import logging
import math
from datetime import timedelta
from itertools import chain
from typing import Any

import networkx as nx
from ortools.math_opt.python import mathopt

from ..crossings import edgeset_edgeXing_iter, gateXing_iter
from ..interarraylib import G_from_S, fun_fingerprint
from ..pathfinding import PathFinder
from ._core import (
    FeederLimit,
    FeederRoute,
    ModelMetadata,
    ModelOptions,
    OWNSolutionNotFound,
    OWNWarmupFailed,
    PoolHandler,
    SolutionInfo,
    Solver,
    Topology,
    physical_core_count,
)

__all__ = ('make_min_length_model', 'warmup_model')

_lggr = logging.getLogger(__name__)
error, warn, info = _lggr.error, _lggr.warning, _lggr.info
_SOLVER_TYPES = {
    'cp_sat': mathopt.SolverType.CP_SAT,
    'gscip': mathopt.SolverType.GSCIP,
    'highs': mathopt.SolverType.HIGHS,
}
_CALLBACK_BACKENDS = ('cp_sat',)


class _SolutionStore:
    """Ad hoc callback implementation that stores solutions to a pool."""

    solutions: list[tuple[float, dict]]

    def __init__(self, metadata: ModelMetadata, weight_by_link_id: dict[int, float]):
        self.metadata = metadata
        self.weight_by_link_id = weight_by_link_id
        self.solutions = []

    def on_solution_callback(
        self, cb_data: mathopt.CallbackData
    ) -> mathopt.CallbackResult:
        if cb_data.event is not mathopt.Event.MIP_SOLUTION or cb_data.solution is None:
            return mathopt.CallbackResult()
        solution = {
            var.id: round(cb_data.solution[var]) for var in self.metadata.link_.values()
        }
        solution |= {
            var.id: round(cb_data.solution[var]) for var in self.metadata.flow_.values()
        }
        objective_value = sum(
            weight * solution[var_id]
            for var_id, weight in self.weight_by_link_id.items()
        )
        self.solutions.append((objective_value, solution))
        return mathopt.CallbackResult()


class SolverORTools(Solver, PoolHandler):
    """OR-Tools MathOpt wrapper using the selected backend.

    This class wraps and changes the behavior of MathOpt in order to save all
    solutions found to a pool. Meant to be used with `investigate_pool()`.
    """

    name: str
    backend: str
    log_callback: Any
    _solution_pool: list[tuple[float, dict]]
    _solve_result: mathopt.SolveResult | None

    def __init__(self, backend: str):
        if backend not in _SOLVER_TYPES:
            raise ValueError(f'Unsupported OR-Tools MathOpt backend: {backend}')
        self.backend = backend
        self.name = f'ortools.{backend}'
        self.log_callback = None
        self._solve_result = None
        self.options = {}

    def _link_val(self, var: Any) -> int:
        return self._value_map[var.id]

    def _flow_val(self, var: Any) -> int:
        return self._value_map[var.id]

    def set_problem(
        self,
        P: nx.PlanarEmbedding,
        A: nx.Graph,
        capacity: int,
        model_options: ModelOptions,
        warmstart: nx.Graph | None = None,
    ):
        self.P, self.A, self.capacity = P, A, capacity
        self.model_options = model_options
        model, metadata = make_min_length_model(self.A, self.capacity, **model_options)
        self.model, self.metadata = model, metadata
        if warmstart is not None:
            warmup_model(model, metadata, warmstart)

    def solve(
        self,
        time_limit: float,
        mip_gap: float,
        options: dict[str, Any] = {},
        verbose: bool = False,
    ) -> SolutionInfo:
        """Wrapper for MathOpt solve() that saves all solutions.

        This method uses a MIP_SOLUTION callback to fill a solution pool stored
        in the attribute self.solutions.
        """
        try:
            model = self.model
        except AttributeError as exc:
            exc.args += ('.set_problem() must be called before .solve()',)
            raise
        tracked_vars = tuple(
            chain(self.metadata.link_.values(), self.metadata.flow_.values())
        )
        storer = _SolutionStore(
            self.metadata,
            {
                var.id: weight
                for var, weight in zip(
                    self.metadata.link_.values(), self.metadata.weight_
                )
            },
        )
        applied_options = self.options | options
        self.stopping = dict(mip_gap=mip_gap, time_limit=time_limit)
        solve_params = self._make_solve_parameters(
            time_limit, mip_gap, applied_options, verbose
        )
        info('>>> ORTools %s parameters <<<\n%s\n', self.backend, solve_params)
        model_params = mathopt.ModelSolveParameters(
            variable_values_filter=mathopt.VariableFilter(filtered_items=tracked_vars),
            solution_hints=(
                [mathopt.SolutionHint(variable_values=self.metadata.solution_hint)]
                if self.metadata.solution_hint
                else []
            ),
        )
        solve_kwargs = dict(
            opt_model=model,
            solver_type=_SOLVER_TYPES[self.backend],
            params=solve_params,
            model_params=model_params,
            # HiGHS triggers a deprecation warning when msg_cb is used
            # (setLogCallback → setCallback), so skip it for that backend.
            msg_cb=self._msg_cb
            if self.log_callback is not None and self.backend != 'highs'
            else None,
        )
        if self.backend in _CALLBACK_BACKENDS:
            solve_kwargs['callback_reg'] = mathopt.CallbackRegistration(
                events={mathopt.Event.MIP_SOLUTION},
                mip_solution_filter=mathopt.VariableFilter(filtered_items=tracked_vars),
            )
            solve_kwargs['cb'] = storer.on_solution_callback
        result = mathopt.solve(**solve_kwargs)
        self._solve_result = result
        if len(storer.solutions) == 0 and result.has_primal_feasible_solution():
            solution = {
                var.id: round(value)
                for var, value in zip(
                    tracked_vars, result.variable_values(tracked_vars)
                )
            }
            storer.solutions.append((result.objective_value(), solution))
        num_solutions = len(storer.solutions)
        termination = result.termination.reason.name
        if num_solutions == 0:
            raise OWNSolutionNotFound(
                f'Unable to find a solution. Solver {self.name} terminated'
                f' with: {termination}'
            )
        storer.solutions.reverse()
        self._solution_pool = storer.solutions
        _, self._value_map = storer.solutions[0]
        self.num_solutions = num_solutions
        bound = result.best_objective_bound()
        objective = result.objective_value()
        solution_info = SolutionInfo(
            runtime=result.solve_stats.solve_time.total_seconds(),
            bound=bound,
            objective=objective,
            relgap=1.0 - bound / objective,
            termination=termination,
        )
        self.solution_info, self.applied_options = solution_info, applied_options
        info('>>> Solution <<<\n%s\n', solution_info)
        return solution_info

    def get_solution(self, A: nx.Graph | None = None) -> tuple[nx.Graph, nx.Graph]:
        if A is None:
            A = self.A
        P, model_options = self.P, self.model_options
        if model_options['feeder_route'] is FeederRoute.STRAIGHT:
            S = self._topology_from_mip_pool()
            G = PathFinder(
                G_from_S(S, A),
                P,
                A,
                branched=model_options['topology'] is Topology.BRANCHED,
            ).create_detours()
        else:
            S, G = self._investigate_pool(P, A)
        G.graph.update(self._make_graph_attributes())
        G.graph['solver_details'].update(strategy=self._solver_termination_detail())
        return S, G

    def _objective_at(self, index: int) -> float:
        objective_value, self._value_map = self._solution_pool[index]
        return objective_value

    def _topology_from_mip_pool(self) -> nx.Graph:
        return self._topology_from_mip_sol()

    def _solver_termination_detail(self) -> str:
        if self._solve_result is None:
            return ''
        termination = self._solve_result.termination
        detail = f' ({termination.detail})' if termination.detail else ''
        return f'{termination.reason.name}{detail}'

    def _msg_cb(self, messages: list[str]) -> None:
        for line in messages:
            self.log_callback(line)

    def _make_solve_parameters(
        self,
        time_limit: float,
        mip_gap: float,
        applied_options: dict[str, Any],
        verbose: bool,
    ) -> mathopt.SolveParameters:
        threads = applied_options.pop('threads', None)
        if threads is None and self.backend != 'cp_sat':
            threads = physical_core_count()
        # SolveParameters.threads is only honoured by cp_sat and gscip;
        # other backends need it injected into their own parameter sub-messages.
        solve_params = mathopt.SolveParameters(
            time_limit=timedelta(seconds=time_limit),
            relative_gap_tolerance=mip_gap,
            enable_output=verbose,
            threads=threads if self.backend in ('cp_sat', 'gscip') else None,
        )
        match self.backend:
            case 'cp_sat':
                for key, val in applied_options.items():
                    setattr(solve_params.cp_sat, key, val)
                solve_params.cp_sat.log_search_progress = verbose
            case 'gscip':
                for key, val in applied_options.items():
                    if '/' in key:
                        # SCIP-native parameter (e.g. 'randomization/randomseedshift')
                        # routed into the appropriate typed dict.
                        match val:
                            case bool():
                                solve_params.gscip.bool_params[key] = val
                            case int():
                                solve_params.gscip.int_params[key] = val
                            case float():
                                solve_params.gscip.real_params[key] = val
                            case str() if len(val) == 1:
                                solve_params.gscip.char_params[key] = val
                            case str():
                                solve_params.gscip.string_params[key] = val
                            case _:
                                raise TypeError(
                                    f'Unsupported type {type(val).__name__}'
                                    f' for GSCIP parameter {key!r}'
                                )
                    else:
                        setattr(solve_params.gscip, key, val)
            case 'highs':
                solve_params.highs.int_options['threads'] = threads
                for key, val in applied_options.items():
                    setattr(solve_params.highs, key, val)
            case _:
                raise ValueError(
                    f'Unsupported OR-Tools MathOpt backend: {self.backend}'
                )
        return solve_params


[docs] def make_min_length_model( A: nx.Graph, capacity: int, *, topology: Topology = Topology.BRANCHED, feeder_route: FeederRoute = FeederRoute.SEGMENTED, feeder_limit: FeederLimit = FeederLimit.UNLIMITED, balanced: bool = False, max_feeders: int = 0, ) -> tuple[mathopt.Model, ModelMetadata]: """Make discrete optimization model over link set A. Build OR-tools CP-SAT model for the collector system length minimization. Args: A: graph with the available edges to choose from capacity: maximum link flow capacity topology: one of Topology.{BRANCHED, RADIAL} feeder_route: FeederRoute.SEGMENTED -> feeder routes may be detoured around subtrees; FeederRoute.STRAIGHT -> feeder routes must be straight, direct lines feeder_limit: one of FeederLimit.{MINIMUM, UNLIMITED, SPECIFIED, MIN_PLUS1, MIN_PLUS2, MIN_PLUS3} max_feeders: only used if feeder_limit is FeederLimit.SPECIFIED """ R = A.graph['R'] T = A.graph['T'] d2roots = A.graph['d2roots'] A_terminals = nx.subgraph_view(A, filter_node=lambda n: n >= 0) W = sum(w for _, w in A_terminals.nodes(data='power', default=1)) # Sets _T = range(T) _R = range(-R, 0) E = tuple(((u, v) if u < v else (v, u)) for u, v in A_terminals.edges()) # using directed node-node links -> create the reversed tuples Eʹ = tuple((v, u) for u, v in E) # set of feeders to all roots stars = tuple((t, r) for r in _R for t in _T) linkset = E + Eʹ + stars # Create model m = mathopt.Model(name='optiwindnet') ############## # Parameters # ############## k = capacity weight_ = 2 * tuple(A[u][v]['length'] for u, v in E) + tuple( chain(*(d2roots[:T, r].tolist() for r in _R)) ) ############# # Variables # ############# link_ = { (u, v): m.add_binary_variable(name=f'link_{u}~{v}') for u, v in chain(E, Eʹ) } link_ |= {(t, r): m.add_binary_variable(name=f'link_{t}~r{-r}') for t, r in stars} flow_ = { (u, v): m.add_integer_variable(lb=0, ub=k - 1, name=f'flow_{u}~{v}') for u, v in chain(E, Eʹ) } flow_ |= { (t, r): m.add_integer_variable(lb=0, ub=k, name=f'flow_{t}~r{-r}') for t, r in stars } ############### # Constraints # ############### # total number of edges must be equal to number of terminal nodes m.add_linear_constraint(sum(link_.values()) == T, name='num_links_eq_T') # enforce a single directed edge between each node pair for u, v in E: m.add_linear_constraint( link_[(u, v)] + link_[(v, u)] <= 1, name=f'single_dir_link_{u}~{v}', ) # feeder-edge crossings if feeder_route is FeederRoute.STRAIGHT: for (u, v), (r, t) in gateXing_iter(A): if u >= 0: m.add_linear_constraint( link_[(u, v)] + link_[(v, u)] + link_[t, r] <= 1, name=f'feeder_link_cross_{u}~{v}_{t}~r{-r}', ) else: # a feeder crossing another feeder (possible in multi-root instances) m.add_linear_constraint( link_[(u, v)] + link_[t, r] <= 1, name=f'feeder_feeder_cross_r{-u}~{v}_{t}~r{-r}', ) # edge-edge crossings for Xing in edgeset_edgeXing_iter(A.graph['diagonals']): m.add_linear_constraint( sum(link_[a, b] for u, v in Xing for a, b in ((u, v), (v, u))) <= 1, name=f'link_link_cross_{"_".join(f"{u}~{v}" for u, v in Xing)}', ) # bind flow to link activation for t, n in linkset: _n = str(n) if n >= 0 else f'r{-n}' m.add_linear_constraint( expr=flow_[t, n] - (k if n < 0 else (k - 1)) * link_[t, n], ub=0, name=f'flow_zero_{t}~{_n}', ) m.add_linear_constraint( expr=flow_[t, n] - link_[t, n], lb=0, name=f'flow_nonzero_{t}~{_n}', ) # flow conservation with possibly non-unitary node power for t in _T: m.add_linear_constraint( sum((flow_[t, n] - flow_[n, t]) for n in A_terminals.neighbors(t)) + sum(flow_[t, r] for r in _R) == A.nodes[t].get('power', 1), name=f'flow_conserv_{t}', ) # feeder limits min_feeders = math.ceil(T / k) all_feeder_vars_sum = sum(link_[t, r] for r in _R for t in _T) if feeder_limit is FeederLimit.UNLIMITED: # valid inequality: number of feeders is at least the minimum m.add_linear_constraint( all_feeder_vars_sum >= min_feeders, name='feeder_limit_lb' ) if balanced: warn( 'Model option <balanced = True> is incompatible with <feeder_limit' ' = UNLIMITED>: model will not enforce balanced subtrees.' ) else: is_equal_not_range = False if feeder_limit is FeederLimit.SPECIFIED: if max_feeders == min_feeders: is_equal_not_range = True elif max_feeders < min_feeders: raise ValueError('max_feeders is below the minimum necessary') elif feeder_limit is FeederLimit.MINIMUM: is_equal_not_range = True elif feeder_limit is FeederLimit.MIN_PLUS1: max_feeders = min_feeders + 1 elif feeder_limit is FeederLimit.MIN_PLUS2: max_feeders = min_feeders + 2 elif feeder_limit is FeederLimit.MIN_PLUS3: max_feeders = min_feeders + 3 else: raise NotImplementedError('Unknown value:', feeder_limit) if is_equal_not_range: m.add_linear_constraint( all_feeder_vars_sum == min_feeders, name='feeder_limit_eq' ) else: m.add_linear_constraint( expr=all_feeder_vars_sum, lb=min_feeders, ub=max_feeders, name='feeder_limit_interval', ) # enforce balanced subtrees (subtree loads differ at most by one unit) if balanced: if is_equal_not_range: feeder_min_load = T // min_feeders if feeder_min_load < capacity: for t, r in stars: m.add_linear_constraint( flow_[t, r] >= link_[t, r] * feeder_min_load, name=f'balanced_{t}~r{-r}', ) else: warn( 'Model option <balanced = True> is incompatible with ' 'having a range of possible feeder counts: model will ' 'not enforce balanced subtrees.' ) # radial or branched topology if topology is Topology.RADIAL: for t in _T: m.add_linear_constraint( expr=sum(link_[n, t] for n in A_terminals.neighbors(t)), ub=1, name=f'radial_{t}', ) # assert all nodes are connected to some root m.add_linear_constraint( sum(flow_[t, r] for r in _R for t in _T) == W, name='total_power_sank' ) # valid inequalities for t in _T: # incoming flow limit m.add_linear_constraint( expr=sum(flow_[n, t] for n in A_terminals.neighbors(t)), ub=k - A.nodes[t].get('power', 1), name=f'inflow_limit_{t}', ) # only one out-edge per terminal m.add_linear_constraint( sum(link_[t, n] for n in chain(A_terminals.neighbors(t), _R)) == 1, name=f'single_out_link_{t}', ) ############# # Objective # ############# m.minimize(sum(weight * var for weight, var in zip(weight_, link_.values()))) ################## # Store metadata # ################## model_options = dict( topology=topology, feeder_route=feeder_route, feeder_limit=feeder_limit, max_feeders=max_feeders, balanced=balanced, ) metadata = ModelMetadata( R, T, k, linkset, link_, flow_, model_options, _make_min_length_model_fingerprint, weight_=weight_, ) return m, metadata
_make_min_length_model_fingerprint = fun_fingerprint(make_min_length_model)
[docs] def warmup_model( model: mathopt.Model, metadata: ModelMetadata, S: nx.Graph ) -> mathopt.Model: """Set initial solution into `model`. Changes `model` and `metadata` in-place. Args: model: CP-SAT model to apply the solution to. metadata: indices to the model's variables. S: solution topology Returns: The same model instance that was provided, now with a solution. Raises: OWNWarmupFailed: if some link in S is not available in model. """ R, T = metadata.R, metadata.T in_S_not_in_model = S.edges - metadata.link_.keys() in_S_not_in_model -= {(v, u) for u, v in metadata.linkset[-R * T :]} if in_S_not_in_model: raise OWNWarmupFailed( f'warmup_model() failed: model lacks S links ({in_S_not_in_model})' ) hint_values = {} for u, v in metadata.linkset[: (len(metadata.linkset) - R * T) // 2]: edgeD = S.edges.get((u, v)) if edgeD is None: hint_values[metadata.link_[u, v]] = 0 hint_values[metadata.flow_[u, v]] = 0 hint_values[metadata.link_[v, u]] = 0 hint_values[metadata.flow_[v, u]] = 0 else: u, v = (u, v) if ((u < v) == edgeD['reverse']) else (v, u) hint_values[metadata.link_[u, v]] = 1 hint_values[metadata.flow_[u, v]] = edgeD['load'] hint_values[metadata.link_[v, u]] = 0 hint_values[metadata.flow_[v, u]] = 0 for t, r in metadata.linkset[-R * T :]: edgeD = S.edges.get((t, r)) hint_values[metadata.link_[t, r]] = 0 if edgeD is None else 1 hint_values[metadata.flow_[t, r]] = 0 if edgeD is None else edgeD['load'] metadata.solution_hint = hint_values metadata.warmed_by = S.graph['creator'] return model