Source code for topotherm.postprocessing

"""
Postprocessing of the results from the optimization. This includes the
calculation of the diameter and mass flow of the pipes, and the elimination of
unused pipes and nodes.

Functions
---------
- ``calc_diam_and_velocity`` : Calculate pipe diameter and velocity depending
  on the mass flow and pipe power.
- ``sts`` : Postprocessing for the single time step model.
- ``to_networkx_graph`` : Export the postprocessed, optimal district as a
  NetworkX graph.
- ``mts`` : Postprocessing for the multiple time step model.
"""

from typing import Tuple

import numpy as np
import pyomo.environ as pyo
from scipy.optimize import root
import networkx as nx
import pandas as pd

from topotherm.settings import Settings


[docs] def diameter_and_velocity( v: Tuple[float, float], mass_lin: float, settings: Settings) -> Tuple[float, float]: """ Equations for calculating the diameter and velocity of a pipe based on mass flow and power of the pipes Parameters ---------- v : tuple of float Tuple containing the velocity and diameter (``(velocity, diameter)``). mass_lin : float Mass flow of the pipe (kg/s). settings : Settings Settings object containing water and piping parameters. Returns ------- tuple of float Tuple containing: - ``velocity`` : float Calculated velocity of the pipe (m/s). - ``diameter`` : float Calculated diameter of the pipe (m). """ vel, d = v reynolds = ((settings.water.density * vel * d) / settings.water.dynamic_viscosity) # friction factor f = (-1.8 * np.log10((settings.piping.roughness / (3.7 * d)) ** 1.11 + 6.9 / reynolds) )**-2 # eq. for diameter eq1 = vel - np.sqrt((2 * settings.piping.max_pr_loss * d) / (f * settings.water.density)) # eq. for velocity eq2 = mass_lin - settings.water.density * vel * (np.pi / 4) * d ** 2 return [eq1, eq2]
[docs] def calculate_hydraulics( power: np.ndarray, settings: Settings ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Calculate mass flow, diameter, and velocity for each pipe given the installed thermal power and the supply/return temperatures from ``settings``. Parameters ---------- power : np.ndarray Installed thermal power for each pipe. settings : Settings Settings object containing temperature setpoints and water properties. Returns ------- tuple of np.ndarray Tuple containing: - ``mass_flow`` : np.ndarray Mass flow for each pipe (kg/s). - ``diameter`` : np.ndarray Pipe diameter (m). - ``velocity`` : np.ndarray Flow velocity (m/s). """ delta_t = settings.temperatures.supply - settings.temperatures.return_ # Compute mass flow rate for all pipes m = P / (cp * deltaT) m_dot = power * 1e3 / (settings.water.heat_capacity_cp * delta_t) # helper function for solving per pipe def solve_per_pipe(m): sol = root(lambda v: diameter_and_velocity(v, m, settings), (0.5, 0.02), method='lm') return sol.x if sol.success else (np.nan, np.nan) # Vectorized solving results = np.array([solve_per_pipe(m) for m in m_dot]) vel, d = results.T return m_dot, d, vel
[docs] def sts(model: pyo.ConcreteModel, matrices: dict, settings: Settings ): """ Postprocessing for the single time step model. This includes the calculation of the diameter and velocity of the pipes and the elimination of unused pipes and nodes. Parameters ---------- model : pyo.ConcreteModel Solved Pyomo model. matrices : dict Dictionary containing the matrices. settings : tt.settings.Settings Settings for the optimization. Returns ------- dict Optimal variables and postprocessed data. """ # Get the values from the model p_ij = np.array(pyo.value(model.P['ij', 'in', :, :])) p_ji = np.array(pyo.value(model.P['ji', 'in', :, :])) p_source_inst = np.array(pyo.value(model.P_source_inst[:])) p_source = np.array(pyo.value(model.P_source[:, :])) # flow direction, binary lambda_ij = np.around(np.array(pyo.value(model.lambda_['ij', :])), 0) lambda_ji = np.around(np.array(pyo.value(model.lambda_['ji', :])), 0) lambda_sum = lambda_ij + lambda_ji q_c_opt = np.zeros([matrices['a_c'].shape[1], len(model.set_t)]) flh_c_opt = np.zeros([matrices['a_c'].shape[1], len(model.set_t)]) for d, e in model.cons: if d == 'ij': # edge in incidence matrix where pipe exits into node n (==-1) a_i_idx = np.where(matrices['a_i'][:, e] == -1) # location where a_i_idx is connected to a_c a_c_idx = np.where(matrices['a_c'][a_i_idx[0], :][0] == 1) if len(a_i_idx) != 1 or len(a_c_idx) != 1: raise ValueError('Error in the incidence matrix!') # assign the heat demand to the connected consumer if lambda is 1 q_c_opt[a_c_idx[0], :] = lambda_ij[e] * matrices['q_c'][a_c_idx[0], :] flh_c_opt[a_c_idx[0], :] = lambda_ij[e] * matrices['flh_consumer'][a_c_idx[0], :] elif d == 'ji': a_i_idx = np.where(matrices['a_i'][:, e] == 1) a_c_idx = np.where(matrices['a_c'][a_i_idx[0], :][0] == 1) if len(a_i_idx) != 1 or len(a_c_idx) != 1: raise ValueError('Error in the incidence matrix!') q_c_opt[a_c_idx[0], :] = lambda_ji[e] * matrices['q_c'][a_c_idx[0], :] flh_c_opt[a_c_idx[0], :] = lambda_ji[e] * matrices['flh_consumer'][a_c_idx[0], :] # Remove nonzero elements row-wise q_c_opt = q_c_opt[q_c_opt.any(axis=1)] flh_c_opt = np.zeros([matrices['a_c'].shape[1], len(model.set_t)]) # Postprocessing producers if np.shape(matrices['a_p'])[1] == 1: p_source_inst_opt = p_source_inst p_source_opt = p_source flh_s_opt = matrices['flh_source'] else: # clean up the sources to exclude 0 power sources p_source_inst_opt = p_source_inst[p_source_inst != 0] p_source_opt = p_source[p_source_inst != 0] flh_s_opt = matrices['flh_source'][p_source_inst != 0, :] # Adjust Incidence Matrix for further postprocessing for q, _ in enumerate(lambda_ij): # if not active, all is 0 if lambda_ij[q] == 0 and lambda_ji[q] == 0: matrices['a_i'][:, q] = 0 matrices['l_i'][q] = 0 # if opposite direction operational, switch a_i with -1 and switch # values lambda_ij for ji. This is necessary for the postprocessing. elif lambda_ji[q] == 1: matrices['a_i'][:, q] = matrices['a_i'][:, q] * (-1) p_lin = p_ij + p_ji # Power of the pipes # drop entries with 0 in the incidence matrix to reduce size valid_columns = matrices['a_i'].any(axis=0) valid_rows = matrices['a_i'].any(axis=1) p_lin_opt = p_lin[valid_columns] pos_opt = matrices['position'][valid_rows, :] a_c_opt = matrices['a_c'][valid_rows, :] a_c_opt = a_c_opt[:, a_c_opt.any(axis=0)] a_p_opt = matrices['a_p'][valid_rows, :] a_i_opt = matrices['a_i'][valid_rows, :][:, valid_columns] l_i_opt = matrices['l_i'][valid_columns] m_lin, d_lin, v_lin = calculate_hydraulics(p_lin_opt, settings) res = { 'a_i': a_i_opt, 'a_p': a_p_opt, 'a_c': a_c_opt, 'q_c': q_c_opt, 'l_i': l_i_opt, 'd_i_0': d_lin, 'm_i_0': m_lin, 'position': pos_opt, 'p': p_lin_opt, 'flh_c_opt': flh_c_opt, 'flh_s_opt': flh_s_opt, 'p_s_inst_opt': p_source_inst_opt, 'p_s_opt': p_source_opt, 'lambda_b_orig': lambda_sum } return res
[docs] def mts(model: pyo.ConcreteModel, matrices: dict, settings: Settings) -> dict: """ Postprocessing for the multiple time step model. This includes the calculation of the diameter and velocity of the pipes and the elimination of unused pipes and nodes. Parameters ---------- model : pyo.ConcreteModel Solved Pyomo model. matrices : dict Dictionary containing the matrices. settings : tt.settings.Settings Settings for the optimization. Returns ------- dict Optimal variables and postprocessed data. """ # Get the values from the model p_ij = np.reshape(np.array(pyo.value(model.P['ij', 'in', :, :])), (-1, matrices['q_c'].shape[1])) p_ji = np.reshape(np.array(pyo.value(model.P['ji', 'in', :, :])), (-1, matrices['q_c'].shape[1])) p_cap = np.array(pyo.value(model.P_cap[:])) p_source_inst = np.array(pyo.value(model.P_source_inst[:])) p_source = np.array(pyo.value(model.P_source[:, :])) # flow direction, binary lambda_ij = np.reshape( np.around(np.array(pyo.value(model.lambda_['ij', :, :])), 0), (-1, matrices['q_c'].shape[1])) lambda_ji = np.reshape( np.around(np.array(pyo.value(model.lambda_['ji', :, :])), 0), (-1, matrices['q_c'].shape[1])) # built pipes lambda_b = np.around(np.array(pyo.value(model.lambda_b[:])), 0) q_c_opt = np.zeros([matrices['a_c'].shape[1], len(model.set_t)]) flh_c_opt = np.zeros([matrices['a_c'].shape[1], len(model.set_t)]) # Exclude non-connected consumers in Q_c, only affects the economic case # Check for consumers connected in direction ij for d, e in model.cons: if d == 'ij': # edge in incidence matrix where pipe exits into node n (==-1) a_i_idx = np.where(matrices['a_i'][:, e] == -1) # location where a_i_idx is connected to a_c a_c_idx = np.where(matrices['a_c'][a_i_idx[0], :][0] == 1) if len(a_i_idx) != 1 or len(a_c_idx) != 1: raise ValueError('Error in the incidence matrix!') # assign the heat demand to the connected consumer if lambda is 1 q_c_opt[a_c_idx[0], :] = lambda_b[e] * matrices['q_c'][a_c_idx[0], :] flh_c_opt[a_c_idx[0], :] = lambda_b[e] * matrices['flh_consumer'][a_c_idx[0], :] elif d == 'ji': a_i_idx = np.where(matrices['a_i'][:, e] == 1) a_c_idx = np.where(matrices['a_c'][a_i_idx[0], :][0] == 1) if len(a_i_idx) != 1 or len(a_c_idx) != 1: raise ValueError('Error in the incidence matrix!') q_c_opt[a_c_idx[0], :] = lambda_b[e] * matrices['q_c'][a_c_idx[0], :] flh_c_opt[a_c_idx[0], :] = lambda_b[e] * matrices['flh_consumer'][a_c_idx[0], :] # Remove nonzero elements row-wise q_c_opt = q_c_opt[q_c_opt.any(axis=1)] flh_c_opt = flh_c_opt[flh_c_opt.any(axis=1)] # Postprocessing producers if np.shape(matrices['a_p'])[1] == 1: p_source_inst_opt = p_source_inst p_source_opt = p_source flh_s_opt = matrices['flh_source'] else: p_source_inst_opt = p_source_inst[p_source_inst != 0] p_source_opt = p_source[p_source_inst != 0, :] flh_s_opt = matrices['flh_source'][p_source_inst != 0, :] # Adaption of Incidence Matrix for further postprocessing for q in model.set_n_i: # if not active, all is 0 if lambda_b[q] == 0: matrices['a_i'][:, q] = 0 matrices['l_i'][q] = 0 # if opposite direction operational, switch a_i with -1 and switch # values lambda_ij for ji. This is necessary for the postprocessing. elif (lambda_b[q] == 1) & (lambda_ji[q, 0] == 1): matrices['a_i'][:, q] = matrices['a_i'][:, q] * (-1) lambda_ij[q, np.where(lambda_ij[q, 1:] == 0)[0]] = 1 lambda_ji[q, np.where(lambda_ji[q, 1:] == 1)[0]] = 0 p_lin = p_cap # Capacity of the pipes # drop entries with 0 in the incidence matrix to reduce size valid_columns = matrices['a_i'].any(axis=0) valid_rows = matrices['a_i'].any(axis=1) p_lin_opt = p_lin[valid_columns] p_ij_opt = p_ij[valid_columns, :] p_ji_opt = p_ji[valid_columns, :] lambda_ij_opt = lambda_ij[valid_columns, :] lambda_ji_opt = lambda_ji[valid_columns, :] pos_opt = matrices['position'][valid_rows, :] a_c_opt = matrices['a_c'][valid_rows, :] a_c_opt = a_c_opt[:, a_c_opt.any(axis=0)] a_p_opt = matrices['a_p'][valid_rows, :] a_i_opt = matrices['a_i'][valid_rows, :][:, valid_columns] l_i_opt = matrices['l_i'][valid_columns] m_lin, d_lin, v_lin = calculate_hydraulics(p_lin_opt, settings) res = { 'a_i': a_i_opt, 'a_p': a_p_opt, 'a_c': a_c_opt, 'q_c': q_c_opt, 'l_i': l_i_opt, 'lambda_b_orig': lambda_b, 'lambda_ij_opt': lambda_ij_opt, 'lambda_ji_opt': lambda_ji_opt, 'd_i_0': d_lin, 'm_i_0': m_lin, 'position': pos_opt, 'p': p_lin_opt, 'p_ij': p_ij_opt, 'p_ji': p_ji_opt, 'flh_c_opt': flh_c_opt, 'flh_s_opt': flh_s_opt, 'p_s_inst_opt': p_source_inst_opt, 'p_s_opt': p_source_opt } return res
[docs] def to_networkx_graph(matrices: dict) -> nx.DiGraph: """ Convert incidence matrices to a directed NetworkX graph. Includes the nodes and edges of the district, their length, and, if available, installed diameter, and power. Parameters ---------- matrices : dict Dictionary containing the incidence matrices as output by ``topotherm.postprocessing.sts`` or as input to the model. Returns ------- nx.DiGraph NetworkX directed graph. """ G = nx.DiGraph() # Add the nodes to the graph sums = matrices['a_c'].sum(axis=1) prod = matrices['a_p'].T.sum(axis=0) ges = (sums + prod).flatten() ges = (sums + prod).flatten() positions = matrices['position'] # Avoid redundant indexing colors = np.where(ges == 1, 'Red', np.where(ges == 0, 'Green', np.where(ges <= -1, 'Orange', None))) types = np.where(ges == 1, 'consumer', np.where(ges == 0, 'internal', np.where(ges <= -1, 'source', None))) for q in range(len(ges)): if colors[q]: # Skip nodes that don't match any category G.add_node(q, color=colors[q], type_=types[q], x=positions[q, 0], y=positions[q, 1]) sources = np.argmax(matrices['a_i'] == 1, axis=0) # First occurrence of 1 in each column targets = np.argmax(matrices['a_i'] == -1, axis=0) # First occurrence of -1 in each column # Get existing to allow solved and unsolved incidence matrices _data = ['l_i', 'd_i_0', 'p'] available_data = [k for k in _data if k in matrices.keys()] available_matrices = [matrices[k].flatten() for k in available_data] edges = np.column_stack([sources, targets] + available_matrices) # hacky solution, should maybe work dire for row in edges: u, v = row[0].astype(int), row[1].astype(int) data = {} idx = 2 # start after u, v for key in available_data: val = row[idx] idx += 1 if key == 'l_i': data['l'] = val elif key == 'd_i_0': data['d'] = val elif key == 'p': data['p'] = val if val == 0: # Skip edges with p == 0 break else: # Only add edge if loop didn’t break (i.e., p != 0 or p not present) G.add_edge(u, v, **data) return G
[docs] def to_dataframe(matrices_optimal: dict, matrices_init: dict) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Export the postprocessed, optimal district as pandas DataFrames. Includes the nodes and edges of the district, their length, installed diameter, and power. Parameters ---------- matrices_optimal : dict Solved and cleaned matrices as output by ``topotherm.postprocessing.sts``. matrices_init : dict Original matrices as output by ``topotherm.fileio.load``. Returns ------- nodes : pd.DataFrame DataFrame of nodes. edges : pd.DataFrame DataFrame of edges. """ # Create a DataFrame for the nodes nodes = pd.DataFrame( index=range(matrices_optimal['a_i'].shape[0]), columns=['type_', 'x', 'y', 'demand', 'total_installed_power'], data=None ) # Calculate the sum of matrices sum_ac = matrices_optimal['a_c'].sum(axis=1) sum_ap = matrices_optimal['a_p'].T.sum(axis=0) total_sum = np.array(sum_ac + sum_ap).flatten() # Assign types based on the sum nodes['type_'] = ['consumer' if x == 1 else 'internal' if x == 0 else 'source' for x in total_sum] nodes['x'] = matrices_optimal['position'][:, 0] nodes['y'] = matrices_optimal['position'][:, 1] # TODO: inherit in some way the consumer and producer id so that you don't have to search for it # in the original matrices, reducing the need to import them. sources_nodes = nodes.type_ == 'source' positions_sources = nodes.loc[sources_nodes, ['x', 'y']].values matches = np.all(matrices_init['position'][:, None, :] == positions_sources[None, :, :], axis=2) original_source_nodes = np.where(matches)[0] original_source_prods = np.where(matrices_init['a_p'][original_source_nodes, :] == -1)[1] # assume that we want to write out the total sum of installed power for each source # inherent limitation of the dataframe structure, only one dimensional data possible nodes.loc[sources_nodes, 'total_installed_power'] = matrices_optimal['p_s_inst_opt'][original_source_prods].sum() consumer_nodes = nodes[nodes.type_ == 'consumer'].index positions_consumers = nodes.loc[consumer_nodes, ['x', 'y']].values matches = np.all(matrices_init['position'][:, None, :] == positions_consumers[None, :, :], axis=2) original_consumer_nodes = np.where(matches)[0] original_consumer_edges = np.where( matrices_init['a_c'][original_consumer_nodes, :] == 1)[1] nodes.loc[consumer_nodes, 'demand'] = ( matrices_init['q_c']*matrices_init['flh_consumer'] )[original_consumer_edges].sum(axis=1).squeeze() nodes.loc[consumer_nodes, 'total_installed_power'] = ( matrices_init['q_c'])[original_consumer_edges].max(axis=1).squeeze() if abs(nodes.loc[consumer_nodes, 'total_installed_power'].sum() - matrices_optimal['q_c'].max(axis=1).sum()) > 1e-3: raise ValueError(f'Error in the incidence matrix! Demand {nodes.demand.sum()} != total demand {matrices_optimal["q_c"].sum()}') # Create a DataFrame for the edges edges = pd.DataFrame() edges['start_node'] = np.argmax(matrices_optimal['a_i'] == 1, axis=0) edges['end_node'] = np.argmax(matrices_optimal['a_i'] == -1, axis=0) edges['x_start'] = matrices_optimal['position'][edges['start_node'], 0] edges['y_start'] = matrices_optimal['position'][edges['start_node'], 1] edges['x_end'] = matrices_optimal['position'][edges['end_node'], 0] edges['y_end'] = matrices_optimal['position'][edges['end_node'], 1] edges['length'] = matrices_optimal['l_i'] edges['diameter'] = matrices_optimal['d_i_0'] edges['power'] = matrices_optimal['p'] edges.loc[:, 'to_consumer'] = edges['end_node'].map(nodes['type_']).eq('consumer') edges.loc[:, 'from_consumer'] = edges['start_node'].map(nodes['type_']).eq('consumer') if edges.to_consumer.sum() + edges.from_consumer.sum() != len(matrices_optimal['q_c']): raise ValueError(f'Error in the incidence matrix! To consumer {edges.to_consumer.sum()} + from_consumer {edges.from_consumer.sum()} != total consumers {len(matrices_optimal["q_c"])}') if edges.from_consumer.sum() > 0: raise ValueError(f'Error in the incidence matrix! From consumer {edges.from_consumer.sum()} is not 0 for single time steps') return nodes, edges