Zitat von DirtyHarry
Beitrag anzeigen
Gruß
lopiuh
# -*- coding: utf-8 -*- import pandas as pd def read_csv_and_build_graph(csv_file): """ Liest die CSV-Datei ein und erstellt einen gerichteten Graphen. Da die Matrix symmetrisch ist, wird für jedes Geräte-Paar (i, j) mit i < j sowohl die obere rechte als auch die untere linke Hälfte ausgewertet: - Falls df[i,j] == 1 oder df[j,i] == -1, dann muss das Gerät in Zeile i vor dem Gerät in Spalte j starten. - Falls df[i,j] == -1 oder df[j,i] == 1, dann muss das Gerät in Zeile j vor dem Gerät in Spalte i starten. Werte 0, 2, -2, 99 und -99 werden ignoriert. """ df = pd.read_csv(csv_file, index_col=0) devices = list(df.index) n = len(devices) # Graph: Jeder Knoten (Gerät) hat eine Liste von direkten Nachfolgern. graph = {device: [] for device in devices} # In-Degree: Anzahl der eingehenden Kanten (harte Abhängigkeiten) in_degree = {device: 0 for device in devices} for i in range(n): for j in range(i+1, n): try: value_upper = int(df.iloc[i, j]) except Exception: value_upper = None try: value_lower = int(df.iloc[j, i]) except Exception: value_lower = None dependency = None # Prüfe, ob (i, j) die Abhängigkeit signalisiert: Gerät[i] vor Gerät[j] if (value_upper == 1) or (value_lower == -1): dependency = (devices[i], devices[j]) # Alternativ: Prüfe, ob (i, j) das umgekehrte Signal liefert: Gerät[j] vor Gerät[i] elif (value_upper == -1) or (value_lower == 1): dependency = (devices[j], devices[i]) if dependency: src, tgt = dependency if tgt not in graph[src]: graph[src].append(tgt) in_degree[tgt] += 1 return graph, in_degree def compute_start_groups(graph, in_degree): """ Ermittelt die Startgruppen mittels topologischer Sortierung. Alle Geräte ohne eingehende Kanten (In-Degree == 0) werden gruppiert. Falls zyklische Abhängigkeiten vorliegen, bleiben einige Geräte übrig. Rückgabe: - groups: Liste von Gruppen (Listen von Geräten), die ohne Zyklus ermittelt wurden. - remaining_nodes: Menge der Geräte, die in Zyklen liegen. """ groups = [] nodes_remaining = set(graph.keys()) while nodes_remaining: zero_in_degree = [node for node in nodes_remaining if in_degree[node] == 0] if not zero_in_degree: break # Es gibt keine Knoten mit In-Degree 0 → Zyklische Abhängigkeiten groups.append(zero_in_degree) for node in zero_in_degree: nodes_remaining.remove(node) for child in graph[node]: in_degree[child] -= 1 return groups, nodes_remaining def tarjan_scc(graph, nodes): """ Implementiert den Tarjan-Algorithmus zur Ermittlung stark zusammenhängender Komponenten (SCC) im Subgraphen, der nur die Knoten aus 'nodes' enthält. Rückgabe: Eine Liste von SCCs, wobei jede SCC eine Liste von Geräten darstellt. """ index = 0 indices = {} lowlink = {} stack = [] on_stack = set() sccs = [] def strongconnect(v): nonlocal index indices[v] = index lowlink[v] = index index += 1 stack.append(v) on_stack.add(v) for w in graph[v]: if w not in nodes: continue if w not in indices: strongconnect(w) lowlink[v] = min(lowlink[v], lowlink[w]) elif w in on_stack: lowlink[v] = min(lowlink[v], indices[w]) if lowlink[v] == indices[v]: scc = [] while True: w = stack.pop() on_stack.remove(w) scc.append(w) if w == v: break sccs.append(scc) for v in nodes: if v not in indices: strongconnect(v) return sccs def main(): csv_file = "Geräteanlaufzeiten_chart_neu2.csv" # CSV-Datei mit der Matrix graph, in_degree = read_csv_and_build_graph(csv_file) groups, remaining_nodes = compute_start_groups(graph, in_degree) # Erstelle eine Zuordnung: Gerät (gekürzte ID – nur die erste Zeile) -> Gruppe device_to_group = {} for group_num, group in enumerate(groups, start=1): for device in group: id_only = device.splitlines()[0] device_to_group[id_only] = f"Gruppe {group_num}" # Falls Geräte übrig bleiben, liegen zyklische Abhängigkeiten vor. cycle_details = [] if remaining_nodes: sccs = tarjan_scc(graph, remaining_nodes) cycle_index = 1 for scc in sccs: # Eine SCC wird als Zyklus gewertet, wenn mehr als ein Gerät enthalten ist # oder ein einzelnes Gerät einen Self-Loop hat. is_cycle = (len(scc) > 1) or (len(scc) == 1 and scc[0] in graph[scc[0]]) if is_cycle: label = f"Cycle {cycle_index}" cycle_index += 1 cycle_ids = [n.splitlines()[0] for n in scc] cycle_details.append((label, cycle_ids)) for device in scc: id_only = device.splitlines()[0] device_to_group[id_only] = label # Ausgabe der Geräte mit zugeordneter Gruppe print("Gerät und zugeordnete Gruppe:") print("{:<15}{}".format("ID", "Gruppe")) for device, group in device_to_group.items(): print("{:<15}{}".format(device, group)) # Ausgabe der Konfliktgruppen (Zyklen), falls vorhanden if cycle_details: print("\nKonflikt: Geräte in zyklischen Abhängigkeiten:") for label, devices in cycle_details: print(f"{label}: {', '.join(devices)}") else: print("\nKeine zyklischen Abhängigkeiten gefunden.") if __name__ == "__main__": main()
Wir verarbeiten personenbezogene Daten über die Nutzer unserer Website mithilfe von Cookies und anderen Technologien, um unsere Dienste bereitzustellen. Weitere Informationen findest Du in unserer Datenschutzerklärung.
Indem Du unten auf "ICH stimme zu" klickst, stimmst Du unserer Datenschutzerklärung und unseren persönlichen Datenverarbeitungs- und Cookie-Praktiken zu, wie darin beschrieben. Du erkennst außerdem an, dass dieses Forum möglicherweise außerhalb Deines Landes gehostet wird und bist damit einverstanden, dass Deine Daten in dem Land, in dem dieses Forum gehostet wird, gesammelt, gespeichert und verarbeitet werden.
Kommentar