import os
import stat
import yaml
import pandas as pd
import plotly.express as px
from itertools import product
from rich import print as rprint
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from src.utils import get_project_root
# Project root directory, resolved once at import time. It is used to build the
# default `path` argument (f'{root}/analysis/') of generate_yaml and save_yaml.
root = get_project_root()
def classify_df(df, method="SILHOUETTE", ow=False, debug=False):
    """Cluster the Amplitude values of each (Number, Set, OV, Channel) group.

    For every combination of the unique values found in the 'Number', 'Set',
    'OV' and 'Channel' columns, KMeans is fitted on the 'Amplitude' column with
    2 to 7 clusters. The fit with the highest score is kept: its cluster
    centers are stored in the 'peak' dictionary, its score in the 'score'
    dictionary, and its labels are written to the 'Cluster' column of ``df``.
    Both dictionaries are loaded with ``generate_yaml`` and written back with
    ``save_yaml``.

    Args:
        df: DataFrame with the columns 'Number', 'Set', 'OV', 'Channel' and
            'Amplitude'. It is modified in place (the 'Cluster' column is
            created if missing and converted to ``str`` at the end).
        method: Scoring used to compare fits. "SILHOUETTE" uses
            ``silhouette_score``, "ELBOW" uses ``kmeans.inertia_`` and "SSE"
            uses ``kmeans.score``.
        ow: If True, groups that already have a stored peak are recomputed.
        debug: If True, print progress information.

    Returns:
        The same ``df`` object, or ``None`` if ``method`` is not one of the
        implemented methods and at least one group needed to be computed.
    """
    numbers = df['Number'].unique()
    labels = df['Set'].unique()
    ovs = df['OV'].unique()
    chs = df['Channel'].unique()
    peaks = generate_yaml(df, 'peak', ow=ow, debug=debug)
    score = generate_yaml(df, 'score', ow=ow, debug=debug)

    # Make sure the output column exists before writing labels into it
    if 'Cluster' not in df.columns:
        df['Cluster'] = None

    for n, label, ov, ch in product(numbers, labels, ovs, chs):
        # Build the selection mask once; it is reused to write the labels back
        mask = (df['Number'] == n) & (df['Set'] == label) & (df['OV'] == ov) & (df['Channel'] == ch)
        data = df[mask]
        if len(data) == 0:
            if debug:
                rprint(f"No data for {n} {label} {ov} {ch}")
            continue

        # Keys use plain Python types so they match what the YAML files hold
        key_n, key_label, key_ov, key_ch = int(n), str(label), int(ov), int(ch)
        if peaks[key_n][key_label][key_ov][key_ch] is not None and ow is False:
            if debug:
                rprint(f"Peak for {n} {label} {ov} {ch} already found")
            continue

        if method not in ("SILHOUETTE", "ELBOW", "SSE"):
            rprint(f"Method {method} not implemented")
            return

        amplitudes = data['Amplitude'].values.reshape(-1, 1)
        n_samples = len(amplitudes)
        # KMeans needs n_clusters <= n_samples, and silhouette_score needs the
        # number of labels to be at most n_samples - 1.
        max_clusters = min(7, n_samples - 1 if method == "SILHOUETTE" else n_samples)

        # The best fit is tracked across ALL cluster counts, so the running
        # best must be initialised once, outside the loop.
        best_score = float('-inf')
        best_cluster = None
        best_labels = None
        for i in range(2, max_clusters + 1):
            kmeans = KMeans(n_clusters=i, random_state=0).fit(amplitudes)
            if method == "SILHOUETTE":
                this_score = silhouette_score(amplitudes, kmeans.labels_)
                if debug:
                    rprint(f"Calculated {i} cluster silhouette score for {n} {label} {ov} {ch}: {this_score}")
            elif method == "ELBOW":
                # NOTE(review): inertia is lower-is-better, so keeping the
                # highest value does not implement an elbow criterion;
                # confirm the intended selection rule for this method.
                this_score = kmeans.inertia_
                if debug:
                    rprint(f"Calculated {i} cluster inertia for {n} {label} {ov} {ch}: {this_score}")
            else:  # "SSE"
                this_score = kmeans.score(amplitudes)
                if debug:
                    rprint(f"Calculated {i} cluster score for {n} {label} {ov} {ch}: {this_score}")

            if this_score > best_score:
                score[key_n][key_label][key_ov][key_ch] = float(this_score)
                peaks[key_n][key_label][key_ov][key_ch] = kmeans.cluster_centers_.tolist()
                rprint(f'-> Updating {method} score to {score[key_n][key_label][key_ov][key_ch]} and peak to {peaks[key_n][key_label][key_ov][key_ch]}')
                best_cluster = i
                best_score = this_score
                # Keep the labels of the winning fit, not of the last one
                best_labels = kmeans.labels_

        if best_labels is None:
            # Too few samples for any cluster count, or no comparable score
            if debug:
                rprint(f"No valid clustering found for {n} {label} {ov} {ch}")
            continue

        if debug:
            rprint(f"Best number of clusters for {n} {label} {ov} {ch} is {best_cluster} with a score of {score[key_n][key_label][key_ov][key_ch]}")
        df.loc[mask, 'Cluster'] = best_labels

    df.loc[:, 'Cluster'] = df['Cluster'].astype(str)
    save_yaml(peaks, 'peak', debug=debug)
    save_yaml(score, 'score', debug=debug)
    return df
