Source code for optiwindnet.repair

# SPDX-License-Identifier: MIT
# https://gitlab.windenergy.dtu.dk/TOPFARM/OptiWindNet/

import logging

import networkx as nx

from .crossings import list_edge_crossings
from .interarraylib import calcload

__all__ = ('repair_routeset_path',)

_lggr = logging.getLogger(__name__)
info, warn = _lggr.info, _lggr.warning


def gate_and_leaf_path(S: nx.Graph, n: int) -> tuple[int, int]:
    """
    `S` has loads, is a rootless subgraph_view and non-branching
    """
    # non-branching graphs only, gates and detours removed
    if S.degree[n] == 2:
        u, v = S[n]
        head, tail = (u, v) if S.nodes[u]['load'] > S.nodes[v]['load'] else (v, u)
        # go towards the gate
        gate_leaf = []
        for fwd, back in ((head, tail), (tail, head)):
            while S.degree[fwd] == 2:
                s, t = S[fwd]
                fwd, back = (s, fwd) if t == back else (t, fwd)
            gate_leaf.append(fwd)
        return tuple(gate_leaf)
    else:
        if S.nodes[n]['load'] == 1:
            leaf = back = n
            gate = None
        else:
            gate = back = n
            leaf = n
        (fwd,) = S[n]
        while S.degree[fwd] == 2:
            s, t = S[fwd]
            fwd, back = (s, fwd) if t == back else (t, fwd)
        if gate is None:
            gate_leaf = (fwd, leaf)
        else:
            gate_leaf = (gate, fwd)
        return gate_leaf


def list_path(S: nx.Graph, n: int) -> list[int]:
    """
    `S` has loads, no gate or detour edges
    all subtrees of `S` are paths
    `n` must be an extremity of the path
    """
    path = [n]
    (far,) = S[n]
    while S.degree[far] == 2:
        s, t = S[far]
        far, n = (s, far) if t == n else (t, far)
        path.append(n)
    path.append(far)
    return path if S.nodes[far]['load'] == 1 else path[::-1]


def _find_fix_choices_path(
    A: nx.Graph, swapS: int, src_path: list[int], dst_path: list[int]
) -> list[tuple]:
    # this is named «...»_path because we could make a version that allows
    # branching and call it «...»_tree.
    """
    Swap node `swapS` with one of the nodes of `dst_path`. For each swap, try
    all possible point of insertion of `swapS` into `dst_path`.

    how it works:
    - Several node swapping choices are examined within the edges in `A`.
    - A list where each item is a feasible modification package is returned.
    """
    i_end = len(dst_path)
    choices = []
    hookS_cut, hookS_alt = (
        (src_path[1], src_path[-1])
        if swapS == src_path[0]
        else (src_path[-2], src_path[0])
    )
    gateD = dst_path[0]
    edges_del_0 = [(swapS, hookS_cut)]
    for hookS, freeS in ((hookS_cut, hookS_alt), (hookS_alt, hookS_cut)):
        for i, swapD in enumerate(dst_path):
            # loop through nodes of dst_path (to remove)
            if swapD not in A[hookS]:
                # no easy way to link swapD to src path
                continue
            edges_add_0 = [(swapD, hookS)]
            # three cases here regarding dst_path:
            # A) bypass swapD (insert swapS anywhere in dst_path)
            # B) insert swapS in swapD's place (cannot bypass swapD)
            # C) gate or leaf of dst_path as swapD (insert swapS anywhere)
            # D) none of the above is possible
            if 0 < i < i_end - 1:  # mid-path swapD (remove)
                nearD_, farD_ = dst_path[i - 1], dst_path[i + 1]
                edges_del_1 = [(nearD_, swapD), (swapD, farD_)]
                if nearD_ in A[farD_]:
                    # bypassing of swapD is possible
                    edges_add_1 = [(nearD_, farD_)]
                    dst_path_ = dst_path[:i] + dst_path[i + 1 :]
                    # case (A)
                    for nearD, farD in zip(dst_path_[:-1], dst_path_[1:]):
                        # loop through mid-positions in dst_path_ (insert)
                        if nearD in A[swapS] and farD in A[swapS]:
                            # fix found
                            edges_del = edges_del_0 + edges_del_1 + [(nearD, farD)]
                            edges_add = (
                                edges_add_0
                                + edges_add_1
                                + [(nearD, swapS), (swapS, farD)]
                            )
                            choices.append((gateD, swapD, freeS, edges_del, edges_add))
                    for hookD, D_gated in (
                        (dst_path_[0], False),
                        (dst_path_[-1], True),
                    ):
                        # loop through extreme-positions in dst_path_ (extend)
                        if hookD in A[swapS]:
                            # fix found
                            edges_del = edges_del_0 + edges_del_1
                            edges_add = edges_add_0 + edges_add_1 + [(hookD, swapS)]
                            choices.append(
                                (
                                    gateD if D_gated else swapS,
                                    swapD,
                                    freeS,
                                    edges_del,
                                    edges_add,
                                )
                            )
                    if nearD_ in A[swapS] and farD_ in A[swapS]:
                        # case (B) – single insertion position in dst
                        # fix found
                        edges_del = edges_del_0 + edges_del_1
                        edges_add = edges_add_0 + [(nearD_, swapS), (swapS, farD_)]
                        choices.append((gateD, swapD, freeS, edges_del, edges_add))
                    else:
                        # case (D) – nothing to do
                        continue
            else:
                # case (C)
                dst_path_, hookD, D_gated = (
                    (dst_path[1:], dst_path[1], False)
                    if i == 0
                    else (dst_path[:-1], dst_path[-2], True)
                )
                edges_del_1 = [(swapD, hookD)]
                for nearD, farD in zip(dst_path_[:-1], dst_path_[1:]):
                    # loop through mid-positions in dst_path_ (to insert)
                    if nearD in A[swapS] and farD in A[swapS]:
                        # fix found
                        edges_del = edges_del_0 + edges_del_1 + [(nearD, farD)]
                        edges_add = edges_add_0 + [(nearD, swapS), (swapS, farD)]
                        choices.append(
                            (
                                gateD if D_gated else hookD,
                                swapD,
                                freeS,
                                edges_del,
                                edges_add,
                            )
                        )
                if hookD in A[swapS]:
                    # fix found
                    edges_del = edges_del_0 + edges_del_1
                    edges_add = edges_add_0 + [(hookD, swapS)]
                    choices.append(
                        (
                            gateD if D_gated else swapS,
                            swapD,
                            freeS,
                            edges_del,
                            edges_add,
                        )
                    )
    return choices  # ((gateD, swapD, freeS, edges_del, edges_add), ...)


