import numpy as np
import awkward as ak
import matplotlib.pyplot as plt
import scipy.signal as signal
from rich import print as rprint
from rich.progress import track
from scipy.ndimage import gaussian_filter1d
[docs]def merge_processed_files(file_list, data:str="DC", polarity=-1, width:int=3, threshold:float=0.0, height:float=0.001, header:int=3, segments:int=50, debug:bool=False):
ADCs = []
TIMEs = []
for file_path in track(file_list, description="Processing WVFs"):
times, this_ADCs, _, _ = process_file(file_path, data=data, polarity=polarity, width=width, threshold=threshold, height=height, header=header, segments=segments, debug=debug)
ADCs.append(this_ADCs[0])
TIMEs.append(times)
return np.asarray(ADCs), np.asarray(TIMEs)
[docs]def process_file(file_path:str, data:str="DC", polarity:int=1, width:int=3, threshold:float=0.0, height:float=0.001, distance:int=20, header:int=3, segments:int=50, debug:bool = False):
event_times, ADCs, period = read_file(file_path, data=data, header=header, segments=segments, debug=debug)
ADCs = np.asarray(ADCs)
ADCs=ADCs*polarity
ped_lim = get_ped_lim(ADCs, buffer=50)
ADCs = (ADCs.T - np.mean(ADCs[:, :ped_lim], axis=1).T).T
peaks, peak_values = peak_finder(ADCs, period=period, width=width, threshold=threshold, height=height, distance=distance, debug=debug)
return event_times, ADCs, peaks, peak_values
[docs]def get_ped_lim(ADCs, buffer:int=50, debug:bool = False):
peak = np.argmax(ADCs[:,:],axis=1)
values,counts = np.unique(peak, return_counts=True)
ped_lim = values[np.argmax(counts)]-buffer
if ped_lim <= buffer: ped_lim = buffer
return ped_lim
[docs]def get_times(file_path, header:int=3, segments:int=50, debug:bool=False):
decimal_numbers = []
with open(file_path, 'r') as file:
lines = file.readlines()[header:segments+header]
for line in lines:
words = line.split(',')
last_word = words[-1]
decimal_numbers.append(float(last_word))
windows_times = np.array(decimal_numbers)
return np.array(windows_times)
[docs]def get_DC_ADCs(file_path, header:int=3, segments:int=50, debug:bool=False):
ADCs=np.loadtxt(file_path, delimiter=',', skiprows=54)
period=ADCs[1,0]-ADCs[0,0]
ADCs=ADCs[:,1]
seg_len=int(ADCs.shape[0]/segments)
ADCs=ADCs.reshape((segments,seg_len))
return ADCs,period
[docs]def get_SPE_ADCs(file_path, header:int=5, segments:int=0, debug:bool=False):
ADCs=np.loadtxt(file_path, delimiter=',', skiprows=header)
period=ADCs[1,0]-ADCs[0,0]
ADCs=ADCs[:,1]
return [ADCs],period
[docs]def read_file(file_path, data:str="DC", header:int=3, segments:int=50, debug:bool=False):
event_times=get_times(file_path, header=header, segments=segments, debug=debug)
if data=="DC":
ADCs,period=get_DC_ADCs(file_path, header=header, segments=segments, debug=debug)
if data=="SPE":
ADCs,period=get_SPE_ADCs(file_path, header=header, segments=segments, debug=debug)
# if debug: rprint(f"Read {ADCs.shape[0]} segments of {ADCs.shape[1]} samples each")
return event_times, ADCs, period
[docs]def peak_finder(ADCs, period, width=3, height:float=0.001, threshold:float=0.0, distance:int=20, debug:bool=False):
peaks = []
peak_values = [] # New list to store ADC values at each peak
for row in ADCs:
peak_indices, _ = signal.find_peaks(row, width=width, height=height, threshold=threshold, distance=distance)
peaks.append(peak_indices*period)
peak_values.append(row[peak_indices]) # Store ADC values at each peak
return peaks, peak_values
[docs]def smooth_ADCs(ADCs, debug:bool=False):
# TODO: Implement a better smoothing algorithm, so that the peak amplitude is not affected!
smoothed_ADCs = gaussian_filter1d(ADCs, sigma=4, mode='reflect', axis=1)
return smoothed_ADCs