from __future__ import annotations
import abc
from copy import deepcopy
import xml.etree.ElementTree as et
import numpy as np
import scipy.sparse as sps
from .shared import Shared, ID_UNMAPPED, LBL_UNMAPPED
from paminco.utils.readin import xml_find_root, xml_add_element
from paminco.utils.typing import is_int
from paminco.utils.misc import Cache
from paminco import _doc
TAG_COMMODITIES = "commodities"
DTYPE_INT = np.int32
DTYPE_FLOAT = np.float64
class Commodity(abc.ABC):
"""Abstract class of a commodity."""
def __repr__(self) -> str:
return self.__class__.__name__ + " @ " + hex(id(self)) + "\n" + str(self)
@staticmethod
def from_data(data, **kwargs):
"""Construct commodity from ``data``.
Parameters
----------
data : tuple or dict
``(source, target, rate)``: a CommoditySingleSourceSink is constructed.
``dict``: a CommodityMultiSourceSink is constructed.
Returns
-------
CommoditySingleSourceSink or CommodityMultiSourceSink
See Also
--------
CommoditySingleSourceSink
CommodityMultiSourceSink
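Examples
--------
A minimal sketch; node labels are hypothetical:

>>> c = Commodity.from_data(("s", "t", 100.0))
>>> isinstance(c, CommoditySingleSourceSink)
True
>>> m = Commodity.from_data({"a": -70.0, "b": 30.0, "c": 40.0})
>>> isinstance(m, CommodityMultiSourceSink)
True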
"""
if isinstance(data, Commodity):
return data
if isinstance(data, tuple) and len(data) == 3:
if data[0] == data[1] or data[2] == 0:
return None
return CommoditySingleSourceSink.from_tuple(data, **kwargs)
elif isinstance(data, dict):
return CommodityMultiSourceSink(data, **kwargs)
raise TypeError("Data for commodity must be tuple of len 3 or dict.")
@abc.abstractmethod
def get_scaled_copy(self): ...
@abc.abstractmethod
def map_node_labels(self): ...
@abc.abstractmethod
def map_node_ids(self): ...
@property
@abc.abstractmethod
def source_lbl(self): ...
@property
@abc.abstractmethod
def sink_lbl(self): ...
@property
@abc.abstractmethod
def source_id(self): ...
@property
@abc.abstractmethod
def sink_id(self): ...
@property
@abc.abstractmethod
def rate(self): ...
class CommoditySingleSourceSink(Commodity):
"""Commodity with exactly one source and one sink.
Parameters
----------
source : str or int
Label or index of source.
sink : str or int
Label or index of sink.
rate : float
Rate of flow from source to sink.
is_label : bool, default=True
If True, source and sink are expected to be str, else int.
Attributes
----------
has_valid_indices
has_valid_labels
source_lbl
source_id
sink_lbl
sink_id
rate
total_rate
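Examples
--------
A minimal sketch with hypothetical labels; ids stay unmapped
until map_node_labels is called:

>>> c = CommoditySingleSourceSink("s", "t", 100.0)
>>> c.source_lbl, c.rate
('s', 100.0)
>>> c.has_valid_labels, c.has_valid_indices
(True, False)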
"""
def __init__(
self,
source,
sink,
rate: float,
is_label: bool = True,
) -> None:
if is_label is True:
if (isinstance(source, str) is False or
isinstance(sink, str) is False):
raise TypeError(
"source and sink must be str if is_label==True."
)
self._source_lbl = source
self._sink_lbl = sink
self._source_id = ID_UNMAPPED
self._sink_id = ID_UNMAPPED
else:
if is_int(source) is False or is_int(sink) is False:
raise TypeError(
"source and sink must be int if is_label==False."
)
self._source_lbl = LBL_UNMAPPED
self._sink_lbl = LBL_UNMAPPED
self._source_id = source
self._sink_id = sink
self._rate = rate
def __len__(self) -> int:
return 2
def __eq__(self, other) -> bool:
if isinstance(other, CommoditySingleSourceSink) is False:
raise TypeError(
"Comparison object must be of type 'CommoditySingleSourceSink'."
)
for att in ["_source_lbl", "_sink_lbl", "_source_lbl", "_sink_lbl", "_rate"]:
if getattr(self, att) != getattr(other, att):
return False
return True
def __str__(self) -> str:
if self.has_valid_indices and self.has_valid_labels:
return (
f"'{self.source_lbl}' "
f"({self.source_id}) \u2192 '{self.sink_lbl}' "
f"({self.sink_id}) | {self.rate}."
)
elif self.has_valid_labels:
return (
f"'{self.source_lbl}' "
f"\u2192 '{self.sink_lbl}' | {self.rate}."
)
elif self.has_valid_indices:
return (
f"{self.source_id} "
f"\u2192 {self.sink_id} | {self.rate}."
)
else:
raise ValueError("Neither ids not labels have been mapped.")
def get_scaled_copy(self, scale_factor: float = 1) -> CommoditySingleSourceSink:
"""Get a copy of commodity with rate scaled by ``scale_factor``.
Parameters
----------
scale_factor : float, default=1
Scale rate from source to sink by scale_factor.
Returns
-------
CommoditySingleSourceSink
Commodity with rate scaled by ``scale_factor``.
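Examples
--------
A minimal sketch; the original commodity is left untouched:

>>> c = CommoditySingleSourceSink("s", "t", 10.0)
>>> c.get_scaled_copy(2.0).rate
20.0
>>> c.rate
10.0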
"""
cpy = self.__new__(CommoditySingleSourceSink)
for att in ["_source_lbl", "_source_id", "_sink_lbl", "_sink_id",
"_rate"]:
if hasattr(self, att):
setattr(cpy, att, getattr(self, att))
cpy._rate *= scale_factor
return cpy
def get_sparse_elements(self, col_number: int = 0):
"""Extract commodity data for sparse demand matrix.
Parameters
----------
col_number : int, default=0
Column indices of the matrix entries.
Returns
-------
list
``[source_id, sink_id]``: Row indices of the matrix entries.
list
``[col_number, col_number]``: Column indices.
list
``[-rate, rate]``: Data.
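Examples
--------
A minimal sketch with hypothetical node indices:

>>> c = CommoditySingleSourceSink(0, 1, 50.0, is_label=False)
>>> c.get_sparse_elements(col_number=3)
([0, 1], [3, 3], [-50.0, 50.0])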
"""
rows = [self.source_id, self.sink_id]
cols = [col_number, col_number]
data = [-self.rate, self.rate]
return (rows, cols, data)
def map_node_labels(self, d: dict) -> None:
"""Map labels -> ids by d."""
self._source_id = d[self.source_lbl]
self._sink_id = d[self.sink_lbl]
def map_node_ids(self, d: dict) -> None:
"""Map ids -> labels by d."""
self._source_lbl = d[self.source_id]
self._sink_lbl = d[self.sink_id]
def node_in_comm(self, node, is_label: bool = True) -> bool:
"""Check whether the commodity contains any of the nodes in ``node``.
Parameters
----------
node : int, str or array_like
Nodes to check for.
is_label : bool, default=True
Whether nodes in ``node`` are labels.
Returns
-------
bool
Whether the commodity contains any of the nodes in ``node``.
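Examples
--------
A minimal sketch with hypothetical labels:

>>> c = CommoditySingleSourceSink("s", "t", 10.0)
>>> c.node_in_comm("s")
True
>>> c.node_in_comm(["x", "y"])
False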
"""
# Determine search array
if is_label is True:
search_arr = [self.source_lbl, self.sink_lbl]
else:
search_arr = [self.source_id, self.sink_id]
node = np.asarray(node)
if len(np.intersect1d(node, search_arr)) > 0:
return True
return False
def to_tuple(self, as_label: bool = True) -> tuple:
"""Get Commodity data as tuple.
Parameters
----------
as_label : bool, default=True
If True, return (source_label, sink_label, rate), else
(source_id, sink_id, rate).
Returns
-------
tuple
Raw commodity data.
"""
if as_label is True:
return (self.source_lbl, self.sink_lbl, self.rate)
return (self.source_id, self.sink_id, self.rate)
@classmethod
def from_tuple(cls, t, **kwargs) -> CommoditySingleSourceSink:
"""Construct from tuple (source, sink, rate).
Parameters
----------
t : tuple
``(source, sink, rate)``: Raw commodity data.
kwargs : keyword arguments, optional
Further keyword arguments passed to CommoditySingleSourceSink
constructor.
Returns
-------
CommoditySingleSourceSink
Commodity object created from ``t``.
"""
assert isinstance(t, tuple) is True and len(t) == 3
return cls(*t, **kwargs)
@classmethod
def from_labels_and_id(cls, data) -> CommoditySingleSourceSink:
"""Construct from fully specified data (labels and ids).
Parameters
----------
data : tuple
``(node_labels, node_ids, rate)`` where ``node_labels``
is a tuple of str (source_label, sink_label), ``node_ids``
a tuple of int (source_id, sink_id) and ``rate`` a float.
Returns
-------
CommoditySingleSourceSink
Commodity object created from ``data``.
"""
c = cls.__new__(cls)
c._source_lbl, c._sink_lbl = data[0]
c._source_id, c._sink_id = data[1]
c._rate = data[2]
return c
@property
def has_valid_indices(self) -> bool:
"""Whether node labels have been mapped to node indices."""
if self.source_id == ID_UNMAPPED or self.sink_id == ID_UNMAPPED:
return False
return True
@property
def has_valid_labels(self) -> bool:
"""Whether node indices have been mapped to node labels."""
if self.source_lbl == LBL_UNMAPPED or self.sink_lbl == LBL_UNMAPPED:
return False
return True
@property
def source_lbl(self) -> str:
"""str: label of source node."""
return self._source_lbl
@property
def source_id(self) -> int:
"""int: index of source node."""
return self._source_id
@property
def sink_lbl(self) -> str:
"""str: label of sink node."""
return self._sink_lbl
@property
def sink_id(self) -> int:
"""int: index of sink node."""
return self._sink_id
@property
def rate(self) -> float:
"""float: rate of flow from source to sink."""
return self._rate
@property
def total_rate(self) -> float:
"""float: rate of flow from source to sink."""
return self._rate
class CommodityMultiSourceSink(Commodity):
"""Commodity with multiple sources and/or multiple sinks.
A CommodityMultiSourceSink can be comprised of any number of sources (N_in) and
any number of sinks (N_out). The total number of nodes that make up a
CommodityMultiSourceSink is thus N_tot = N_in + N_out.
Parameters
----------
data : dict
Keys are node labels or ids, values are rate (in/outflow).
is_label : bool, default=True
Whether keys in ``data`` are labels.
check_validity : bool, default=False
Check whether sum of values in ``data`` equals 0.
dtype_float : type, default=numpy.float64
Datatype to save rate values.
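Examples
--------
A minimal sketch with one (hypothetical) source and two sinks:

>>> c = CommodityMultiSourceSink({"a": -70.0, "b": 30.0, "c": 40.0})
>>> len(c)
3
>>> print(c.total_rate)
70.0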
"""
def __init__(
self,
data: dict,
is_label=True,
check_validity=False,
dtype_float=np.float64
) -> None:
# get nodes and rates from dict
node, self._rate = np.array(list(data.items())).T
self._rate = self._rate.astype(dtype_float)
if (check_validity is True and
not np.isclose(self._rate.sum(), 0)):
raise ValueError(
f"Total in/outflow: {self._rate.sum()} does not equal 0."
)
if is_label is True:
# retrieve nodes as labels
self._node_labels = node.astype(str)
self._node_ids = np.full(self._node_labels.shape, ID_UNMAPPED, dtype=int)
else:
# retrieve nodes as indices
self._node_ids = node.astype(int)
self._node_labels = np.array([LBL_UNMAPPED] * len(self._node_ids))
def __eq__(self, other) -> bool:
if isinstance(other, CommodityMultiSourceSink) is False:
raise TypeError(
"Comparison object must be of type 'CommodityMultiSourceSink'."
)
for att in ['_node_labels', '_node_ids', '_rate']:
if np.array_equal(getattr(self, att), getattr(other, att)) is False:
return False
return True
def __len__(self) -> int:
return len(self._rate)
def __str__(self) -> str:
out = ""
labels, indices, rates = self._node_labels, self._node_ids, self.rate
def nl(i):
if i != 0:
return "\n"
return ""
if self.has_valid_indices and self.has_valid_labels:
for i in range(len(self)):
out += f"{nl(i)}'{labels[i]}' ({indices[i]}) : {rates[i]}"
elif self.has_valid_labels:
for i in range(len(self)):
out += f"{nl(i)}'{labels[i]}' : {rates[i]}"
elif self.has_valid_indices:
for i in range(len(self)):
out += f"{nl(i)}{indices[i]} : {rates[i]}"
else:
raise ValueError("Neither ids not labels have been mapped.")
return out
def decompose(self, max_iter=None, as_label: bool = True):
"""Decompose CommodityMultiSourceSink into several single commodities.
Parameters
----------
max_iter : int, optional
Number of iterations for decomposition
as_label : bool, default=True
If True return tuples with node labels, else with node ids.
Returns
-------
list
Single commodities as tuples (source, sink, rate).
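Examples
--------
A minimal sketch; printing sidesteps NumPy scalar reprs:

>>> c = CommodityMultiSourceSink({"a": -70.0, "b": 30.0, "c": 40.0})
>>> for s, t, r in c.decompose():
...     print(s, t, r)
a c 40.0
a b 30.0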
"""
if max_iter is None:
max_iter = len(self)**2
# Init search and results
search_array = np.copy(self.rate)
st_pairs = []
total_matched = 0
for i in range(max_iter):
# get most negative source
idx_s = np.argmin(search_array)
s = search_array[idx_s]
# get unmatched sink
idx_t = np.argmax(search_array)
t = search_array[idx_t]
# matched value
mv = min(abs(s), t)
# consolidate sink/source
search_array[idx_s] += mv
search_array[idx_t] -= mv
total_matched += mv
# store found pair
if as_label is True:
pair = (
self._node_labels[idx_s],
self._node_labels[idx_t],
mv
)
else:
pair = (
self._node_ids[idx_s],
self._node_ids[idx_t],
mv
)
st_pairs.append(pair)
# check if consolidation successful
if (np.isclose(total_matched, self.total_rate)
and np.all(np.isclose(search_array, 0))):
return st_pairs
raise RuntimeError("Could not find s-t-pair decomposition.")
def node_in_comm(self, node, is_label: bool = True) -> bool:
# determine search array
if is_label is True:
search_arr = self._node_labels
else:
search_arr = self._node_ids
node = np.asarray(node)
if len(np.intersect1d(node, search_arr)) > 0:
return True
return False
node_in_comm.__doc__ = CommoditySingleSourceSink.node_in_comm.__doc__
def to_dict(self, labels: bool = True):
"""Get commodity as dict.
Parameters
----------
labels : bool, default=True
If True, returned dict has node labels as keys, else node indices.
Returns
-------
dict
Commodity as dict.
"""
if labels is True:
if self.has_valid_labels:
return dict(zip(self.node_labels, self.rate))
if self.has_valid_indices:
return dict(zip(self.node_ids, self.rate))
raise ValueError(
"Commodity has neither valid indices nor valid labels"
)
if self.has_valid_indices:
return dict(zip(self.node_ids, self.rate))
raise ValueError(
"Commodity has neither valid indices nor valid labels"
)
def get_scaled_copy(self, scale_factor: float = 1):
"""Get copy where every rate is scaled by `scale_factor`.
Returns
-------
CommodityMultiSourceSink
Commodity with rate scaled by ``scale_factor``.
"""
cpy = self.__new__(CommodityMultiSourceSink)
cpy._node_labels = np.copy(self._node_labels)
cpy._node_ids = np.copy(self._node_ids)
cpy._rate = np.copy(self._rate) * scale_factor
return cpy
def get_sparse_elements(self, col_number: int = 0):
"""Extract commodity data for sparse demand matrix.
Parameters
----------
col_number : int, default=0
Column indices of the matrix entries.
Returns
-------
list
Row indices of the matrix entries of len N_tot.
list
Column indices of the matrix entries of len N_tot. Equals
[col_number, ..., col_number].
list
Matrix entries of len N_tot.
"""
rows = self.node_ids.tolist()
cols = np.repeat(col_number, len(rows)).tolist()
data = self.rate.tolist()
return (rows, cols, data)
def map_node_labels(self, d) -> None:
self._node_ids = np.vectorize(d.__getitem__)(self._node_labels).astype(int)
map_node_labels.__doc__ = CommoditySingleSourceSink.map_node_labels.__doc__
def map_node_ids(self, d) -> None:
self._node_labels = np.vectorize(d.__getitem__)(self._node_ids).astype(str)
map_node_ids.__doc__ = CommoditySingleSourceSink.map_node_ids.__doc__
@classmethod
def from_label_and_id(cls, data) -> CommodityMultiSourceSink:
"""Construct from fully specified data (labels and ids).
Parameters
----------
data : tuple of ndarray
``(labels, ids, rate)`` where every element is ndarray of
shape (N_tot, ).
Returns
-------
CommodityMultiSourceSink
Commodity constructed from data.
"""
# make empty object and fill with data
cm = cls.__new__(cls)
cm._node_labels = data[0]
cm._node_ids = data[1]
cm._rate = data[2]
return cm
@property
def N_tot(self) -> int:
"""Total number of nodes in commodity: sinks + sources."""
return len(self)
@property
def N_in(self) -> int:
"""Number of sources."""
return self.N_tot - self.N_out
@property
def N_out(self) -> int:
"""Number of sinks."""
return sum(self.is_sink)
@property
def is_sink(self) -> np.ndarray:
"""ndarray (N_tot, ) of bool: whether node is sink."""
return (np.sign(self.rate) + 1).astype(bool)
@property
def has_valid_indices(self) -> bool:
"""bool: whether nodes have proper indices."""
return not (ID_UNMAPPED in self.node_ids)
@property
def has_valid_labels(self) -> bool:
"""bool: whether nodes have proper labels."""
return not (LBL_UNMAPPED in self.node_labels)
@property
def is_single(self) -> bool:
"""bool: whether commodity has single source and single sink."""
if len(self) == 2 and len(self.sink_id) == 1:
return True
return False
@property
def node_ids(self) -> np.ndarray:
"""ndarray (N_tot, ) of int: node indices."""
return self._node_ids
@property
def node_labels(self) -> np.ndarray:
"""ndarray (N_tot, ) of str: node labels."""
return self._node_labels
@property
def source_lbl(self) -> np.ndarray:
"""ndarray (N_in, ) of str: source labels."""
return self._node_labels[~self.is_sink]
@property
def source_id(self) -> np.ndarray:
"""ndarray (N_in, ) of int: source indices."""
return self._node_ids[~self.is_sink]
@property
def sink_lbl(self) -> np.ndarray:
"""ndarray (N_out, ) of str: sink labels."""
return self._node_labels[self.is_sink]
@property
def sink_id(self) -> np.ndarray:
"""ndarray (N_out, ) of int: sink indices."""
return self._node_ids[self.is_sink]
@property
def rate(self) -> np.ndarray:
"""ndarray (N_tot, ) of float: in/outflow per node."""
return self._rate
@property
def total_rate(self) -> float:
"""float: total flow on commodity."""
return self._rate[self.is_sink].sum()
class _DV(abc.ABC):
"""Base class of a demand vector."""
def __init__(
self,
shared=None,
dtype_int=None,
dtype_float=None,
) -> None:
self._shared = shared
self._dtype_int = dtype_int
self._dtype_float = dtype_float
def __call__(self, *args, **kw) -> sps.csc_matrix:
"""Get demand as sparse matrix of shape (n, k).
Every column represents the in/outflow for the network nodes
for one commodity.
"""
return self.sparse(*args, **kw)
def get_source_sink_rate(self, scale_by: float = 1):
"""Return a source-sink-rate decomposition of this demand vector.
Parameters
----------
scale_by : float, default=1
The demand vector is scaled by this factor before determining
the s-t-r decomposition.
Returns
-------
decomposition : tuple
A tuple ``(s, t, r)`` containing sources ``s``, sinks ``t``, and
associated demand rates ``r`` that induce this demand vector.
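Examples
--------
A minimal sketch on a hypothetical 3-node network:

>>> dv = DemandVectorSP([(0, 2, 100.0)], is_label=False)
>>> dv.n = 3
>>> s, t, r = dv.get_source_sink_rate()
>>> int(s[0]), int(t[0]), float(r[0])
(0, 2, 100.0)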
"""
if self.all_single is False:
raise ValueError("TODO.")
# Retrieve rates
b = self()
rates = scale_by * b.data[b.data > 0]
# Collect (cached) sources
if self.cache.is_valid("source_indices") is False:
sources = self.cache["source_indices"] = b.indices[b.data < 0]
else:
sources = self.cache["source_indices"]
# Collect (cached) sinks
if self.cache.is_valid("sink_indices") is False:
sinks = self.cache["sink_indices"] = b.indices[b.data > 0]
else:
sinks = self.cache["sink_indices"]
return sources, sinks, rates
def add_to_etree(
self,
root: et.Element,
name=None,
) -> None:
"""Add demand data to xml.etree.ElementTree.Element.
Data will be added based on self.value(param=1). Note that only
LinearDemandFunctions can be properly saved to xml.
Parameters
----------
root : xml.etree.ElementTree.Element
Element to which 'commodities' will be appended.
name : str, optional
Attribute name of 'commodities' element. If None, no attribute
will be set.
"""
commodities = et.SubElement(root, 'commodities')
if name is not None:
commodities.attrib["name"] = name
spmat = self.sparse()
for i in range(spmat.shape[1]):
col = spmat[:, i]
lbls = self.shared.get_node_label(col.indices)
data = col.data
com_node = et.SubElement(commodities, 'commodity')
if len(data) == 2:
# Commodity with single source and single sink
com_node.attrib['from'] = lbls[np.where(data < 0)[0][0]]
com_node.attrib['to'] = lbls[np.where(data > 0)[0][0]]
com_node.attrib['rate'] = str(abs(data[0]))
else:
# Commodity with multiple sources and/or multiple sinks
com_dict = dict(zip(lbls, data))
for (elem_lbl, elem_val) in com_dict.items():
if elem_val != 0:
b_node = et.SubElement(com_node, 'b')
b_node.attrib['node'] = elem_lbl
b_node.attrib['value'] = str(elem_val)
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> DemandVector | DemandVectorSP:
if isinstance(data, str):
data = np.load(data)
if prefix + "node_labels" in data:
return DemandVectorSP.from_npz(data, shared=shared, prefix=prefix, **kwargs)
return DemandVector.from_npz(data, shared=shared, prefix=prefix, **kwargs)
from_npz.__func__.__doc__ = _doc.from_npz_shared.__doc__
@property
def k(self) -> int:
"""Number of commodities in demand vector."""
return len(self)
@property
def n(self) -> int:
if self.shared is not None:
return self.shared.n
elif hasattr(self, "_n"):
return self._n
raise AttributeError(
"Number of nodes has not been set: Neither shared object nor "
"number of nodes specified."
)
@n.setter
def n(self, val: int) -> None:
self._n = val
@property
def shared(self):
"""Shared object for network objects.
See Also
--------
paminco.net.shared.Shared
"""
return self._shared
@shared.setter
def shared(self, val: int) -> None:
self._shared = val
@property
def dtype_int(self):
if self._dtype_int is not None:
return self._dtype_int
elif self.shared is not None:
return self.shared.dtype_int
return DTYPE_INT
@dtype_int.setter
def dtype_int(self, val) -> None:
self._dtype_int = val
@property
def dtype_float(self):
if self._dtype_float is not None:
return self._dtype_float
elif self.shared is not None:
return self.shared.dtype_float
return DTYPE_FLOAT
@dtype_float.setter
def dtype_float(self, val) -> None:
self._dtype_float = val
@abc.abstractmethod
def delete_nodes(self): ...
@abc.abstractmethod
def to_single_pairs(self): ...
@abc.abstractmethod
def map_node_label_to_id(self): ...
@abc.abstractmethod
def map_node_id_to_label(self): ...
@abc.abstractmethod
def scaled_copy(self): ...
@abc.abstractmethod
def sparse(self): ...
@abc.abstractmethod
def make_save_dict(self): ...
@abc.abstractmethod
def save_to_numpy(self): ...
@property
@abc.abstractmethod
def all_single(self): ...
@property
@abc.abstractmethod
def rate(self): ...
@property
@abc.abstractmethod
def source_lbl(self): ...
@property
@abc.abstractmethod
def sink_lbl(self): ...
@property
@abc.abstractmethod
def source_id(self): ...
@property
@abc.abstractmethod
def sink_id(self): ...
class DemandVector(_DV):
"""Demand vector made up of a number of commodities (k).
A DemandVector object can be instantiated in several ways:
DemandVector(dv)
where ``dv`` is a DemandVector.
DemandVector(M)
where ``M`` is matrix (ndarray or spmatrix) with node
indices as rows and commodities as columns.
DemandVector(it)
where ``it`` an iterable with elements that are tuples or
dicts.
Parameters
----------
shared : Shared
Shared object for all network objects.
data : DemandVector, ndarray, spmatrix, or iterable
(Processed) commodity data.
copy : bool, default=True
Whether to copy data in ``data``.
is_label : bool, default=True
Whether commodity nodes are given as labels or ids in data if
data is iterable.
Attributes
----------
k
shared
all_single
commodities
source_lbl
sink_lbl
source_id
sink_id
rate
total_rate
See Also
--------
Commodity
CommoditySingleSourceSink
CommodityMultiSourceSink
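Examples
--------
A minimal sketch without a shared object; labels are hypothetical:

>>> dv = DemandVector([("s", "t", 100.0), {"a": -5.0, "b": 5.0}])
>>> dv.k
2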
"""
def __init__(
self,
data,
shared=None,
dtype_int=None,
dtype_float=None,
copy: bool = True,
is_label: bool = True
) -> None:
super().__init__(
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
)
if isinstance(data, DemandVector):
if copy is True:
self._comm = deepcopy(data._comm)
else:
self._comm = data._comm
elif isinstance(data, (np.ndarray, sps.spmatrix)):
self._comm = []
for j in range(data.shape[1]):
idx = data[:, j] != 0
ids = idx.nonzero()[0]
vals = np.array(data[:, j][idx]).flatten()
d = dict(zip(ids, vals))
self._comm.append(CommodityMultiSourceSink(d, is_label=False))
else:
self._comm = [Commodity.from_data(c, is_label=is_label) for c in data]
self._comm = [c for c in self._comm if c is not None]
self.cache = Cache(counter=True)
def __eq__(self, other) -> bool:
assert len(self) == len(other), \
"DemandVectors must have same length."
for (c1, c2) in list(zip(self._comm, other._comm)):
if c1 != c2:
return False
return True
def __len__(self) -> int:
return len(self._comm)
def reset_cache(self, hard: bool = False) -> None:
self.cache.reset(hard=hard)
reset_cache.__doc__ = _doc.reset_cache.__doc__
def delete_nodes(self, nodes) -> int:
"""Delete all commodities that are made up by ``nodes``.
Parameters
----------
nodes : int or array_like
Indices of nodes to remove.
Returns
-------
int
Number of removed commodities.
"""
len_before = len(self)
self._comm = [c for c in self._comm if not c.node_in_comm(nodes, is_label=False)]
self.reset_cache()
return len_before - len(self)
def map_node_label_to_id(self) -> None:
"""Map labels -> ids for all commodities."""
for c in self._comm:
try:
c.map_node_labels(self.shared.node2id)
except KeyError as e:
raise KeyError(f"Node with label {e} not in network nodes.")
self.cache.set_invalid("source_id", "sink_id")
def map_node_id_to_label(self) -> None:
"""Map ids -> labels for all commodities."""
d = {v: k for k, v in self.shared.node2id.items()}
for c in self._comm:
try:
c.map_node_ids(d)
except KeyError as e:
raise KeyError(f"Node with id {e} not in network nodes.")
self.cache.set_invalid("source_lbl", "sink_lbl")
def scaled_copy(self, f: float = 1):
"""Get a copy where rate of every commodity is scaled by ``f``.
Parameters
----------
f : float, default=1
Scale factor.
Returns
-------
DemandVector
Copy of demand vector with scaled commodities.
"""
cpy = self.__new__(DemandVector)
cpy._shared = self._shared
cpy._dtype_int = self._dtype_int
cpy._dtype_float = self._dtype_float
cpy._comm = [c.get_scaled_copy(f) for c in self._comm]
cpy.cache = Cache()
return cpy
def to_single_pairs(self, as_label: bool = True):
"""Decompose all CommodityMultiSourceSink.
Parameters
----------
as_label : bool, default=True
If True, decomposed commodities are set up by labels, else
by id.
Returns
-------
DemandVectorSP
Demand vector with single commodities only.
See Also
--------
CommodityMultiSourceSink.decompose
"""
data = []
for c in self._comm:
if isinstance(c, CommodityMultiSourceSink):
data.extend(c.decompose(as_label=as_label))
else:
data.append(c.to_tuple(as_label=as_label))
dv = DemandVectorSP(data, shared=self.shared, is_label=as_label)
if as_label is True:
dv.map_node_label_to_id()
return dv
def sparse(self, dtype=None) -> sps.csc_matrix:
"""Get demand as sparse matrix.
Total in/outflow per node (row) per commodity (column), e.g.:
Data::
(0, 2, 100) [single commodity]
{1: 33, 2: 33, 3: -66} [multi commodity]
Resulting matrix::
[[-100, 0],
[ 0, 33],
[ 100, 33],
[ 0, -66]]
Parameters
----------
dtype: dtype, optional
Datatype of returned sparse matrix.
Returns
-------
scipy.sparse.csc_matrix
Demand as sparse matrix.
"""
if dtype is None:
dtype = self.dtype_float
if self.cache.is_valid("sparse") is False:
rows, cols, data = [], [], []
for (i, comm) in enumerate(self._comm):
r, c, d = comm.get_sparse_elements(col_number=i)
rows += r
cols += c
data += d
sparse = sps.coo_matrix((data, (rows, cols)),
shape=(self.n, len(self)),
dtype=dtype)
self.cache["sparse"] = (sps.csc_matrix(sparse), sparse)
# recast datatype if necessary for return
return sps.csc_matrix(self.cache["sparse"][0], dtype=dtype)
def make_save_dict(
self,
prefix: str = "",
save_dict=None
) -> dict:
if save_dict is None:
save_dict = {}
if "demand_type" not in save_dict:
save_dict["demand_type"] = self.__class__.__name__
idx_single = []
idx_multi = []
labels = []
ids = []
rates = []
# collect all single commodities and remember indices
for (i, c) in enumerate(self._comm):
if isinstance(c, CommoditySingleSourceSink):
idx_single.append(i)
labels.append([c._source_lbl, c._sink_lbl])
ids.append([c._source_id, c._sink_id])
rates.append([c._rate])
elif isinstance(c, CommodityMultiSourceSink):
idx_multi.append(i)
save_dict[prefix + "cm_" + str(i) + "id"] = getattr(c, "_node_ids")
save_dict[prefix + "cm_" + str(i) + "label"] = getattr(c, "_node_labels")
save_dict[prefix + "cm_" + str(i) + "rate"] = getattr(c, "_rate")
save_dict[prefix + "cs_labels"] = np.array(labels)
save_dict[prefix + "cs_ids"] = np.array(ids)
save_dict[prefix + "cs_rates"] = np.array(rates)
save_dict[prefix + "idx_single"] = np.array(idx_single)
save_dict[prefix + "idx_multi"] = np.array(idx_multi)
return save_dict
make_save_dict.__doc__ = _doc.make_save_dict.__doc__
def save_to_numpy(self, file: str, **kwargs) -> None:
save_dict = self.make_save_dict()
save_dict.update(kwargs)
np.savez(file, **save_dict)
save_to_numpy.__doc__ = _doc.save_to_numpy.__doc__
@classmethod
def from_xml(cls): pass
# TODO-PJ link to func
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> DemandVector:
# load data
if isinstance(data, str):
data = np.load(data)
cs_labels = data[prefix + "cs_labels"]
cs_ids = data[prefix + "cs_ids"]
cs_rates = data[prefix + "cs_rates"]
idx_single = data[prefix + "idx_single"].tolist()
idx_multi = data[prefix + "idx_multi"].tolist()
if isinstance(idx_single, int):
idx_single = [idx_single]
if isinstance(idx_multi, int):
idx_multi = [idx_multi]
comm = []
# load single commodities
if len(cs_labels) > 0:
for d in list(zip(cs_labels, cs_ids, cs_rates.ravel())):
comm.append(CommoditySingleSourceSink.from_labels_and_id(d))
# load multi commodities
if len(idx_multi) > 0:
for idx_m in idx_multi:
b = prefix + "cm_" + str(idx_m)
d = [data[b + "label"], data[b + "id"], data[b + "rate"]]
comm.append(CommodityMultiSourceSink.from_label_and_id(d))
# reindex commodities
idx_full = idx_single + idx_multi
comm = [c for idx, c in sorted(zip(idx_full, comm))]
return cls(comm, shared=shared, **kwargs)
from_npz.__func__.__doc__ = _doc.from_npz_shared.__doc__
@property
def all_single(self) -> bool:
"""Whether is comprised only of commodities with single sink / source."""
if self.cache.is_valid("sparse") is False:
m = self.sparse()
return (m.getnnz(axis=0) == 2).all()
return (self.cache["sparse"][1].getnnz(axis=0) == 2).all()
@property
def commodities(self) -> list:
"""List [len k] of commdodities: CommoditySingleSourceSink or CommodityMultiSourceSink."""
return self._comm
@property
def source_lbl(self) -> list:
"""List [len k] of str or ndarray: source labels per commodity."""
if self.cache.is_valid("source_lbl") is False:
v = self.cache["source_lbl"] = [c.source_lbl for c in self._comm]
return v
return self.cache["source_lbl"]
@property
def sink_lbl(self) -> list:
"""List [len k] of str or ndarray: sink labels per commodity."""
if self.cache.is_valid("sink_lbl") is False:
v = self.cache["sink_lbl"] = [c.sink_lbl for c in self._comm]
return v
return self.cache["sink_lbl"]
@property
def source_id(self) -> list:
"""List [len k] of int or ndarray: source indices per commodity."""
if self.cache.is_valid("source_id") is False:
v = self.cache["source_id"] = [c.source_id for c in self._comm]
return v
return self.cache["source_id"]
@property
def sink_id(self) -> list:
"""List [len k] of int or ndarray: sink indices per commodity."""
if self.cache.is_valid("sink_id") is False:
v = self.cache["sink_id"] = [c.sink_id for c in self._comm]
return v
return self.cache["sink_id"]
@property
def rate(self) -> list:
"""List [len k] of float or ndarray: node in/outflow per commodity."""
if self.cache.is_valid("rate") is False:
v = self.cache["rate"] = [c.rate for c in self._comm]
return v
return self.cache["rate"]
@property
def total_rate(self) -> np.ndarray:
"""np.ndarray (k, ): total flow per commodity."""
if self.cache.is_valid("total_rate") is False:
v = self.cache["total_rate"] = np.array([c.total_rate for c in self._comm])
return v
return self.cache["total_rate"]
class DemandVectorSP(_DV):
"""Demand vector made up of k commodities, each with a single source and sink.
A DemandVectorSP object can be instantiated in several ways:
DemandVectorSP(dv)
where ``dv`` is a DemandVectorSP.
DemandVectorSP(arr)
where ``arr`` is array_like. The resulting array from
``numpy.array(arr)`` is expected to look like::
[source_1, sink_1, rate_1]
...
[source_k, sink_k, rate_k]
Parameters
----------
data : DemandVector or array_like
Commodity data in processed or unprocessed form.
shared : Shared, optional
Shared object for all network objects.
copy : bool, default=True
Whether to copy data in ``data``.
is_label : bool, default=True
Whether commodity nodes are given as labels or ids in data.
Attributes
----------
k
shared
all_single
rate
total_rate
source_lbl
sink_lbl
source_id
sink_id
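Examples
--------
A minimal sketch; labels are hypothetical:

>>> dv = DemandVectorSP([("s", "t", 100.0), ("u", "v", 50.0)])
>>> dv.k
2
>>> print(dv.source_lbl)
['s' 'u']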
"""
def __init__(
self,
data,
shared=None,
dtype_int=None,
dtype_float=None,
copy: bool = True,
is_label: bool = True
) -> None:
super().__init__(
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
)
if isinstance(data, DemandVectorSP):
self._node_labels = np.array(data._node_labels, copy=copy)
self._node_ids = np.array(data._node_ids, copy=copy)
self._rates = np.array(data._rates, copy=copy)
elif isinstance(data, (list, np.ndarray)):
data = np.asarray(data)
if is_label is True:
self._node_labels = data[:, :2].astype(str)
self._node_ids = np.full(self._node_labels.shape, ID_UNMAPPED, dtype=int)
else:
self._node_ids = data[:, :2].astype(int)
self._node_labels = np.full(self._node_ids.shape, LBL_UNMAPPED)
self._rates = data[:, 2].astype(float)
else:
raise TypeError("Invalid input data.")
# remove zero rate commodities
zc = (self._rates == 0)
self._rates = np.delete(self._rates, zc)
self._node_ids = np.delete(self._node_ids, zc, axis=0)
self._node_labels = np.delete(self._node_labels, zc, axis=0)
self.cache = Cache()
def __len__(self) -> int:
return len(self._rates)
def __eq__(self, other) -> bool:
for att in ["_rates", "_node_ids", "_node_labels"]:
if np.array_equal(getattr(self, att), getattr(other, att)) is False:
return False
return True
def reset_cache(self, hard: bool = False) -> None:
self.cache.reset(hard=hard)
reset_cache.__doc__ = _doc.reset_cache.__doc__
def map_node_label_to_id(self) -> None:
try:
self._node_ids = (self.shared.get_node_id(self._node_labels,
vectorize=True)
.astype(int))
except KeyError as e:
raise KeyError("Node id " + str(e) + " not in network nodes.")
map_node_label_to_id.__doc__ = DemandVector.map_node_label_to_id.__doc__
def map_node_id_to_label(self) -> None:
try:
self._node_labels = (self.shared.get_node_label(self._node_ids,
vectorize=True)
.astype(str))
except KeyError as e:
raise KeyError("Node id " + str(e) + " not in network nodes.")
map_node_id_to_label.__doc__ = DemandVector.map_node_id_to_label.__doc__
def scaled_copy(self, f: float = 1):
"""Get a copy where every commodity is scaled by ``f``.
Parameters
----------
f : float, default=1
Scale factor.
Returns
-------
DemandVectorSP
Copy of DemandVectorSP with scaled commodities.
"""
cpy = DemandVectorSP(self, copy=True)
cpy._rates *= f
return cpy
def to_single_pairs(self, as_label: bool = True) -> DemandVectorSP:
"""Returns self.
Used for consistency with DemandVector.to_single_pairs.
"""
return self
def delete_nodes(self, nodes) -> int:
len_before = len(self)
nodes = np.array(nodes).reshape(1, -1)
# identify commodities where source or sink equals nodes
idx_source = ((self.source_id.reshape(-1, 1) - nodes) == 0).any(axis=1)
idx_sink = ((self.sink_id.reshape(-1, 1) - nodes) == 0).any(axis=1)
all_idx = idx_source | idx_sink
# delete indices in data arrays
self._rates = np.delete(self._rates, all_idx, axis=0)
self._node_labels = np.delete(self._node_labels, all_idx, axis=0)
self._node_ids = np.delete(self._node_ids, all_idx, axis=0)
self.reset_cache()
return len_before - len(self)
delete_nodes.__doc__ = DemandVector.delete_nodes.__doc__
def sparse(self, dtype=None) -> sps.csc_matrix:
"""Get demand as sparse matrix.
Total in/outflow per node (row) per commodity (column), e.g.,:
Data::
(0, 2, 100)
(3, 1, 66)
Resulting matrix::
[[-100, 0],
[ 0, 66],
[ 100, 0],
[ 0, -66]]
Parameters
----------
dtype: dtype, optional
Datatype of returned sparse matrix.
Returns
-------
scipy.sparse.csc_matrix
Demand as sparse matrix.
"""""
if dtype is None:
dtype = self.dtype_float
if self.cache.is_valid("sparse") is False:
rows = self._node_ids.ravel()
cols = np.repeat(range(len(self)), 2)
data = (np.array([-1, 1]).reshape(-1, 1) * self.rate.T).T.ravel()
sparse = sps.coo_matrix((data, (rows, cols)),
shape=(self.n, len(self)),
dtype=dtype)
self.cache["sparse"] = (sps.csc_matrix(sparse), sparse)
return sps.csc_matrix(sparse, dtype=dtype)
# recast datatype if necessary for return
return sps.csc_matrix(self.cache["sparse"][0], dtype=dtype)
def make_save_dict(
self,
prefix: str = "",
save_dict=None
) -> dict:
if save_dict is None:
save_dict = {}
if "demand_type" not in save_dict:
save_dict["demand_type"] = self.__class__.__name__
save_att = {
"node_labels": "_node_labels",
"node_ids": "_node_ids",
"rates": "_rates",
}
for (k, v) in save_att.items():
save_dict[prefix + k] = getattr(self, v)
return save_dict
make_save_dict.__doc__ = _doc.make_save_dict.__doc__
def save_to_numpy(self, file: str, **kwargs) -> None:
save_dict = self.make_save_dict()
save_dict.update(kwargs)
np.savez(file, **save_dict)
save_to_numpy.__doc__ = _doc.save_to_numpy.__doc__
@classmethod
def from_xml(cls): pass
# TODO-PJ link to func
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> DemandVectorSP:
# load np data
if isinstance(data, str):
data = np.load(data)
# make empty edge object and fill with data
dv = cls.__new__(cls)
if len(data[prefix + "rates"]) == 0:
dv._node_labels = np.empty((0, 2), str)
dv._node_ids = np.empty((0, 2), int)
dv._rates = np.empty(0)
else:
dv._node_labels = data[prefix + "node_labels"]
dv._node_ids = data[prefix + "node_ids"]
dv._rates = data[prefix + "rates"]
dv._shared = shared
dv._dtype_int = None
dv._dtype_float = None
dv.cache = Cache()
return dv
from_npz.__func__.__doc__ = _doc.from_npz_shared.__doc__
@property
def all_single(self) -> bool:
return True
all_single.__doc__ = DemandVector.all_single.__doc__
@property
def rate(self) -> np.ndarray:
"""ndarray (k, ) of flow: rates."""
return self._rates
@property
def source_lbl(self) -> np.ndarray:
"""ndarray (k, ) of str: source labels."""
return self._node_labels[:, 0]
@property
def sink_lbl(self) -> np.ndarray:
"""ndarray (k, ) of str: sink labels."""
return self._node_labels[:, 1]
@property
def source_id(self) -> np.ndarray:
"""ndarray (k, ) of int: source indices."""
return self._node_ids[:, 0]
@property
def sink_id(self) -> np.ndarray:
"""ndarray (k, ) of int: sink indices."""
return self._node_ids[:, 1]
@property
def total_rate(self) -> np.ndarray:
return self.rate
total_rate.__doc__ = DemandVector.total_rate.__doc__
def read_comm_from_xml(data):
"""Read commodity data from XML.
Parameters
----------
data : str, xml.etree.ElementTree.ElementTree, or xml.etree.ElementTree.Element
XML file, tree or root itself.
Returns
-------
com_data : list or None
Commodity data, one commodity per element in list. A tuple
``(source, target, rate)`` corresponds to a commodity with
one source and one target. Otherwise a dict with the in/outflow
is appended, e.g., {node1: -100, node2: 50, node3: 50}.
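Examples
--------
A minimal sketch with a hypothetical two-commodity snippet:

>>> xml = (
...     '<commodities>'
...     '<commodity from="A" to="B" rate="100" />'
...     '<commodity><b node="A" value="-50" /><b node="C" value="50" /></commodity>'
...     '</commodities>'
... )
>>> read_comm_from_xml(et.fromstring(xml))
[('A', 'B', 100.0), {'A': -50.0, 'C': 50.0}]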
"""
if isinstance(data, et.Element) is False:
data = xml_find_root(data)
if data.tag != TAG_COMMODITIES:
data = data.find(TAG_COMMODITIES)
if data is None:
return None
return _comm_from_xml_node(data)
def _comm_from_xml_node(xml_commodity_node: et.Element) -> list:
com_data = []
for c in xml_commodity_node:
if all(k in c.attrib for k in ("from", "to", "rate")):
data = (c.attrib["from"], c.attrib["to"], float(c.attrib["rate"]))
com_data.append(data)
else:
c_info = dict()
b_nodes = c.findall("b")
for b in b_nodes:
c_info[b.attrib['node']] = float(b.attrib['value'])
com_data.append(c_info)
return com_data
def demand_vector(
data,
shared=None,
dtype_int=None,
dtype_float=None,
single_pairs: bool = False,
**kwargs
) -> DemandVector | DemandVectorSP:
"""Setup demand vector based on ``data``.
Parameters
----------
data : DemandVector, DemandVectorSP, spmatrix, ndarray, or iterable
``DemandVector or DemandVectorSP``: (a copy) of data will be
returned.
``spmatrix or ndarray``: a DemandVector will be inferred.
``iterable and single_pairs == False`` : DemandVector.
``iterable and single_pairs == True`` : DemandVectorSP.
shared : Shared
Shared object for all network objects.
single_pairs : bool, default=False
If True, infer DemandVectorSP, else DemandVector.
kwargs : keyword arguments, optional
Further keyword arguments passed to constructor.
Returns
-------
DemandVector or DemandVectorSP
Demand vector.
See Also
--------
paminco.net.demand.DemandVector
paminco.net.demand.DemandVectorSP
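Examples
--------
A minimal sketch; labels are hypothetical:

>>> dv = demand_vector([("s", "t", 100.0)])
>>> type(dv).__name__
'DemandVector'
>>> dv = demand_vector([("s", "t", 100.0)], single_pairs=True)
>>> type(dv).__name__
'DemandVectorSP'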
"""
# Parse from xml
if isinstance(data, str):
data = read_comm_from_xml(data)
if isinstance(data, DemandVector):
return DemandVector(
data,
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
**kwargs
)
elif isinstance(data, DemandVectorSP):
return DemandVectorSP(
data,
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
**kwargs
)
elif isinstance(data, np.ndarray) or sps.isspmatrix(data):
return DemandVector(
data,
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
**kwargs
)
else:
if ((isinstance(data, tuple) and len(data) == 3) or
isinstance(data, dict)):
data = [data]
if single_pairs is True:
return DemandVectorSP(
data,
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
**kwargs
)
else:
return DemandVector(
data,
shared=shared,
dtype_int=dtype_int,
dtype_float=dtype_float,
**kwargs
)
class DemandFunction(abc.ABC):
"""Abstract class representing a demand function.
Parameters
----------
shared : Shared, optional
Shared network object used for label <-> id mappings.
Attributes
----------
cache : Cache
A cache.
shared
all_single_pairs
demand_vectors
is_single_commodity
is_multi_commodity
value_at_1
unique_sources
"""
def __init__(
self,
shared=None,
dtype_int=None,
dtype_float=None,
) -> None:
self._shared = shared
self._dtype_int = dtype_int
self._dtype_float = dtype_float
self.cache = Cache()
def __call__(self, param: float = 1) -> sps.csc_matrix:
return self.value(param)
def __eq__(self, other) -> bool:
assert len(self) == len(other), \
"DemandFunctions must have same number of commodities."
for (dv1, dv2) in list(zip(self.demand_vectors, other.demand_vectors)):
if dv1 != dv2:
return False
return True
def __len__(self) -> int:
"""Number of commodities modelled by the demand function."""
return len(self.b)
def reset_cache(self, hard: bool = False) -> None:
self.cache = Cache()
reset_cache.__doc__ = _doc.reset_cache.__doc__
def inflow(self, param: float) -> np.ndarray:
"""Get the total inflow of all commodities for demand(param).
Parameters
----------
param : float
Returns
-------
np.ndarray
Total inflow of all commodities.
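Examples
--------
A minimal sketch on a hypothetical 3-node network:

>>> d = LinearDemandFunction([(0, 2, 100.0)], is_label=False)
>>> d.n = 3
>>> print(d.inflow(2.0))
[200.]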
"""
b = self.value(param)
return np.array(b.multiply(b > 0).sum(axis=0), copy=False).ravel()
def ddx_inflow(self, param: float) -> np.ndarray:
"""Get the total inflow of all commodities for demand'(param).
Parameters
----------
param : float
Returns
-------
np.ndarray
Total inflow of all commodities.
"""
b = self.ddx(param)
return np.array(b.multiply(b > 0).sum(axis=0), copy=False).ravel()
def max_inflow(
self,
max_param: float,
min_param: float = 0,
) -> np.ndarray:
"""Get maxmimum inflow for every commodity.
Parameters
----------
max_param : float
min_param : float, default=0
Returns
-------
ndarray
Ndarray of shape (k, ): max inflow for every commodity.
"""
b_min = self.value(min_param)
b_max = self.value(max_param)
inflow_min = np.array(b_min.multiply(b_min > 0).sum(axis=0), copy=False).ravel()
inflow_max = np.array(b_max.multiply(b_max > 0).sum(axis=0), copy=False).ravel()
return np.max(np.vstack((inflow_min, inflow_max)).T, axis=1)
def delete_nodes(self, *args, **kwargs) -> int:
len_before = len(self)
for dv in self.demand_vectors.values():
dv.delete_nodes(*args, **kwargs)
return len_before - len(self)
delete_nodes.__doc__ = DemandVector.delete_nodes.__doc__
def map_node_label_to_id(self) -> None:
for dv in self.demand_vectors.values():
dv.map_node_label_to_id()
map_node_label_to_id.__doc__ = DemandVector.map_node_label_to_id.__doc__
def map_node_id_to_label(self) -> None:
for dv in self.demand_vectors.values():
dv.map_node_id_to_label()
map_node_id_to_label.__doc__ = DemandVector.map_node_id_to_label.__doc__
def to_single_pairs(self, **kwargs) -> None:
"""Decompose all commodities in demand vectors."""
for k in self._demand_vectors:
v = getattr(self, k)
setattr(self, k, v.to_single_pairs(**kwargs))
def add_to_etree(
self,
root: et.Element,
overwrite: bool = True,
) -> None:
"""Add demand data to xml.etree.ElementTree.Element.
Parameters
----------
root : xml.etree.ElementTree.Element
Element to which 'commodities' will be appended.
overwrite : bool, default=True
If True, existing 'commodities' Element in root will be
deleted. If False, commodity data will be appended to the
existing data.
"""
if overwrite is True:
for c in root.findall(TAG_COMMODITIES):
root.remove(c)
# Write type of demand func to metadata
metadata = xml_add_element(root, "metadata")
demand_func = xml_add_element(metadata, "demand_func")
demand_func.text = self.__class__.__name__
# Write all demand vectors of demand function
for dv_name, dv in self.demand_vectors.items():
dv.add_to_etree(root, name=dv_name)
@classmethod
def from_xml(
cls,
data,
shared=None,
**kw,
) -> LinearDemandFunction | AffineDemandFunction:
# Get root of XML
root = xml_find_root(data)
# Determine which type of demand function to read in
demand_func = root.find("metadata/demand_func")
if (demand_func is None
or demand_func.text == "LinearDemandFunction"):
return LinearDemandFunction.from_xml(root, shared=shared, **kw)
elif demand_func.text == "AffineDemandFunction":
return AffineDemandFunction.from_xml(root, shared=shared, **kw)
raise ValueError(f"Invalid demand_func in XML: {demand_func.text}.")
def make_save_dict(
self,
prefix: str = "",
save_dict=None
) -> dict:
if save_dict is None:
save_dict = {}
save_dict["demand_type"] = self.__class__.__name__
for (k, dv) in self.demand_vectors.items():
save_dict = dv.make_save_dict(
prefix=prefix + k + "_",
save_dict=save_dict,
)
return save_dict
make_save_dict.__doc__ = _doc.make_save_dict.__doc__
def save_to_numpy(self, file: str, **kwargs) -> None:
save_dict = self.make_save_dict()
save_dict.update(kwargs)
np.savez(file, **save_dict)
save_to_numpy.__doc__ = _doc.save_to_numpy.__doc__
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> LinearDemandFunction | AffineDemandFunction:
# load np data
if isinstance(data, str):
data = np.load(data)
if ("demand_type" not in data or
str(data["demand_type"]) == "LinearDemandFunction"):
return LinearDemandFunction.from_npz(data, shared=shared, prefix=prefix)
elif str(data["demand_type"]) == "AffineDemandFunction":
return AffineDemandFunction.from_npz(data, shared=shared, prefix=prefix)
raise ValueError(f"invalid demand type: {str(data['demand_type'])}.")
from_npz.__func__.__doc__ = _doc.from_npz_shared.__doc__
@property
def _dv0(self):
return getattr(self, self._demand_vectors[0])
@property
def all_single_pairs(self) -> bool:
"""bool: whether all commodities have single source/sink."""
return bool((self(1).getnnz(axis=0) == 2).all())
@property
def n(self) -> int:
if self.shared is not None:
return self.shared.n
elif self._dv0.n is not None:
return self._dv0.n
raise AttributeError(
"Number of nodes has not been set: Neither shared object nor "
"number of nodes specified."
)
@n.setter
def n(self, val: int) -> None:
for dv in self.demand_vectors.values():
dv.n = val
@property
def shared(self):
"""Shared object for network objects.
See Also
--------
paminco.net.shared.Shared
"""
return self._dv0.shared
@shared.setter
def shared(self, s: Shared) -> None:
for dv in self.demand_vectors.values():
dv.shared = s
shared.__doc__ = _DV.shared.__doc__
@property
def dtype_int(self):
return self._dv0.dtype_int
@dtype_int.setter
def dtype_int(self, val) -> None:
for dv in self.demand_vectors.values():
dv.dtype_int = val
@property
def dtype_float(self):
return self._dv0.dtype_float
@dtype_float.setter
def dtype_float(self, val) -> None:
for dv in self.demand_vectors.values():
dv.dtype_float = val
@property
def demand_vectors(self):
"""Get names and values for all vectors making up demand func."""
return {k: getattr(self, k) for k in self._demand_vectors}
@property
def is_single_commodity(self) -> bool:
"""Whether demand function consists of single commodity only."""
return (self.sparse().shape[1] == 1)
@property
def is_multi_commodity(self) -> bool:
"""Whether demand function consists of more than one commodity."""
return (self.sparse().shape[1] > 1)
@property
def value_at_1(self) -> sps.spmatrix:
"""Get value of demand function h(1)."""
if self.cache.is_valid("value_at_1") is False:
v = self.cache["value_at_1"] = self(1)
return v
return self.cache["value_at_1"]
@property
def unique_sources(self) -> np.ndarray:
"""Get all unique sources."""
if self.cache.is_valid("unique_sources") is False:
u_sources = np.empty(0, dtype=int)
for dv in self.demand_vectors.values():
u_sources = np.union1d(u_sources, np.where((dv() < 0).getnnz(axis=1))[0])
v = self.cache["unique_sources"] = u_sources
return v
return self.cache["unique_sources"]
@abc.abstractmethod
def ddx(self) -> sps.csc_matrix: ...
@abc.abstractmethod
def value(self) -> sps.csc_matrix: ...
@abc.abstractmethod
def get_source_sink_rate(self): ...
class LinearDemandFunction(DemandFunction):
"""A linear demand function.
Modelling the function::
demand(param) = param * b.
Parameters
----------
b : DemandVector, DemandVectorSP, spmatrix or iterable
Demand direction, scales linearly with param.
shared : Shared, optional
Shared object for all network objects.
Attributes
----------
b : DemandVector or DemandVectorSP
Demand direction, scales linearly with param.
cache : Cache
A cache.
shared
all_single_pairs
demand_vectors
is_single_commodity
is_multi_commodity
value_at_1
unique_sources
See Also
--------
demand_vector : Initialization of demand vectors.
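Examples
--------
A minimal sketch on a hypothetical 3-node network with node ids:

>>> d = LinearDemandFunction([(0, 2, 100.0)], is_label=False)
>>> d.n = 3
>>> b = d.value(2.0)  # demand(2.0) = 2.0 * b
>>> float(b[0, 0]), float(b[2, 0])
(-200.0, 200.0)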
"""
_demand_vectors = ["b"]
def __init__(
self,
b,
shared=None,
**kwargs
) -> None:
super().__init__(shared)
self.b = demand_vector(b, shared=shared, **kwargs)
def max_inflow(
self,
max_param: float,
min_param: float = 0,
) -> np.ndarray:
"Inflow rate at ``max_param`` for all commodities."
return max_param * self.b.total_rate
def value(self, param: float = 1, **kwargs) -> sps.csc_matrix:
"""Value of demand function: demand(param).
Parameters
----------
param: float, default=1
Returns
-------
csc_matrix
demand(param).
See Also
--------
DemandVector.sparse : details on the structure of the return
matrix.
DemandVectorSP.sparse : details on the structure of the return
matrix.
"""
return param * self.b.sparse(**kwargs)
def ddx(self, param: float, **kwargs) -> sps.csc_matrix:
"""Derivative of demand function: demand'(param).
Parameters
----------
param: float, default=1
Returns
-------
csc_matrix
demand'(param).
See Also
--------
DemandVector.sparse : details on the structure of the return
matrix.
DemandVectorSP.sparse : details on the structure of the return
matrix.
"""
return self.b.sparse(**kwargs)
def get_source_sink_rate(self, param: float):
"""Get indices of source and sink, and rates for all commodities.
Parameters
----------
param : float
Scale rates by param.
Returns
-------
source_indices : ndarray
Indices of sources, ndarray (k', ) of int, where k' is the
total number of commodities making up this demand function,
summed over all demand vectors.
sink_indices : ndarray
Indices of sinks, ndarray (k', ) of int.
rates : ndarray
Rates, ndarray (k', ) of float.
"""
return self.b.get_source_sink_rate(scale_by=param)
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> LinearDemandFunction:
prefix = prefix + cls._demand_vectors[0] + "_"
b = _DV.from_npz(data, shared=shared, prefix=prefix, **kwargs)
return cls(b, shared=shared, **kwargs)
@classmethod
def from_xml(
cls,
data,
shared=None,
**kw,
) -> LinearDemandFunction:
root = xml_find_root(data)
b = _comm_from_xml_node(root.find(TAG_COMMODITIES))
return cls(b, shared=shared, **kw)
from_xml.__func__.__doc__ = _doc.from_xml_shared.__doc__
class AffineDemandFunction(DemandFunction):
"""An affine demand function.
Modelling the function::
demand(param) = b0 + param * b
where b0 and b are demand vectors modelling an equal amount of
commodities.
Parameters
----------
b0 : DemandVector, DemandVectorSP, spmatrix or iterable
Base demand, unaffected by param.
b : DemandVector, DemandVectorSP, spmatrix or iterable
Demand direction, scales linearly with param.
shared : Shared, optional
Shared object for all network objects.
Attributes
----------
b0 : DemandVector or DemandVectorSP
Base demand, unaffected by param.
b : DemandVector or DemandVectorSP
Demand direction, scales linearly with param.
cache : Cache
A cache.
shared
all_single_pairs
demand_vectors
is_single_commodity
is_multi_commodity
value_at_1
unique_sources
See Also
--------
demand_vector : Initialization of demand vectors.
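Examples
--------
A minimal sketch on a hypothetical 2-node network:

>>> d = AffineDemandFunction([(0, 1, 10.0)], [(0, 1, 5.0)], is_label=False)
>>> d.n = 2
>>> float(d.value(2.0)[1, 0])  # inflow at sink: 10 + 2 * 5
20.0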
"""
_demand_vectors = ["b", "b0"]
def __init__(
self,
b0,
b,
shared=None,
**kwargs
) -> None:
super().__init__(shared)
# init demand direction and base demand
self.b0 = demand_vector(b0, shared=shared, **kwargs)
self.b = demand_vector(b, shared=shared, **kwargs)
if len(self.b) != len(self.b0):
raise ValueError("'b' and 'b0' must have the same number of commodities.")
def value(self, param: float = 1, **kwargs) -> sps.csc_matrix:
return self.b0.sparse(**kwargs) + param * self.b.sparse(**kwargs)
value.__doc__ = LinearDemandFunction.value.__doc__
def ddx(self, param: float, **kwargs) -> sps.csc_matrix:
return self.b.sparse(**kwargs)
ddx.__doc__ = LinearDemandFunction.ddx.__doc__
def get_source_sink_rate(self, param: float):
s0, t0, r0 = self.b0.get_source_sink_rate(1)
s, t, r = self.b.get_source_sink_rate(scale_by=param)
s = np.hstack((s0, s))
t = np.hstack((t0, t))
r = np.hstack((r0, r))
return s, t, r
get_source_sink_rate.__doc__ = LinearDemandFunction.get_source_sink_rate.__doc__
@classmethod
def from_npz(
cls,
data,
shared=None,
prefix: str = "",
**kwargs
) -> AffineDemandFunction:
b = _DV.from_npz(
data,
shared=shared,
prefix=prefix + cls._demand_vectors[0] + "_",
**kwargs
)
b0 = _DV.from_npz(
data,
shared=shared,
prefix=prefix + cls._demand_vectors[1] + "_",
**kwargs
)
return cls(b0, b, shared=shared, **kwargs)
@classmethod
def from_xml(
cls,
data,
shared=None,
**kw,
) -> AffineDemandFunction:
root = xml_find_root(data)
commodities = root.findall(TAG_COMMODITIES)
for comm in commodities:
if comm.attrib["name"] == "b":
b = _comm_from_xml_node(comm)
elif comm.attrib["name"] == "b0":
b0 = _comm_from_xml_node(comm)
return cls(b0, b, shared=shared, **kw)