def _quantify_choices(S, A, swapS, src_path, dst_path, choices):
    quant_choices = []
    (rootS,) = (n for n in S[src_path[0]] if n < 0)
    (rootD,) = (n for n in S[dst_path[0]] if n < 0)
    d2roots = A.graph['d2roots']
    for gateD, swapD, freeS, edges_del, edges_add in choices:
        gates_del = []
        gates_add = []
        change = 0
        minSd2root, gateS = min(
            (d2roots[swapD, rootS], swapD),
            (d2roots[freeS, rootS], freeS),
        )
        if swapS == src_path[0]:
            gates_del.append((rootS, swapS))
            change -= d2roots[swapS, rootS]
            gates_add.append((rootS, gateS))
            change += minSd2root
        elif gateS != src_path[0]:
            gates_del.append((rootS, src_path[0]))
            change -= d2roots[src_path[0], rootS]
            gates_add.append((rootS, gateS))
            change += minSd2root
        if gateD != dst_path[0]:
            gates_del.append((rootD, dst_path[0]))
            change -= d2roots[dst_path[0], rootD]
            minDd2root, gateD = min(
                (d2roots[gateD, rootD], gateD),
                (d2roots[dst_path[-1], rootD], dst_path[-1]),
            )
            gates_add.append((rootD, gateD))
            change += minDd2root
        change += sum(A[u][v]['length'] for u, v in edges_add) - sum(
            A[u][v]['length'] for u, v in edges_del
        )
        quant_choices.append((change, (edges_del, edges_add, gates_del, gates_add)))
    return quant_choices


def _apply_choice(
    S: nx.Graph,
    A: nx.Graph,
    edges_del: list[tuple[int, int]],
    edges_add: list[tuple[int, int]],
    gates_del: list[tuple[int, int]],
    gates_add: list[tuple[int, int]],
) -> nx.Graph:
    d2roots = A.graph['d2roots']
    # for edges: add first, then del
    S.add_edges_from(edges_add)
    S.remove_edges_from(edges_del)
    # for gates: del first, then add
    for root, gate in gates_del:
        S.remove_edge(root, gate)
    for root, gate in gates_add:
        S.add_edge(root, gate, length=d2roots[gate, root])
    # the repair invalidates current load attributes -> recalculate them
    # TODO: do it in a smarter way by updating only affected subtrees
    calcload(S)
    return S


