Zitat von DirtyHarry
Beitrag anzeigen
Gruß
lopiuh
: Wie soll denn dieses Flag interpretiert werden?
(ist nicht ironisch gemeint, ist wirklich anspruchsvoll und macht Spaß). Ich hab das mit Standardgeräten nicht geschafft und u.A. deswegen das OpenKNX-Logikmodul gemacht
.# -*- coding: utf-8 -*-
import pandas as pd
def read_csv_and_build_graph(csv_file):
"""
Liest die CSV-Datei ein und erstellt einen gerichteten Graphen.
Da die Matrix symmetrisch ist, wird für jedes Geräte-Paar (i, j) mit i < j sowohl
die obere rechte als auch die untere linke Hälfte ausgewertet:
- Falls df[i,j] == 1 oder df[j,i] == -1, dann muss das Gerät in Zeile i vor
dem Gerät in Spalte j starten.
- Falls df[i,j] == -1 oder df[j,i] == 1, dann muss das Gerät in Zeile j vor
dem Gerät in Spalte i starten.
Werte 0, 2, -2, 99 und -99 werden ignoriert.
"""
df = pd.read_csv(csv_file, index_col=0)
devices = list(df.index)
n = len(devices)
# Graph: Jeder Knoten (Gerät) hat eine Liste von direkten Nachfolgern.
graph = {device: [] for device in devices}
# In-Degree: Anzahl der eingehenden Kanten (harte Abhängigkeiten)
in_degree = {device: 0 for device in devices}
for i in range(n):
for j in range(i+1, n):
try:
value_upper = int(df.iloc[i, j])
except Exception:
value_upper = None
try:
value_lower = int(df.iloc[j, i])
except Exception:
value_lower = None
dependency = None
# Prüfe, ob (i, j) die Abhängigkeit signalisiert: Gerät[i] vor Gerät[j]
if (value_upper == 1) or (value_lower == -1):
dependency = (devices[i], devices[j])
# Alternativ: Prüfe, ob (i, j) das umgekehrte Signal liefert: Gerät[j] vor Gerät[i]
elif (value_upper == -1) or (value_lower == 1):
dependency = (devices[j], devices[i])
if dependency:
src, tgt = dependency
if tgt not in graph[src]:
graph[src].append(tgt)
in_degree[tgt] += 1
return graph, in_degree
def compute_start_groups(graph, in_degree):
"""
Ermittelt die Startgruppen mittels topologischer Sortierung.
Alle Geräte ohne eingehende Kanten (In-Degree == 0) werden gruppiert.
Falls zyklische Abhängigkeiten vorliegen, bleiben einige Geräte übrig.
Rückgabe:
- groups: Liste von Gruppen (Listen von Geräten), die ohne Zyklus ermittelt wurden.
- remaining_nodes: Menge der Geräte, die in Zyklen liegen.
"""
groups = []
nodes_remaining = set(graph.keys())
while nodes_remaining:
zero_in_degree = [node for node in nodes_remaining if in_degree[node] == 0]
if not zero_in_degree:
break # Es gibt keine Knoten mit In-Degree 0 → Zyklische Abhängigkeiten
groups.append(zero_in_degree)
for node in zero_in_degree:
nodes_remaining.remove(node)
for child in graph[node]:
in_degree[child] -= 1
return groups, nodes_remaining
def tarjan_scc(graph, nodes):
"""
Implementiert den Tarjan-Algorithmus zur Ermittlung stark zusammenhängender
Komponenten (SCC) im Subgraphen, der nur die Knoten aus 'nodes' enthält.
Rückgabe: Eine Liste von SCCs, wobei jede SCC eine Liste von Geräten darstellt.
"""
index = 0
indices = {}
lowlink = {}
stack = []
on_stack = set()
sccs = []
def strongconnect(v):
nonlocal index
indices[v] = index
lowlink[v] = index
index += 1
stack.append(v)
on_stack.add(v)
for w in graph[v]:
if w not in nodes:
continue
if w not in indices:
strongconnect(w)
lowlink[v] = min(lowlink[v], lowlink[w])
elif w in on_stack:
lowlink[v] = min(lowlink[v], indices[w])
if lowlink[v] == indices[v]:
scc = []
while True:
w = stack.pop()
on_stack.remove(w)
scc.append(w)
if w == v:
break
sccs.append(scc)
for v in nodes:
if v not in indices:
strongconnect(v)
return sccs
def main():
csv_file = "Geräteanlaufzeiten_chart_neu2.csv" # CSV-Datei mit der Matrix
graph, in_degree = read_csv_and_build_graph(csv_file)
groups, remaining_nodes = compute_start_groups(graph, in_degree)
# Erstelle eine Zuordnung: Gerät (gekürzte ID – nur die erste Zeile) -> Gruppe
device_to_group = {}
for group_num, group in enumerate(groups, start=1):
for device in group:
id_only = device.splitlines()[0]
device_to_group[id_only] = f"Gruppe {group_num}"
# Falls Geräte übrig bleiben, liegen zyklische Abhängigkeiten vor.
cycle_details = []
if remaining_nodes:
sccs = tarjan_scc(graph, remaining_nodes)
cycle_index = 1
for scc in sccs:
# Eine SCC wird als Zyklus gewertet, wenn mehr als ein Gerät enthalten ist
# oder ein einzelnes Gerät einen Self-Loop hat.
is_cycle = (len(scc) > 1) or (len(scc) == 1 and scc[0] in graph[scc[0]])
if is_cycle:
label = f"Cycle {cycle_index}"
cycle_index += 1
cycle_ids = [n.splitlines()[0] for n in scc]
cycle_details.append((label, cycle_ids))
for device in scc:
id_only = device.splitlines()[0]
device_to_group[id_only] = label
# Ausgabe der Geräte mit zugeordneter Gruppe
print("Gerät und zugeordnete Gruppe:")
print("{:<15}{}".format("ID", "Gruppe"))
for device, group in device_to_group.items():
print("{:<15}{}".format(device, group))
# Ausgabe der Konfliktgruppen (Zyklen), falls vorhanden
if cycle_details:
print("\nKonflikt: Geräte in zyklischen Abhängigkeiten:")
for label, devices in cycle_details:
print(f"{label}: {', '.join(devices)}")
else:
print("\nKeine zyklischen Abhängigkeiten gefunden.")
if __name__ == "__main__":
main()
Wir verarbeiten personenbezogene Daten über die Nutzer unserer Website mithilfe von Cookies und anderen Technologien, um unsere Dienste bereitzustellen. Weitere Informationen findest Du in unserer Datenschutzerklärung.
Indem Du unten auf "ICH stimme zu" klickst, stimmst Du unserer Datenschutzerklärung und unseren persönlichen Datenverarbeitungs- und Cookie-Praktiken zu, wie darin beschrieben. Du erkennst außerdem an, dass dieses Forum möglicherweise außerhalb Deines Landes gehostet wird und bist damit einverstanden, dass Deine Daten in dem Land, in dem dieses Forum gehostet wird, gesammelt, gespeichert und verarbeitet werden.


Kommentar