Source code for optiwindnet.interface

# SPDX-License-Identifier: MIT
# https://gitlab.windenergy.dtu.dk/TOPFARM/OptiWindNet/

from functools import partial

import networkx as nx
import numpy as np
import numpy.lib.recfunctions as nprec

from .heuristics import CPEW, NBEW, OBEW
from .interarraylib import F, calcload

__all__ = ('assign_cables',)

heuristics = {
    'CPEW': CPEW,
    'NBEW': NBEW,
    'OBEW': OBEW,
    'OBEW_0.6': partial(OBEW, rootlust='0.6*cur_capacity/capacity'),
}


def translate2global_optimizer(G):
    VertexC = G.graph['VertexC']
    R = G.graph['R']
    T = G.graph['T']
    X, Y = np.hstack((VertexC[-1 : -1 - R : -1].T, VertexC[:T].T))
    return dict(WTc=T, OSSc=R, X=X, Y=Y)


[docs] def assign_cables(G, cables): """Assign a cable type to each edge of `G` and update attribute 'cost'. Each edge is assigned the cheapest cable type that can carry its load. The edge attribute 'cable' is the index in `cables` of the type chosen. Args: G: networkx graph with edges having a 'load' attribute (use calcload(G)) cables: [(«capacity», «cost»), ...] in increasing capacity order (each cable entry must be a tuple) or numpy.ndarray where each row represents one cable type """ Nc = len(cables) dt = np.dtype([('capacity', int), ('cost', float)]) if isinstance(cables, np.ndarray): cable_ = nprec.unstructured_to_structured(cables, dtype=dt) else: cable_ = np.fromiter(cables, dtype=dt, count=Nc) capacity_ = cable_['capacity'] capacity = 1 # for e, data in G.edges.items(): for u, v, data in G.edges(data=True): i = capacity_.searchsorted(data['load']).item() if i >= len(capacity_): print( f'ERROR: Load for edge ⟨{u, v}⟩: {data["load"]} ' f'exceeds maximum cable capacity {capacity_[-1]}.' ) data['cable'] = i data['cost'] = data['length'] * cable_['cost'][i].item() if data['load'] > capacity: capacity = data['load'] G.graph['cables'] = cable_ G.graph['has_costs'] = True if 'capacity' not in G.graph: G.graph['capacity'] = capacity
def assign_subtree(G): start = 0 queue = [] for root in range(-G.graph['R'], 0): for subtree, gate in enumerate(G[root], start=start): queue.append((root, gate)) while queue: parent, node = queue.pop() G.nodes[node]['subtree'] = subtree for nbr in G[node]: if nbr != parent: queue.append((node, nbr)) start = subtree + 1 def L_from_XYR(X, Y, R=1, name='unnamed', borderC=None): """Create location graph L from node coordinates split in X and Y. This function assumes that the first R coordinates in X/Y are OSSs. Args: X: x coordinates of nodes Y: y coordinates of nodes R: number of OSSs Returns: Location graph L. """ assert len(X) == len(Y), 'ERROR: X and Y lengths must match' T = len(X) - R # create networkx graph if borderC is None: borderC = np.array( ((min(X), min(Y)), (min(X), max(Y)), (max(X), max(Y)), (max(X), min(Y))) ) B = borderC.shape[0] border = list(range(T, T + B)) L = nx.Graph( R=R, T=T, B=B, border=border, name=name, VertexC=np.r_[ np.c_[X[R:], Y[R:]], np.c_[X[R - 1 :: -1], Y[R - 1 :: -1]], borderC ], ) L.add_nodes_from(((n, {'label': F[n], 'kind': 'wtg'}) for n in range(T))) L.add_nodes_from(((r, {'label': F[r], 'kind': 'oss'}) for r in range(-R, 0))) return L def G_from_table( table: np.ndarray[:, :], G_base: nx.Graph, capacity: int | None = None, cost_scale: float = 1e3, ) -> nx.Graph: """Create a networkx Graph with nodes and data from G_base and edges from a table. (e.g. the S matrix of juru's `global_optimizer`) Args: table: [ [u, v, length, cable type, load (WT number), cost] ] """ G = nx.Graph() G.graph.update(G_base.graph) G.add_nodes_from(G_base.nodes(data=True)) R = G_base.graph['R'] # indexing differences: # table starts at 1, while G starts at -R edges = table[:, :2].astype(int) - R - 1 G.add_edges_from(edges) nx.set_edge_attributes( G, { (int(u), int(v)): dict(length=length, cable=cable, load=load, cost=cost) for (u, v), length, (cable, load), cost in zip( edges, table[:, 2], table[:, 3:5].astype(int), cost_scale * table[:, 5] ) }, ) G.graph['has_loads'] = True G.graph['has_costs'] = True G.graph['creator'] = 'G_from_table()' if capacity is not None: G.graph['capacity'] = capacity return G def G_from_TG(S, G_base, capacity=None, load_col=4): """DEPRECATED in favor of `G_from_table()` Creates a networkx graph with nodes and data from G_base and edges from a S matrix. S matrix: [ [u, v, length, load (WT number), cable type], ...] """ G = nx.Graph() G.graph.update(G_base.graph) G.add_nodes_from(G_base.nodes(data=True)) R = G_base.graph['R'] T = G_base.graph['T'] # indexing differences: # S starts at 1, while G starts at 0 # S begins with OSSs followed by WTGs, # while G begins with WTGs followed by OSSs # the line bellow converts the indexing: edges = (S[:, :2].astype(int) - R - 1) % (T + R) G.add_weighted_edges_from(zip(*edges.T, S[:, 2]), weight='length') # nx.set_edge_attributes(G, {(u, v): load for (u, v), load # in zip(edges, S[:, load_col])}, # name='load') # try: calcload(G) # except AssertionError as err: # print(f'>>>>>>>> SOMETHING WENT REALLY WRONG: {err} <<<<<<<<<<<') # return G if S.shape[1] >= 4: for (u, v), load in zip(edges, S[:, load_col]): Gload = G.edges[u, v]['load'] assert Gload == load, f'<G.edges[{u}, {v}]> {Gload} != {load} <S matrix>' G.graph['has_loads'] = True G.graph['creator'] = 'G_from_TG()' G.graph['prevented_crossings'] = 0 return G def table_from_G(G): """Create a table representing the edges of G. Args: G: graph to convert to table Returns: table: [ («u», «v», «length», «load (WT number)», «cable type», «edge cost»), ...] (table is a numpy record array) """ R = G.graph['R'] Ne = G.number_of_edges() def edge_parser(edges): for u, v, data in edges: # OSS index starts at 0 # u = (u + R) if u > 0 else abs(u) - 1 # v = (v + R) if v > 0 else abs(v) - 1 # OSS index starts at 1 s = (u + R + 1) if u >= 0 else abs(u) t = (v + R + 1) if v >= 0 else abs(v) # print(u, v, '->', s, t) yield (s, t, data['length'], data['load'], data['cable'], data['cost']) table = np.fromiter( edge_parser(G.edges(data=True)), dtype=[ ('u', int), ('v', int), ('length', float), ('load', int), ('cable', int), ('cost', float), ], count=Ne, ) return table class HeuristicFactory: """Initializes a heuristic algorithm. Args: T: number of nodes R: number of roots rootC: 2D nympy array (R, 2) of the XY coordinates of the roots boundaryC: 2D numpy array (_, 2) of the XY coordinates of the boundary cables: [(«cross section», «capacity», «cost»), ...] ordered by capacity name: site name (increasing capacity along cables' elements) """ def __init__(self, T, R, rootC, boundaryC, heuristic, cables, name='unnamed'): self.T = T self.R = R self.cables = cables self.k = cables[-1][1] self.VertexC = np.empty((T + R, 2), dtype=float) self.VertexC[T:] = rootC # create networkx graph self.G_base = nx.Graph(R=R, VertexC=self.VertexC, boundary=boundaryC, name=name) self.G_base.add_nodes_from( ((n, {'label': F[n], 'kind': 'wtg'}) for n in range(T)) ) self.G_base.add_nodes_from( ((r, {'label': F[r], 'kind': 'oss'}) for r in range(-R, 0)) ) self.heuristic = heuristics[heuristic] def calccost(self, X, Y): assert len(X) == len(Y) == self.T self.VertexC[: self.T, 0] = X self.VertexC[: self.T, 1] = Y self.G = self.heuristic(self.G_base, capacity=self.k) calcload(self.G) assign_cables(self.G, self.cables) return self.G.size(weight='cost') def get_table(self): """Create a table representing the edges of the solution. Must have called cost() at least once. Only the last call's layout is available. Returns: table: [ («u», «v», «length», «load (WT number)», «cable type», «edge cost»), ...] (table is a numpy record array) """ return table_from_G(self.G) def heuristic_wrapper(X, Y, cables, R=1, heuristic='CPEW', return_graph=False): """Run a heuristic on a location defined by X/Y coordinates. This function assumes that the first R coordinates in X/Y are OSSs. (increasing capacity along `cables`' elements) Args: X: x coordinates of nodes Y: y coordinates of nodes R: number of OSSs cables: [(«cross section», «capacity», «cost»), ...] ordered by capacity R: number of OSSs heuristic: {'CPEW', 'OBEW'} Returns: Location graph L. """ G_base = L_from_XYR(X, Y, R) G = heuristics[heuristic](G_base, capacity=cables[-1][1]) calcload(G) assign_cables(G, cables) if return_graph: return table_from_G(G), G else: return table_from_G(G)