def generate_yaml(df:pd.DataFrame, name:str, path:str=f'{root}/analysis/', ow:bool=False, debug:bool=False) -> dict:
    """Load (or create) the nested dictionary stored in ``{path}{name}.yml``.

    The dictionary is indexed as ``data[number][set][ov][channel]``, with
    ``int`` keys for number, OV and channel and ``str`` keys for the set.
    Every combination of the unique values found in the 'Number', 'Set', 'OV'
    and 'Channel' columns of ``df`` is guaranteed to exist on return; missing
    leaves are created with the value ``None``. Nothing is written to the
    YAML file here.

    Args:
        df: DataFrame with the columns 'Number', 'Set', 'OV' and 'Channel'.
        name: Base name of the YAML file (without extension).
        path: Directory holding the file. It is concatenated directly with
            ``name``, so it must end with a path separator. It is created,
            with full permissions for everyone, if it does not exist.
        ow: If True, the leaves for the combinations present in ``df`` are
            reset to ``None`` even if they already hold a value.
        debug: If True, print a summary of the updated keys.

    Returns:
        The nested dictionary with all required branches present.
    """
    # Create the output folder on first use
    if not os.path.exists(path):
        os.makedirs(path)
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    # Reuse the stored dictionary when there is one. The context manager
    # closes the file handle even if parsing fails.
    # NOTE(review): FullLoader can build some Python objects from tags; only
    # load files from a trusted location.
    try:
        with open(f'{path}{name}.yml') as file:
            data = yaml.load(file, Loader=yaml.FullLoader)
    except FileNotFoundError:
        data = {}
    # An empty YAML document is parsed as None
    if data is None:
        data = {}

    numbers = df['Number'].unique()
    labels = df['Set'].unique()
    ovs = df['OV'].unique()
    chs = df['Channel'].unique()

    # Convert the keys to plain Python types once, outside the nested loops
    number_keys = [int(n) for n in numbers]
    label_keys = [str(label) for label in labels]
    ov_keys = [int(ov) for ov in ovs]
    ch_keys = [int(ch) for ch in chs]

    # Only create the non existing branches of the dictionary
    for n in number_keys:
        if data.get(n) is None:
            data[n] = {}
        for label in label_keys:
            if data[n].get(label) is None:
                data[n][label] = {}
            for ov in ov_keys:
                if data[n][label].get(ov) is None:
                    data[n][label][ov] = {}
                leaves = data[n][label][ov]
                for ch in ch_keys:
                    if leaves.get(ch) is None or ow is True:
                        leaves[ch] = None

    if debug:
        rprint(f'{name} dictionary updated with {numbers} {labels} {ovs} {chs}')
    return data
def save_yaml(data:dict, name:str, path:str=f'{root}/analysis/', debug:bool=False) -> None:
    """Dump ``data`` to ``{path}{name}.yml`` and open up its permissions.

    Args:
        data: Dictionary to serialise with ``yaml.dump``.
        name: Base name of the YAML file (without extension).
        path: Directory holding the file; it is concatenated directly with
            ``name``, so it must end with a path separator.
        debug: If True, print the location of the saved file.
    """
    target = f'{path}{name}.yml'
    # Read/write/execute for user, group and others
    full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    with open(target, 'w') as handle:
        yaml.dump(data, handle)
    os.chmod(target, full_access)

    if debug:
        rprint(f'{name} dictionary saved in {path}{name}.yml')