[docs] def repair_routeset_path(Sʹ: nx.Graph, A: nx.Graph) -> nx.Graph: # naming: suffix _path as opposed to _tree -> Sʹ is non-branching """ This is only able to repair crossings where one of the paths is split in either a leaf and a path or a hook and a path. Args: Sʹ: solution topology that contains non-branching rooted tree(s) A: available edges used in creating `Sʹ` Returns: Topology without the crossing in a shallow copy of `Sʹ`. """ if 'C' in Sʹ.graph or 'D' in Sʹ.graph: print( 'ERROR: no changes made - `repair_routeset_path()` requires ' '`Sʹ` as a topology.' ) return Sʹ P = A.graph['planar'] diagonals = A.graph['diagonals'] S = Sʹ.copy() def not_crossing(choice): # reads from parent scope: diagonals, S, P # gateD, swapD, freeS, edges_del, edges_add = choice _, _, _, edges_del, edges_add = choice edges_del_ = set((u, v) if u < v else (v, u) for u, v in edges_del) edges_add_ = set((u, v) if u < v else (v, u) for u, v in edges_add) edges_add = edges_add_ - edges_del_ edges_del = edges_del_ - edges_add_ for u, v in edges_add: st = diagonals.get((u, v)) if st is None: # ⟨u, v⟩ is a Delaunay edge, find its diagonal st = diagonals.inv.get((u, v)) if st is None: # ⟨u, v⟩ has no diagonal continue s, t = st if s < 0 or t < 0: # diagonal of ⟨u, v⟩ is a gate continue if ((s, t) in S.edges or (s, t) in edges_add) and ( s, t, ) not in edges_del: # crossing with diagonal return False else: # ⟨u, v⟩ is a diagonal of Delaunay ⟨s, t⟩ s, t = st if ((s, t) in S.edges or (s, t) in edges_add) and ( s, t, ) not in edges_del: # crossing with Delaunay edge return False # TODO: update the code below to use the bidict diagonals # ensure u–s–v–t is ccw u, v = (u, v) if (P[u][t]['cw'] == s and P[v][s]['cw'] == t) else (v, u) # examine the two triangles ⟨s, t⟩ belongs to for a, b, c in ((s, t, u), (t, s, v)): # this is for diagonals crossing diagonals (4 checks) cbD = P[c].get(b) if cbD is not None: d = cbD['ccw'] diag_da = (a, d) if a < d else (d, a) if ( d == P[b][c]['cw'] and (diag_da in S.edges or diag_da in edges_add) and diag_da not in edges_del ): return False acD = P[a].get(c) if acD is not None: e = acD['ccw'] diag_eb = (e, b) if e < b else (b, e) if ( e == P[c][a]['cw'] and (diag_eb in S.edges or diag_eb in edges_add) and diag_eb not in edges_del ): return False return True outstanding_crossings = [] while True: # make a subgraph without gates and detours S_T = nx.subgraph_view(S, filter_node=lambda n: n >= 0) eeXings = list_edge_crossings(S_T, A) done = True for uv, st in eeXings: uv, st = (uv, st) if uv < st else (st, uv) if (uv, st) in outstanding_crossings: continue if all(S_T.degree[n] > 1 for n in (*uv, *st)): # found an unrepairable crossing outstanding_crossings.append((uv, st)) continue u, v = uv s, t = st subtree_uv = S.nodes[u]['subtree'] if subtree_uv == S.nodes[s]['subtree']: # this crossing is within a single subtree -> non-repairable # TODO: implement repair for this case (challenging) outstanding_crossings.append((uv, st)) continue # crossing is potentialy repairable done = False break if done: break gateV, leafV = gate_and_leaf_path(S_T, v) gateT, leafT = gate_and_leaf_path(S_T, t) src_dst_swap = [] if u == gateV or u == leafV: src_dst_swap.append((gateV, gateT, u)) elif v == gateV or v == leafV: src_dst_swap.append((gateV, gateT, v)) if s == gateT or s == leafT: src_dst_swap.append((gateT, gateV, s)) elif t == gateT or t == leafT: src_dst_swap.append((gateT, gateV, t)) if not src_dst_swap: warn( 'Crossing repair is only implemented for the cases where the ' 'split results in at least one single-noded branch fragment.' ) outstanding_crossings.append((uv, st)) continue quant_choices = [] for gateS, gateD, swapS in src_dst_swap: src_path = list_path(S_T, gateS) dst_path = list_path(S_T, gateD) # TODO: remove this # print(dst_path) choices = _find_fix_choices_path(A, swapS, src_path, dst_path) choices = filter(not_crossing, choices) quant_choices.extend( _quantify_choices(S, A, swapS, src_path, dst_path, choices) ) if not quant_choices: info('Repair unsuccessful: crossing marked for removal on HGS rerun.') outstanding_crossings.append((uv, st)) continue quant_choices.sort() # TODO: remove this # for cost, choice_data in quant_choices: # print(f'{cost:.3f} |', '|'.join(str(data) for data in choice_data)) _, choice = quant_choices[0] # apply_choice works on a copy of Sʹ S = _apply_choice(S, A, *choice) S.graph['repaired'] = 'repair_routeset_path' if outstanding_crossings: S.graph['num_crossings'] = len(outstanding_crossings) S.graph['outstanding_crossings'] = outstanding_crossings return S