"""Calculate the simultaneity factor of a given graph of the optimal district heating network.
"""
import copy
import numpy as np
import networkx as nx
import pandas as pd
[docs]
def calculate(graph: nx.DiGraph) -> nx.DiGraph:
"""
Calculate the simultaneity factor of a given graph of the optimal district
heating network.
Args:
graph: A NetworkX graph object representing the network.
Returns:
nx.DiGraph: A graph containing the simultaneity factors of the graph
in the attribute 'simultaneity'.
"""
# Compute the Laplacian matrix for the graph
laplace = compute_laplacian(graph)
# Resolve loops in the graph and update the Laplacian matrix
G_copy, laplace = resolve_loops(graph, laplace)
# Resolve multiple connections between nodes and update the Laplacian matrix
laplace = resolve_multi_connections(G_copy, laplace)
# Identify and mark end consumers in the graph
graph = get_n_end_consumers(graph, laplace)
# Calculate the final simultaneity factor based on the processed graph
graph_simultaneity = calculate_simultaneity_factor(graph)
return graph_simultaneity
[docs]
def compute_laplacian(graph: nx.DiGraph) -> np.array:
"""
Compute the Laplacian matrix of a given graph.
Args:
graph: A NetworkX graph object of the solved network connections.
Returns:
pd.DataFrame: A pandas DataFrame representing the Laplacian matrix.
"""
# Calculate the out-degree as a diagonal matrix
out_degree = np.diag([graph.out_degree(node) for node in graph.nodes])
# Convert the adjacency matrix to a numpy array
adjacency_matrix = nx.to_numpy_array(graph, nodelist=graph.nodes)
# Compute the Laplacian matrix (D - A)
laplace = out_degree - adjacency_matrix
return laplace
[docs]
def resolve_loops(graph: nx.DiGraph,
laplace: pd.DataFrame) -> tuple[nx.DiGraph, np.array]:
"""
Resolve loops in the graph and update the Laplacian matrix accordingly.
Args:
graph: A NetworkX graph object.
laplace: The Laplacian matrix of the graph.
Returns:
A tuple containing the updated graph and Laplacian matrix.
"""
def find_nodes_by_attribute(graph: nx.DiGraph, attribute: str, value) -> list:
"""
Find nodes in a graph that have a specific attribute value.
Args:
graph: The NetworkX graph.
attribute: The attribute to search for.
value: The attribute value to match.
Returns:
list: A list of nodes with the specified attribute value.
"""
return [node for node, data in graph.nodes(data=True)
if data.get(attribute) == value]
# Find nodes labeled as 'Heat Source'
heat_source_nodes = find_nodes_by_attribute(graph, 'label', 'Heat Source')
G_copy = copy.deepcopy(graph)
# Identify and resolve simple cycles in the graph
for loop in nx.recursive_simple_cycles(graph):
laplace_updated = False
for node in heat_source_nodes:
if nx.has_path(graph, node, loop[0]) and nx.has_path(graph, node, loop[-1]):
if nx.shortest_path_length(graph, 1, loop[-1]) > nx.shortest_path_length(graph, 1, loop[0]):
# Update Laplacian matrix and remove edge from the graph
laplace[loop[-1], loop[0]] = 0
laplace[loop[-1], loop[-1]] -= 1
G_copy.remove_edge(loop[-1], loop[0])
else:
laplace[loop[0], loop[-1]] = 0
laplace[loop[0], loop[0]] -= 1
G_copy.remove_edge(loop[0], loop[-1])
laplace_updated = True
if laplace_updated:
break
return G_copy, laplace
[docs]
def resolve_multi_connections(G_copy: nx.DiGraph,
laplace: np.array) -> np.array:
"""
Resolves multiple incoming connections for nodes in a directed graph by propagating 'connections' attributes
and updating the Laplacian matrix accordingly.
Parameters:
G_copy: A copy of the directed graph from the previous step.
laplace: The Laplacian matrix to be updated.
Returns:
laplace: Updated Laplacian matrix with modified diagonal elements.
"""
# Initialize 'connections {node}' attributes for nodes with in-degree > 1
for node in G_copy.nodes:
if G_copy.in_degree(node) > 1:
G_copy.nodes[node][f'connections {node}'] = 1
# Function to check for 'connections' attributes in any node
def has_connection_attribute(graph):
return any(
"connections" in str(key).lower()
for _, attrs in graph.nodes(data=True)
for key in attrs.keys()
)
# Propagate 'connections' attribute
while has_connection_attribute(G_copy):
updates_occurred = False # Flag
print(G_copy.nodes.data())
# Process nodes in normal order
for node in G_copy.nodes:
connection_attrs = {
key: value for key, value in G_copy.nodes[node].items() if 'connections' in key
}
for key in connection_attrs.keys():
# Count successors with the same connections attribute
count_of_successors = sum(
G_copy.nodes[succ].get(key, 0) == G_copy.nodes[node].get(key, 0)
for succ in G_copy.successors(node)
)
# Update the node's connections attribute if the count is greater
current_value = G_copy.nodes[node].get(key, 0)
if count_of_successors > current_value:
G_copy.nodes[node][key] = count_of_successors
updates_occurred = True
# Update predecessors if they lack the attribute
for pred in G_copy.predecessors(node):
for key, value in connection_attrs.items():
if key not in G_copy.nodes[pred]:
G_copy.nodes[pred][key] = 1
updates_occurred = True
# Break the loop if no updates occurred
if not updates_occurred:
break
for node in G_copy.nodes:
connection_attrs = {
key: value for key, value in G_copy.nodes[node].items() if 'connections' in key
}
for key, value in connection_attrs.items():
if value > 1:
laplace[node, node] -= (value - 1)
return laplace
[docs]
def get_n_end_consumers(graph: nx.DiGraph, laplace:np.array) -> nx.DiGraph:
"""
Identify and mark end consumers in the graph based on the Laplacian matrix.
Args:
graph: A NetworkX graph object.
laplace: The Laplacian matrix of the graph.
Returns:
The graph with 'n_consumers' attributes added to nodes.
"""
laplace = pd.DataFrame(laplace, index=graph.nodes, columns=graph.nodes)
while not laplace.empty:
for i in laplace.index.tolist():
laplace_single_row = laplace.loc[i]
connected_nodes_no = laplace_single_row[i]
laplace_connected_nodes = laplace_single_row.drop(i)
is_isolated = not (laplace_connected_nodes != 0).any()
if is_isolated:
graph.nodes[i]['n_consumers'] = connected_nodes_no
if connected_nodes_no != 0:
laplace_single_column = laplace[i]
for j in laplace_single_column.index.tolist():
if laplace_single_column[j] != 0:
laplace.loc[j, j] += connected_nodes_no - 1
laplace.drop(index=i, columns=i, inplace=True)
return graph
[docs]
def calculate_simultaneity_factor(graph: nx.DiGraph) -> nx.DiGraph:
"""
Calculate the simultaneity factor for edges in the graph.
Args:
graph: A NetworkX graph object with number of end consumers marked as
n_consumers attributes.
Returns:
nx.DiGraph: A graph that has edge names and values for the
simultaneity factor
"""
for u, v in graph.edges():
end_consumers_u = graph.nodes[u].get('n_consumers', 0)
end_consumers_v = graph.nodes[v].get('n_consumers', 0)
max_end_consumers = max(end_consumers_u, end_consumers_v)
# Calculate the simultaneity factor based on the maximum number of end consumers
simultaneity_factor = (
0.449677646267461
+ (0.551234688
/ (1 + pow((max_end_consumers / 53.84382392), 1.762743268))
)
)
graph.edges[u, v]['simultaneity'] = min(simultaneity_factor, 1)
return graph
[docs]
def update_data(
df_nodes: pd.DataFrame,
df_edges: pd.DataFrame,
network: nx.DiGraph
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Update the edge and node data with simultaneity attributes.
Args:
node_data: A DataFrame containing edge data.
edge_data: A DataFrame containing node data.
network_simultaneity: A NetworkX graph object with simultaneity factors,
needs to contain power, simultaneity attributes
Returns:
tuple[pd.DataFrame, pd.DataFrame]: Updated edge and node data DataFrames.
"""
# p_simultaneity = power * simultaneity to the network_simultaneity object
for u, v, d in network.edges(data=True):
network[u][v]['power_simultaneity'] = d['simultaneity'] * d['p']
# get simultaneity attributes from networkx object, add to edge_data
df_edges_sim = pd.DataFrame(
[(u, v, d['simultaneity'], d['power_simultaneity'])
for u, v, d in network.edges(data=True)],
columns=['start_node', 'end_node', 'simultaneity', 'power_simultaneity']
).set_index(['start_node', 'end_node'])
df_edges = df_edges.join(df_edges_sim, on=['start_node', 'end_node'])
n_consumers_dict = {
n: d.get('n_consumers', None)
for n, d in network.nodes(data=True)}
df_nodes['n_consumers'] = pd.Series(n_consumers_dict)
return df_nodes, df_edges