# SPDX-License-Identifier: MIT
# https://gitlab.windenergy.dtu.dk/TOPFARM/OptiWindNet/
import logging
import math
import re
from collections import namedtuple
from importlib.resources import files
from itertools import chain
from pathlib import Path
import esy.osm.pbf
import networkx as nx
import numpy as np
import shapely as shp
import utm
import yaml
from scipy.spatial import ConvexHull
from .geometric import rotating_calipers
from .interarraylib import L_from_site
from .utils import make_handle
_lggr = logging.getLogger(__name__)
_info, _warn = _lggr.info, _lggr.warning
__all__ = ('L_from_yaml', 'L_from_pbf', 'L_from_windIO', 'load_repository')
_coord_sep = r',\s*|;\s*|\s{1,}|,|;'
_coord_lbraces = '(['
_coord_rbraces = ')]'
def _get_entries(entries):
if isinstance(entries, str):
for entry in entries.splitlines():
*opt, lat, lon = re.split(_coord_sep, entry)
lat = lat.lstrip(_coord_lbraces)
lon = lon.rstrip(_coord_rbraces)
if opt:
yield opt[0], lat, lon
else:
yield None, lat, lon
else:
for entry in entries:
if len(entry) > 2:
yield entry
else:
yield (None, *entry)
def _translate_latlonstr(entry_list):
translated = []
min = sec = 0.0
for label, lat, lon in _get_entries(entry_list):
latlon = []
for ll in (lat, lon):
deg, *tail = ll.split('°')
if tail:
min, *tail = tail[0].split("'")
if not tail:
hemisphere = min.strip()
min = 0.0
else:
sec, *tail = tail[0].split('"')
if not tail:
hemisphere = sec.strip()
sec = 0.0
else:
hemisphere = tail[0].strip()
latlon.append(
(float(deg) + (float(min) + float(sec) / 60) / 60)
* (1 if hemisphere in ('N', 'E') else -1)
)
else:
# entry is a signed fractional degree without hemisphere letter
latlon.append(float(deg))
translated.append((label, *utm.from_latlon(*latlon)))
return translated
def _parser_latlon(entry_list):
# separate data into columns
labels, eastings, northings, zone_numbers, zone_letters = zip(
*_translate_latlonstr(entry_list)
)
# all coordinates must belong to the same UTM zone
assert all(num == zone_numbers[0] for num in zone_numbers[1:])
assert all(letter == zone_letters[0] for letter in zone_letters[1:])
return np.c_[eastings, northings], (labels if any(labels) else ())
def _parser_planar(entry_list):
labels = []
coords = []
for label, easting, northing in _get_entries(entry_list):
labels.append(label)
coords.append((float(easting), float(northing)))
return np.array(coords, dtype=float), (labels if any(labels) else ())
coordinate_parser = dict(
latlon=_parser_latlon,
planar=_parser_planar,
)
[docs]
def L_from_yaml(filepath: Path | str, handle: str | None = None) -> nx.Graph:
"""Import wind farm data from .yaml file.
Two options available for COORDINATE_FORMAT: "planar" and "latlon".
Format "planar" is: [label] easting northing. Example::
LABEL 234.2 5212.5
Format "latlon" is: [label] latitude longitude. Example::
LABEL1 11°22.333'N 44°55.666'E
LABEL2 11.3563°N 44.8903°E
LABEL3 11°22'20"N 44°55'40"E
The [label] is optional. Ensure no spaces within a latitude or longitude.
The coordinate pair may be separated by "," or ";" and may be enclosed in
"[]" or "()". Example::
LABEL [234.2, 5212.5]
Args:
filepath: path to `.yaml` file to read.
handle: Short moniker for the site.
Returns:
Unconnected location graph L.
"""
if isinstance(filepath, str):
filepath = Path(filepath)
# read wind power plant site YAML file
parsed_dict = yaml.safe_load(open(filepath, 'r', encoding='utf8'))
name = filepath.stem
handle = parsed_dict.get('HANDLE')
if handle is None:
handle = make_handle(name)
# default format is "latlon"
format = parsed_dict.get('COORDINATE_FORMAT', 'latlon')
Border, BorderLabel = coordinate_parser[format](parsed_dict['EXTENTS'])
Root, RootLabel = coordinate_parser[format](parsed_dict['SUBSTATIONS'])
Terminal, TerminalLabel = coordinate_parser[format](parsed_dict['TURBINES'])
T = Terminal.shape[0]
R = Root.shape[0]
vertex_xy = {xy: i for i, xy in enumerate(map(tuple, Terminal.tolist()))}
vertex_xy.update(
{xy: i for i, xy in enumerate(map(tuple, Root.tolist()), start=-R)}
)
i = T
border_xy_ = []
border = []
for xy in map(tuple, Border.tolist()):
j = vertex_xy.get(xy)
if j is None:
border_xy_.append(xy)
border.append(i)
vertex_xy[xy] = i
i += 1
else:
if j >= T:
_warn(
'Repeated EXTENTS vertex detected: %s. This is not supported for a '
"location's border. Skipping this vertex, please fix the file: %s.",
xy,
filepath,
)
continue
border.append(vertex_xy[xy])
B = len(border_xy_)
optional = {}
obstacles = parsed_dict.get('OBSTACLES')
obstacleC_ = []
if obstacles is not None:
# obstacle has to be a list of arrays, so parsing is a bit different
indices = []
for obstacle_entry in parsed_dict['OBSTACLES']:
obstacleC, poly_tag = coordinate_parser[format](obstacle_entry)
obstacle_xy_ = []
obstacle = []
for xy in map(tuple, obstacleC.tolist()):
if xy not in vertex_xy:
obstacle_xy_.append(xy)
obstacle.append(i)
vertex_xy[xy] = i
i += 1
else:
obstacle_xy_.append(vertex_xy[xy])
B += len(obstacle_xy_)
indices.append(np.array(obstacle, dtype=np.int_))
obstacleC_.extend(obstacle_xy_)
optional['obstacles'] = indices
VertexC = np.vstack((Terminal, *border_xy_, *obstacleC_, Root))
lsangle = parsed_dict.get('LANDSCAPE_ANGLE')
if lsangle is not None:
optional['landscape_angle'] = lsangle
# create networkx graph
G = nx.Graph(
T=T,
R=R,
B=B,
VertexC=VertexC,
border=np.array(border, dtype=np.int_),
name=name,
handle=handle,
**optional,
)
# populate graph G
G.add_nodes_from(range(T), kind='wtg')
if TerminalLabel:
nx.set_node_attributes(G, {t: TerminalLabel[t] for t in range(T)}, name='label')
G.add_nodes_from(range(-R, 0), kind='oss')
if RootLabel:
nx.set_node_attributes(
G, {-R + r: RootLabel[r] for r in range(R)}, name='label'
)
return G
[docs]
def L_from_pbf(filepath: Path | str, handle: str | None = None) -> nx.Graph:
"""Import wind farm data from .osm.pbf file.
Args:
filepath: path to `.osm.pbf` file to read.
handle: Short moniker for the site.
Returns:
Unconnected location graph L.
"""
if isinstance(filepath, str):
filepath = Path(filepath)
assert ['.osm', '.pbf'] == filepath.suffixes[-2:], (
'Argument `filepath` does not have `.osm.pbf` extension.'
)
name = filepath.stem[:-4]
osm = esy.osm.pbf.File(filepath)
plant_name = None
nodes = {}
substations = []
substation_labels = []
turbines = []
turbine_labels = []
border_raw = None
obstacles_raw = []
ways = {}
for e in osm:
match e:
case esy.osm.pbf.Node():
nodes[e.id] = e
power_kind = e.tags.get('power')
if power_kind is None:
power_kind = e.tags.get('construction:power')
label = e.tags.get('ref') or e.tags.get('name')
match power_kind:
case 'substation' | 'transformer':
substations.append(e.lonlat[::-1])
substation_labels.append(label)
case 'generator':
turbines.append(e.lonlat[::-1])
turbine_labels.append(label)
case _:
_info('Unhandled power category for Node: %s', power_kind)
case esy.osm.pbf.Way():
power_kind = e.tags.get('power')
if power_kind is None:
power_kind = e.tags.get('construction:power')
match power_kind:
case 'plant':
plant_name = e.tags.get('name:en') or e.tags.get('name')
handle = e.tags.get('handle') or make_handle(name)
if border_raw is not None:
raise ValueError('Only a single border is supported.')
border_raw = [nodes[nid].lonlat[::-1] for nid in e.refs[:-1]]
case 'substation' | 'transformer':
label = e.tags.get('ref') or e.tags.get('name')
substations.append(
[nodes[nid].lonlat[::-1] for nid in e.refs[:-1]]
)
substation_labels.append(label)
case 'generator':
_info('Generator must be Node, not Way.')
case None:
# likely to be used in a Relation
ways[e.id] = e
case _:
_info('Unhandled power category for Way: %s', power_kind)
case esy.osm.pbf.Relation():
if e.tags.get('type') == 'multipolygon':
power_kind = e.tags.get('power')
if power_kind is None:
power_kind = e.tags.get('construction:power')
match power_kind:
case 'plant':
plant_name = e.tags.get('name:en') or e.tags.get('name')
handle = e.tags.get('handle') or make_handle(name)
for m in e.members:
eid, cls, kind = m
match cls:
case 'WAY':
match kind:
case 'outer':
if border_raw is not None:
raise ValueError(
'Only a single border'
' is supported.'
)
border_raw = [
nodes[nid].lonlat[::-1]
for nid in ways[eid].refs[:-1]
]
case 'inner':
obstacles_raw.append(
[
nodes[nid].lonlat[::-1]
for nid in ways[eid].refs[:-1]
]
)
case _:
_info(
'Unhandled power category for Relation: %s', power_kind
)
T = len(turbines)
R = len(substations)
if T == 0 or R == 0:
raise ValueError(
f'Location: "{name}" -> Unable to identify at least one'
' substation and one generator.'
)
# for i, substation in enumerate(tuple(substations)):
for i, substation in enumerate(tuple(substations)):
if isinstance(substation, list):
# Substation defined as a polygon, reduce it to a point
easting, northing, zone_num, zone_let = utm.from_latlon(
*np.array(tuple(zip(*substation)))
)
centroid = shp.Polygon(shell=list(zip(easting, northing))).centroid
latlon = utm.to_latlon(centroid.x, centroid.y, zone_num, zone_let)
substations[i] = latlon
node_latlon = {node: i for i, node in enumerate(turbines)}
node_latlon.update({node: i for i, node in enumerate(substations, start=-R)})
i = T
border_latlon = []
border_list = []
if border_raw is None:
B = 0
else:
for latlon in border_raw:
if latlon not in node_latlon:
border_latlon.append(latlon)
border_list.append(i)
node_latlon[latlon] = i
i += 1
else:
border_list.append(node_latlon[latlon])
B = len(border_latlon)
obstacles = []
obstacles_latlon = []
for obstacle_entry in obstacles_raw:
obstacle_latlon = []
obstacle = []
for latlon in obstacle_entry:
if latlon not in node_latlon:
obstacle_latlon.append(latlon)
obstacle.append(i)
node_latlon[latlon] = i
i += 1
else:
obstacle.append(node_latlon[latlon])
B += len(obstacle_latlon)
obstacles.append(np.array(obstacle, dtype=np.int_))
obstacles_latlon.extend(obstacle_latlon)
# Build site data structure
latlon = np.array(
tuple(
chain(
turbines,
border_latlon,
obstacles_latlon,
substations,
)
),
dtype=float,
)
# TODO: find the UTM sector that includes the most coordinates among
# vertices and boundary (bin them in 6° sectors and count). Then insert
# a dummy coordinate as the first array row, such that utm.from_latlon()
# will pick the right zone for all points.
VertexC = np.c_[utm.from_latlon(*latlon.T)[:2]]
if handle is None:
handle = make_handle(name)
L = L_from_site(
T=T,
R=R,
VertexC=VertexC,
name=name,
handle=handle,
)
for labels, start in ((substation_labels, -R), (turbine_labels, 0)):
if any(labels):
for i, label in enumerate(labels, start=start):
if label is not None:
L.nodes[i]['label'] = label
if border_list:
border = np.array(border_list, dtype=np.int_)
L.graph['border'] = border
# for now, obstacles are allowed only if a border is defined
if obstacles:
L.graph['obstacles'] = [
np.array(obstacle, dtype=np.int_) for obstacle in obstacles
]
border_list.extend([r for r in range(-R, 0) if r not in set(border_list)])
hullC_ = VertexC[
np.array(border_list)[ConvexHull(VertexC[border_list]).vertices]
]
else:
# if no border is defined, pass all vertices to ConvexHull
hullC_ = VertexC[ConvexHull(VertexC).vertices]
_, best_caliper_angle, _, _ = rotating_calipers(hullC_, metric='height')
best_caliper_angle_deg = 180 * best_caliper_angle / math.pi
ls_angle = 90 - best_caliper_angle_deg
ls_angle = (
ls_angle if -90 <= ls_angle < 90 else ls_angle + (180 if ls_angle < 0 else -180)
)
L.graph['landscape_angle'] = ls_angle
L.graph['B'] = B
if plant_name is not None:
L.graph['OSM_name'] = plant_name
return L
def _yaml_include_constructor(loader, node):
filename = node.value
with open(filename, 'r') as f:
return yaml.load(f, Loader=type(loader))
class IncludeLoader(yaml.SafeLoader):
def __init__(self, stream):
# Store the directory of the currently loaded YAML file
self._parent = Path(stream.name).parent
super().__init__(stream)
self.add_constructor('!include', IncludeLoader.include)
def include(self, node):
# Construct the full path of the file to include, relative to parent YAML
include_path = Path(self.construct_scalar(node))
if include_path.suffix not in ('.yml', '.yaml'):
_warn(
'Ignoring YAML "!include" directive to unsupported file type (%s)',
include_path,
)
return {}
if not include_path.is_absolute():
include_path = self._parent / include_path
with open(include_path, 'r') as f:
# When processing includes, use IncludeLoader to maintain
# correct directory context
return yaml.load(f, Loader=IncludeLoader)
[docs]
def L_from_windIO(filepath: Path | str, handle: str | None = None) -> nx.Graph:
"""Import wind farm data from a windIO .yaml file.
Args:
filepath: path to windIO `.yaml` file to read.
handle: Short moniker for the site.
Returns:
Unconnected location geometry L.
"""
if isinstance(filepath, str):
filepath = Path(filepath)
name = filepath.stem
system = yaml.load(filepath.open(), Loader=IncludeLoader)
coords = system['wind_farm']['layouts']['initial_layout']['coordinates']
terminalC = np.c_[coords['x'], coords['y']]
coords = system['wind_farm']['electrical_substations']['coordinates']
rootC = np.c_[coords['x'], coords['y']]
coords = system['site']['boundaries']['polygons'][0]
borderC = np.c_[coords['x'], coords['y']]
T = terminalC.shape[0]
R = rootC.shape[0]
B = borderC.shape[0]
if handle is None:
handle = make_handle(name)
L = L_from_site(
R=R,
T=T,
B=B,
VertexC=np.vstack((terminalC, borderC, rootC)),
**({'border': np.arange(T, T + B)} if (borderC is not None and B >= 3) else {}),
name=name,
handle=handle,
)
return L
[docs]
def load_repository(path: Path | str | None = None) -> tuple[nx.Graph, ...]:
"""Load locations from files of known formats into a namedtuple.
Each file (.yaml or .osm.pbf) is translated into a location graph and
included as an attribute in the returned namedtuple. The attribute name
can be specified in the .yaml with the field `HANDLE` or in the .osm.pbf
file with the tag `handle` applied to the power plant object.
Args:
path: Path to look for location files (non-recursive). If omited, the
locations included in optiwindnet are loaded.
Returns:
Named tuple which has the location handles as attribute identifiers.
"""
if path is None:
path = files(__package__) / 'data'
else:
path = Path(path)
locations = [L_from_yaml(file) for file in path.glob('*.yaml')]
locations.extend(L_from_pbf(file) for file in path.glob('*.osm.pbf'))
handles = tuple(L.graph['handle'] for L in locations)
return namedtuple('Locations', handles)(*locations)