# SPDX-License-Identifier: MIT
# https://gitlab.windenergy.dtu.dk/TOPFARM/OptiWindNet/
import base64
import io
import json
from collections.abc import Sequence
from functools import partial
from hashlib import sha256
from itertools import chain, pairwise
from socket import getfqdn, gethostname
from typing import Any, Mapping
import networkx as nx
import numpy as np
from ..interarraylib import calcload
from ..utils import make_handle
from .model import (
Machine,
Method,
NodeSet,
RouteSet,
)
__all__ = ()
PackType = Mapping[str, Any]
# Set of not-to-store keys commonly found in G routesets (they are either
# already stored in database fields or are cheap to regenerate or too big.
_misc_not = {
'VertexC',
'anglesYhp',
'anglesXhp',
'anglesRank',
'angles',
'd2rootsRank',
'd2roots',
'name',
'boundary',
'capacity',
'B',
'runtime',
'runtime_unit',
'edges_fun',
'D',
'DetourC',
'fnT',
'landscape_angle',
'Root',
'creation_options',
'G_nodeset',
'T',
'non_A_gates',
'funfile',
'funhash',
'funname',
'diagonals',
'planar',
'has_loads',
'R',
'Subtree',
'handle',
'non_A_edges',
'max_load',
'fun_fingerprint',
'hull',
'solver_log',
'length_mismatch_on_db_read',
'gnT',
'C',
'border',
'obstacles',
'num_diagonals',
'crossings_map',
'tentative',
'method_options',
'is_normalized',
'norm_scale',
'norm_offset',
'detextra',
'rogue',
'clone2prime',
'valid',
'path_in_P',
'shortened_contours',
'nonAedges',
'method',
'num_stunts',
'crossings',
'creator',
'inter_terminal_clearance_min',
'inter_terminal_clearance_safe',
'stunts_primes',
}
[docs]
def L_from_nodeset(nodeset: NodeSet, handle: str | None = None) -> nx.Graph:
"""Translate a NodeSet database entry to a location graph.
Args:
nodeset: an entry from the database NodeSet table.
Returns:
Graph L containing the positions and location metadata.
"""
T = nodeset.T
R = nodeset.R
B = nodeset.B
border = np.array(nodeset.constraint_vertices[: nodeset.constraint_groups[0]])
name = nodeset.name
if handle is None:
handle = make_handle(name if name[0] != '!' else name[1 : name.index('!', 1)])
L = nx.Graph(
R=R,
T=T,
B=B,
name=name,
handle=handle,
VertexC=np.lib.format.read_array(io.BytesIO(nodeset.VertexC)),
landscape_angle=nodeset.landscape_angle,
)
if len(border) > 0:
L.graph['border'] = border
if len(nodeset.constraint_groups) > 1:
obstacle_idx = np.cumsum(np.array(nodeset.constraint_groups))
L.graph.update(
obstacles=[
np.array(nodeset.constraint_vertices[a:b])
for a, b in pairwise(obstacle_idx)
]
)
L.add_nodes_from(((n, {'kind': 'wtg'}) for n in range(T)))
L.add_nodes_from(((r, {'kind': 'oss'}) for r in range(-R, 0)))
return L
[docs]
def G_from_routeset(routeset: RouteSet) -> nx.Graph:
"""Translate a RouteSet database entry to a routeset graph.
Args:
routeset: an entry from the database RouteSet table.
Returns:
Graph G containing the routeset.
"""
nodeset = routeset.nodes
G = L_from_nodeset(nodeset)
misc = routeset.misc if routeset.misc is not None else {}
G.graph.update(
C=routeset.C,
D=routeset.D,
handle=routeset.handle,
capacity=routeset.capacity,
creator=routeset.creator,
method=dict(
solver_name=routeset.method.solver_name,
timestamp=routeset.method.timestamp,
funname=routeset.method.funname,
funfile=routeset.method.funfile,
funhash=routeset.method.funhash,
),
runtime=routeset.runtime,
method_options=routeset.method.options,
**misc,
)
if routeset.detextra is not None:
G.graph['detextra'] = routeset.detextra
untersify_to_G(G, terse=routeset.edges, clone2prime=routeset.clone2prime)
calc_length = G.size(weight='length')
if abs(calc_length / routeset.length - 1) > 1e-5:
G.graph['length_mismatch_on_db_read'] = calc_length - routeset.length
if routeset.rogue:
for u, v in zip(routeset.rogue[::2], routeset.rogue[1::2]):
G[u][v]['kind'] = 'rogue'
if routeset.tentative:
for r, n in zip(routeset.tentative[::2], routeset.tentative[1::2]):
G[r][n]['kind'] = 'tentative'
return G
def packnodes(G: nx.Graph) -> PackType:
R, T, B = (G.graph[k] for k in 'RTB')
VertexC = G.graph['VertexC']
VertexC_npy_io = io.BytesIO()
np.lib.format.write_array(VertexC_npy_io, VertexC, version=(3, 0))
VertexC_npy = VertexC_npy_io.getvalue()
digest = sha256(VertexC_npy).digest()
if G.name[0] == '!':
name = G.name + base64.b64encode(digest).decode('ascii')
else:
name = G.name
constraint_vertices = list(
chain((G.graph.get('border', ()),), G.graph.get('obstacles', ()))
)
pack = dict(
T=T,
R=R,
B=B,
name=name,
VertexC=VertexC_npy,
constraint_groups=[p.shape[0] for p in constraint_vertices],
constraint_vertices=np.concatenate(
constraint_vertices, dtype=int, casting='unsafe'
).tolist(),
landscape_angle=G.graph.get('landscape_angle', 0.0),
digest=digest,
)
return pack
def packmethod(method_options: dict) -> PackType:
options = {
k: method_options[k]
for k in sorted(method_options)
if k not in ('fun_fingerprint', 'solver_name')
}
ffprint = method_options['fun_fingerprint']
digest = sha256(ffprint['funhash'] + json.dumps(options).encode()).digest()
pack = dict(
digest=digest,
solver_name=method_options['solver_name'],
options=options,
**ffprint,
)
return pack
def add_if_absent(entity: type, pack: PackType) -> bytes:
digest = pack['digest']
if not entity.select().where(entity.digest == digest).exists():
entity.create(**pack)
return digest
def method_from_G(G: nx.Graph) -> bytes:
"""
Returns:
Primary key of the entry.
"""
pack = packmethod(G.graph['method_options'])
return add_if_absent(Method, pack)
def nodeset_from_G(G: nx.Graph) -> bytes:
"""Returns primary key of the entry."""
pack = packnodes(G)
return add_if_absent(NodeSet, pack)
def terse_pack_from_G(G: nx.Graph) -> PackType:
"""Convert `G`'s edges to a format suitable for storing in the database.
Although graph `G` in undirected, the edge attribute `'reverse'` and its
nodes' numbers encode the direction of power flow. The terse
representation uses that and the fact that `G` is a tree.
Returns:
dict with keys:
edges: where <i, edges[i]> is a directed edge of `G`
clone2prime: mapping the above-T clones to below-T nodes
"""
R, T, B = (G.graph[k] for k in 'RTB')
C, D = (G.graph.get(k, 0) for k in 'CD')
terse = np.empty((T + C + D,), dtype=int)
if not G.graph.get('has_loads'):
calcload(G)
for u, v, reverse in G.edges(data='reverse'):
if reverse is None:
raise ValueError('reverse must not be None')
u, v = (u, v) if u < v else (v, u)
i, target = (u, v) if reverse else (v, u)
if i < T:
terse[i] = target
else:
terse[i - B] = target
terse_pack = dict(edges=terse.tolist())
if C > 0 or D > 0:
terse_pack['clone2prime'] = G.graph['fnT'][T + B : -R].tolist()
return terse_pack
def untersify_to_G(G: nx.Graph, terse: list, clone2prime: list) -> None:
"""
Changes G in place!
"""
R, T, B = (G.graph[k] for k in 'RTB')
C, D = (G.graph.get(k, 0) for k in 'CD')
VertexC = G.graph['VertexC']
terse = np.asarray(terse)
source = np.arange(len(terse))
if clone2prime:
source[T:] += B
contournodes = range(T + B, T + B + C)
detournodes = range(T + B + C, T + B + C + D)
G.add_nodes_from(contournodes, kind='contour')
G.add_nodes_from(detournodes, kind='detour')
fnT = np.arange(R + T + B + C + D)
fnT[T + B : T + B + C + D] = clone2prime
fnT[-R:] = range(-R, 0)
G.graph['fnT'] = fnT
Length = np.hypot(*(VertexC[fnT[terse]] - VertexC[fnT[source]]).T)
else:
Length = np.hypot(*(VertexC[terse] - VertexC[source]).T)
G.add_weighted_edges_from(
zip(source.tolist(), terse.tolist(), Length.tolist()), weight='length'
)
if clone2prime:
for _, _, edgeD in G.edges(contournodes, data=True):
edgeD['kind'] = 'contour'
for _, _, edgeD in G.edges(detournodes, data=True):
edgeD['kind'] = 'detour'
calcload(G)
def oddtypes_to_serializable(obj):
if isinstance(obj, (list, tuple)):
return type(obj)(oddtypes_to_serializable(item) for item in obj)
elif isinstance(obj, dict):
return {k: oddtypes_to_serializable(v) for k, v in obj.items()}
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, (np.integer,)):
return int(obj)
elif isinstance(obj, (np.floating,)):
return float(obj)
else:
return obj
def pack_G(G: nx.Graph) -> dict[str, Any]:
R, T, B = (G.graph[k] for k in 'RTB')
C, D = (G.graph.get(k, 0) for k in 'CD')
terse_pack = terse_pack_from_G(G)
misc = {key: G.graph[key] for key in G.graph.keys() - _misc_not}
for k, v in misc.items():
misc[k] = oddtypes_to_serializable(v)
if not misc:
misc = {}
length = G.size(weight='length')
handle = G.graph.get('handle')
if handle is None:
handle = make_handle(G.graph['name'])
packed_G = dict(
R=R,
T=T,
C=C,
D=D,
handle=handle,
capacity=G.graph['capacity'],
length=length,
creator=G.graph['creator'],
runtime=G.graph['runtime'],
feeders_per_root=[len(G[root]) for root in range(-R, 0)],
misc=misc,
**terse_pack,
)
# Optional fields
if C + D > 0:
packed_G['clone2prime'] = G.graph['fnT'][-C - D - R : -R].tolist()
concatenate_tuples = partial(sum, start=())
pack_if_given = ( # key, function to prepare data
('detextra', None),
('num_diagonals', None),
('tentative', concatenate_tuples),
('rogue', concatenate_tuples),
)
packed_G.update(
{
k: (fun(G.graph[k]) if fun else G.graph[k])
for k, fun in pack_if_given
if k in G.graph
}
)
return packed_G
[docs]
def store_G(G: nx.Graph) -> int:
"""Store `G`'s data to a new `RouteSet` record in the database.
If the NodeSet or Method are not yet in the database, they will be added.
Args:
G: Graph with the routeset.
Returns:
Primary key of the newly created RouteSet record.
"""
packed_G = pack_G(G)
nodesetID = nodeset_from_G(G)
methodID = method_from_G(G)
machineID = get_machine_pk()
packed_G.update(
nodes=nodesetID,
method=methodID,
machine=machineID,
)
rs = RouteSet.create(**packed_G)
return rs.id
def get_machine_pk() -> int:
fqdn = getfqdn()
hostname = gethostname()
if fqdn == 'localhost':
machine = hostname
else:
if hostname.startswith('n-'):
machine = fqdn[len(hostname) :]
else:
machine = fqdn
m, _ = Machine.get_or_create(name=machine)
return m.id
[docs]
def G_by_method(G: nx.Graph, method: Method) -> nx.Graph:
"""Fetch from the database a layout for `G` by `method`.
`G` must be a layout solution with the necessary info in the G.graph dict.
`method` is a Method.
"""
farmname = G.name
c = G.graph['capacity']
rs = (
RouteSet.select()
.join(NodeSet)
.where(
NodeSet.name == farmname,
RouteSet.method == method.digest,
RouteSet.capacity == c,
)
.get()
)
Gdb = G_from_routeset(rs)
calcload(Gdb)
return Gdb
[docs]
def Gs_from_attrs(
farm: object,
methods: Method | Sequence[object],
capacities: int | Sequence[int],
) -> list[tuple[nx.Graph]]:
"""
Fetch from the database a list (one per capacity) of tuples (one per
method) of layouts.
`farm` must have the desired NodeSet name in the `name` attribute.
`methods` is a (sequence of) Method instance(s).
`capacities` is a (sequence of) int(s).
"""
Gs = []
if not isinstance(methods, Sequence):
methods = (methods,)
if not isinstance(capacities, Sequence):
capacities = (capacities,)
for c in capacities:
Gtuple = tuple(
G_from_routeset(
RouteSet.select()
.join(NodeSet)
.where(
NodeSet.name == farm.name,
RouteSet.method == m.digest,
RouteSet.capacity == c,
)
.get()
)
for m in methods
)
for G in Gtuple:
calcload(G)
Gs.append(Gtuple)
return Gs