# Source code for analysis_modules.data_module.abci_data
import json
import math
import os
import re
import shutil
import time
import matplotlib
import pandas as pd
from PyQt5.QtWidgets import QMessageBox
from utils.file_reader import FileReader
import matplotlib.pyplot as plt
import matplotlib as mpl
# mpl.rcParams['text.usetex'] = True
# mpl.rcParams['text.latex.preamble'] = [r'\usepackage{amsmath}'] # for \text command
# from labellines import labelLine, labelLines
import multiprocessing as mp
import scipy.signal as sps
import numpy as np
from termcolor import colored
# Shared FileReader instance; ABCIDataExtraction.join_excel uses it to load
# the per-process Excel files.
fr = FileReader()
# Colour name; not referenced by any code visible in this file.
# NOTE(review): presumably consumed by the debug-print helper ``print_`` that
# is called below but whose definition is not visible here -- confirm.
file_color = 'cyan'
# Debug switch; not read by any code visible in this file (see note above).
# DEBUG = False
DEBUG = True
class ABCIData:
    """Read the plot data stored in one ABCI ``.top`` output file.

    On construction the file ``<dirc>/<fid>/Cavity_MROT_<MROT>.top`` is
    parsed if it exists. Each recognised plot frame is stored in
    ``data_dict`` under its title as ``[x, y]``; loss factors found in the
    frame titles are stored in ``loss_factor`` and the first ``SIG`` value
    and the wake length in ``SIG`` / ``wakelength``.
    """

    # Frame titles recognised in the .top file. The position in this tuple is
    # the integer key accepted by ``get_data``.
    _PLOT_TITLES = (
        r'Cavity Shape Input', r'Cavity Shape Used', r'Wake Potentials',
        r'Real Part of Longitudinal Impedance', r'Imaginary Part of Longitudinal Impedance',
        'Frequency Spectrum of Loss Factor', r'Loss Factor Spectrum Integrated upto F',
        r'Real Part of Long. + Log Impedance', r'Imaginary Part of Long. + Log Impedance',
        r'Spectrum of Long. + Log Loss Factor', r'Long. + Log Factor Integrated up to F',
        r'Real Part of Azimuthal Impedance',
        r'Imaginary Part of Azimuthal Impedance', r'Real Part of Transverse Impedance',
        r'Imaginary Part of Transverse Impedance',
    )

    # The parser stores this frame under the spelling "upto", while the
    # display titles in ``_get_title`` use "up to"; accept both in get_data.
    _TITLE_ALIASES = {
        'Loss Factor Spectrum Integrated up to F': 'Loss Factor Spectrum Integrated upto F',
    }

    def __init__(self, dirc, fid, MROT):
        """Store the identifiers and parse the matching ``.top`` file.

        Parameters
        ----------
        dirc : str
            Directory containing one sub-folder per cavity.
        fid : str
            Name of the cavity sub-folder.
        MROT : int or str
            Value inserted in the file name ``Cavity_MROT_<MROT>.top``.
        """
        self.title_dict = {}
        self.data_dict = {}
        self.dir = dirc
        self.fid = fid
        self.MROT = MROT
        self.loss_factor = {}
        self.SIG = None
        self.wakelength = None
        self.checks(dirc, fid, MROT)

    def checks(self, dirc, fid, MROT):
        """Parse ``<dirc>/<fid>/Cavity_MROT_<MROT>.top`` if the file exists.

        When the file is missing a message is printed and ``data_dict``
        stays empty.
        """
        # os.path.join instead of a hard-coded backslash keeps the lookup
        # working on non-Windows systems; on Windows the path is unchanged.
        top_file = os.path.join(dirc, f'{fid}', f'Cavity_MROT_{MROT}.top')
        if os.path.exists(top_file):
            self._get_plot_data(top_file)
        else:
            print_("Hey chief, there seems to be a problem with the ABCI file directory. Please check.")

    @staticmethod
    def _is_monopole(mrot):
        """Return True when ``mrot`` denotes 0, given as a number or as text."""
        return mrot == 0 or str(mrot).strip() == '0'

    def _store_frame(self, frame_objects, frame_title, line_objects):
        """File ``line_objects`` under the first known title found in the frame heading."""
        # Only the first two title lines are inspected, as before; slicing
        # avoids an IndexError when a frame has fewer than two of them.
        heading = ''.join(frame_title[:2])
        for decor in self._PLOT_TITLES:
            if decor in heading:
                frame_objects[decor] = line_objects
                break

    def _get_plot_data(self, dirc):
        """Parse the ``.top`` file at path ``dirc`` into ``self.data_dict``.

        A frame starts at every line containing ``NEW FRAME``. Lines
        containing ``TITLE`` or ``MORE`` form the frame heading, lines with
        exactly two numeric tokens are data points, and a line containing
        ``JOIN`` closes the current curve. Loss factors, ``SIG`` and the wake
        length are picked up from the lines that mention them.
        """
        frame_objects = {}
        frame_count = 0
        # Initialised up front so that a file which does not begin with
        # 'NEW FRAME' / 'SET LIMITS' cannot hit an unbound local.
        line_objects = []
        frame_title = []
        x, y = [], []
        key = None

        with open(dirc, 'r') as f:
            for line in f:
                if 'NEW FRAME' in line:
                    # the previous frame is complete: store it under its title
                    if frame_count > 0:
                        self._store_frame(frame_objects, frame_title, line_objects)
                    line_objects = []
                    frame_title = []
                    frame_count += 1

                if 'TITLE' in line or 'MORE' in line:
                    frame_title.append(line)

                # remember which wake the next loss factor belongs to
                if 'Azimuthal Wake' in line:
                    key = 'Azimuthal'
                if 'Transverse Wake' in line:
                    key = 'Transverse'
                if 'Longitudinal Wake' in line:
                    key = 'Longitudinal'

                if 'Loss Factor' in line and key:
                    indx_0 = line.index('= ')
                    indx_1 = line.index('V/pC')
                    self.loss_factor[key] = float(line[indx_0 + 2:indx_1])
                    key = None

                # sigma: only the first occurrence is kept
                if 'SIG' in line and self.SIG is None:
                    indx_0 = line.index('SIG= ')
                    indx_1 = line.index(' cm')
                    self.SIG = float(line[indx_0 + 5:indx_1])

                if 'SET LIMITS' in line:
                    x, y = [], []
                    # the upper X limit of a plot starting at 0 is the wake length
                    marker = 'SET LIMITS X 0.00000E+00'
                    if marker in line:
                        indx_0 = line.index(marker)
                        indx_1 = line.index('Y')
                        self.wakelength = float(line[indx_0 + len(marker):indx_1].strip())

                if 'JOIN' in line:
                    # JOIN ends the current curve
                    line_objects.append([x, y])
                    x, y = [], []

                tokens = line.split()
                if len(tokens) == 2 and tokens[0] not in ('NEW', 'JOIN'):
                    try:
                        px, py = float(tokens[0]), float(tokens[1])
                    except ValueError:
                        # two-word directive rather than a data point: ignore
                        pass
                    else:
                        x.append(px)
                        y.append(py)

        # EOF: store the last frame (nothing to store if no frame was seen)
        if frame_count > 0:
            self._store_frame(frame_objects, frame_title, line_objects)

        self.data_dict = {}
        for title, curves in frame_objects.items():
            xs, ys = [], []
            for curve_x, curve_y in curves:
                # curves with two points or fewer are dropped
                if len(curve_x) > 2:
                    xs.extend(curve_x)
                    ys.extend(curve_y)

            if title in (r'Cavity Shape Input', r'Cavity Shape Used') and xs:
                # shift the cavity outline by half of its largest x value
                xs = np.array(xs) - np.amax(xs) / 2

            self.data_dict[title] = [xs, ys]

    def get_data(self, key):
        """Select one plot and compute its peaks.

        Parameters
        ----------
        key : int or str
            Plot title, or its index in the list of recognised titles.

        Returns
        -------
        tuple
            ``(x, y, fid)``. ``self.x``, ``self.y``, ``self.x_peaks`` and
            ``self.y_peaks`` are updated as a side effect.

        Raises
        ------
        KeyError
            If the requested plot was not found in the parsed file.
        """
        if isinstance(key, int):
            key = self._PLOT_TITLES[key]
        key = self._TITLE_ALIASES.get(key, key)

        self.x, self.y = self.data_dict[key]

        # peaks lower than 2 % of the maximum are ignored
        threshold = max(self.y) * 0.02 if len(self.y) > 0 else 0.0
        self.x_peaks, self.y_peaks = self.get_peaks(threshold=threshold)

        return self.x, self.y, self.fid

    def _get_title(self, MROT):
        """Return ``{index: [title, x label, y label]}`` for the given MROT."""
        if self._is_monopole(MROT):
            self.plot_decorations = {0: [r'Cavity Shape Input', r'Z-axis (m)', 'R-axis (m)'],
                                     1: [r'Cavity Shape Used', r'Z-axis (m)', 'R-axis (m)'],
                                     2: [r'Wake Potentials', r'Distance from Bunch Head S (m)', r'Scaled Wake Potential W (S)'],
                                     3: [r'Real Part of Longitudinal Impedance', r'Frequency f (GHz)', r'Real $Z_L$ ($k\Omega$) '],
                                     4: [r'Imaginary Part of Longitudinal Impedance', r'Frequency f (GHz)', r'Imag $Z_L$ ($k\Omega$) '],
                                     5: [r'Frequency Spectrum of Loss Factor', 'Frequency f (GHz)', r'$dk_L/df$ (V/pC/GHz) '],
                                     6: [r'Loss Factor Spectrum Integrated up to F', 'Frequency f (GHz)', r'$k_L$(F) (V/pC)'],
                                     7: [r'Real Part of Long. + Log Impedance', 'Frequency f (GHz)', r'Real $Z_{L+LOG}$ ($k\Omega$)'],
                                     8: [r'Imaginary Part of Long. + Log Impedance', 'Frequency f (GHz)', r'Imag $Z_{L+LOG}$ ($k\Omega$)'],
                                     9: [r'Spectrum of Long. + Log Loss Factor', 'Frequency f (GHz)', r'$dk_L/df$ ($V/pC/GHz$)'],
                                     10: [r'Long. + Log Factor Integrated up to F', 'Frequency f (GHz)', r'$k_L$(F) (V/pC)']
                                     }
        else:
            self.plot_decorations = {0: [r'Cavity Shape Input', r'Z-axis (m)', r'R-axis (m)'],
                                     1: [r'Cavity Shape Used', r'Z-axis (m)', r'R-axis (m)'],
                                     2: [r'Wake Potentials', r'Distance from Bunch Head S (m)', r'Scaled Wake Potential W (S)'],
                                     3: [r'Real Part of Azimuthal Impedance', r'Frequency f (GHz)', r'Real $Z_A$ ($k\Omega/m$) '],
                                     4: [r'Imaginary Part of Azimuthal Impedance', r'Frequency f (GHz)', r'Imag $Z_A$ ($k\Omega/m$) '],
                                     5: [r'Real Part of Transverse Impedance', r'Frequency f (GHz)', r'Real $Z_T$ ($k\Omega/m$) '],
                                     6: [r'Imaginary Part of Transverse Impedance', r'Frequency f (GHz)', r'Imag $Z_T$ ($k\Omega/m$) '],
                                     7: [r'Real Part of Longitudinal Impedance', r'Frequency f (GHz)', r'Real $Z_L$ ($k\Omega/m^2$) '],
                                     8: [r'Imaginary Part of Longitudinal Impedance', r'Frequency f (GHz)', r'Imag $Z_L$ ($k\Omega/m^2$) ']}

        return self.plot_decorations

    def _nparray_to_list(self, x):
        """Flatten an iterable of numpy arrays into one Python list."""
        return [item for array in x for item in array.tolist()]

    def get_peaks(self, threshold):
        """Return ``(x, y)`` arrays of the peaks of ``self.y`` not lower than ``threshold``."""
        if len(self.y) == 0:
            # nothing to search in: return empty arrays
            return np.array([]), np.array([])
        peaks, _ = sps.find_peaks(self.y, height=threshold)
        return np.array(self.x)[peaks], np.array(self.y)[peaks]

    def _get_bands(self):
        """Group neighbouring peaks into bands.

        A peak joins the current band when it lies less than 0.1 (in units
        of ``x``) above the previous peak; otherwise it opens a new band.

        Returns
        -------
        dict or None
            ``{band index: {'xb': [...], 'yb': [...]}}``; an empty dict when
            there are no peaks; ``None`` (after printing a message) when the
            peaks have not been computed yet.
        """
        try:
            x_peaks, y_peaks = self.x_peaks, self.y_peaks
        except AttributeError as e:
            print_(f"Bands cannot be gotten without getting the peaks first. {e}")
            return None

        group_dict = {}
        for x_peak, y_peak in zip(x_peaks, y_peaks):
            current = group_dict.get(len(group_dict) - 1)
            if current is not None and x_peak - current['xb'][-1] < 0.1:
                current['xb'].append(x_peak)
                current['yb'].append(y_peak)
            else:
                # The height comes from y_peaks: the loop runs over peaks, so
                # indexing self.y with the peak counter picked unrelated samples.
                group_dict[len(group_dict)] = {'xb': [x_peak], 'yb': [y_peak]}

        return group_dict

    def _interp_fl(self):
        """Interpolate, for each peak, the x value on its rising side where y crosses 0.707 * peak.

        Only samples with ``f0 - 0.01 < x < f0`` are accepted.
        """
        # NOTE(review): wx/wz are deliberately not reset per peak to keep the
        # existing behaviour: a peak without a crossing reuses the previous
        # peak's bracket (or yields 0 for the first peak) -- confirm intent.
        fl_list = []
        wx = [0, 0]
        wz = [1, 0]
        for f0, z in zip(self.x_peaks, self.y_peaks):
            z_0707 = 0.707 * z
            previous = 0
            for i, current in enumerate(self.y):
                if previous < z_0707 < current and f0 - 0.01 < self.x[i] < f0:
                    wz = [previous, current]
                    wx = [self.x[i - 1], self.x[i]]
                    break
                previous = current

            # linear interpolation between the two bracketing samples
            fl = wx[0] + (wx[1] - wx[0]) * (z_0707 - wz[0]) / (wz[1] - wz[0])
            fl_list.append(fl)

        return fl_list

    def _interp_fr(self):
        """Interpolate, for each peak, the x value on its falling side where y crosses 0.707 * peak.

        Only samples with ``f0 < x < f0 + 0.05`` are accepted.
        """
        # NOTE(review): same stale-bracket carry-over as in _interp_fl.
        fr_list = []
        wx = [0, 0]
        wz = [1, 0]
        for f0, z in zip(self.x_peaks, self.y_peaks):
            z_0707 = 0.707 * z
            previous = 0
            for i, current in enumerate(self.y):
                if previous > z_0707 > current and f0 < self.x[i] < f0 + 0.05:
                    wz = [previous, current]
                    wx = [self.x[i - 1], self.x[i]]
                    break
                previous = current

            # linear interpolation between the two bracketing samples
            f_right = wx[0] + (wx[1] - wx[0]) * (z_0707 - wz[0]) / (wz[1] - wz[0])
            fr_list.append(f_right)

        return fr_list

    def _get_Q(self):
        """Return ``x_peak / BW`` for every peak (reads ``self.BW``)."""
        return [f0 / bw for f0, bw in zip(self.x_peaks, self.BW)]

    def _get_RQ(self):
        """Return ``2 * y_peak * 1e3 / Q`` per peak; for non-zero MROT additionally divided by ``k``.

        ``k`` is ``2 * pi * x_peak * 1e9 / c``. Reads ``self.Q``.
        """
        R_Q = [2 * zz * 1e3 / q for zz, q in zip(self.y_peaks, self.Q)]
        if not self._is_monopole(self.MROT):
            c = 299792458  # m / s
            k = [2 * np.pi * f0 * 1e9 / c for f0 in self.x_peaks]
            R_Q = [r_q_m / kk for r_q_m, kk in zip(R_Q, k)]

        return R_Q

    def _get_BW(self):
        """Return ``fr - fl`` for every peak (reads ``self.fr_list`` and ``self.fl_list``)."""
        return [right - left for right, left in zip(self.fr_list, self.fl_list)]

    def plot_bands(self):
        """Plot the curve with its peaks above a bar chart of ``self.R_Q`` and shade every band.

        Reads ``self.R_Q`` and ``self.bands``, which must have been set
        beforehand.
        """
        fig, axs = plt.subplots(2, sharex=True)
        fig.suptitle('Vertically stacked subplots')
        axs[1].bar(self.x_peaks, self.R_Q, width=0.01, alpha=1)
        axs[0].plot(self.x, self.y)
        axs[0].scatter(self.x_peaks, self.y_peaks)

        color = ['#008fd5', '#fc4f30', '#e5ae38', '#6d904f', '#8b8b8b', '#810f7c', '#ed718b', '#2ccf8b', '#fded21', '#8248d2', '#4c3a27', '#000000',
                 '#008fd5', '#fc4f30', '#e5ae38', '#6d904f', '#8b8b8b', '#810f7c', '#ed718b', '#2ccf8b', '#fded21', '#8248d2', '#4c3a27', '#000000']

        tol = 0.05
        for key, val in self.bands.items():
            lb = val['xb'][0] - tol
            rb = val['xb'][-1] + tol
            # wrap around the palette so more bands than colours cannot raise
            shade = color[key % len(color)]
            axs[0].axvspan(lb, rb, 0, 1, color=shade, alpha=0.3)
            axs[1].axvspan(lb, rb, 0, 1, color=shade, alpha=0.3)

        plt.show()
class ABCIDataExtraction:
    """Collect loss factors and impedance maxima of many cavities into Excel files."""

    # Order of the geometry values read from every shape-space entry; also
    # the names of the corresponding Excel columns.
    _GEOMETRY_COLUMNS = ('A', 'B', 'a', 'b', 'Ri', 'L', 'Req', 'alpha')

    def __init__(self):
        pass

    @staticmethod
    def _interval_maxima(x, y_real, y_imag, intervals):
        """Return the highest impedance-magnitude peak inside each interval.

        Parameters
        ----------
        x : sequence
            Abscissa of the real-part curve.
        y_real, y_imag : sequence
            Real and imaginary parts; the magnitude is ``sqrt(re^2 + im^2)``.
        intervals : sequence of [low, high]
            Open intervals in ``x``.

        Returns
        -------
        list of (zmax, xmax) or None
            One pair per interval, ``(0.0, 0.0)`` when no peak lies inside
            it. ``None`` when the magnitude has no peak at all.
        """
        magnitude = np.array([(a ** 2 + b ** 2) ** 0.5 for a, b in zip(y_real, y_imag)])
        if magnitude.size == 0:
            return None

        peaks, _ = sps.find_peaks(magnitude, height=0)
        if len(peaks) == 0:
            return None

        xp, yp = np.asarray(x)[peaks], magnitude[peaks]
        maxima = []
        for low, high in intervals:
            inside = np.flatnonzero((low < xp) & (xp < high))
            if inside.size:
                # search among the peaks of this interval only, so an equally
                # high peak outside the interval cannot be reported as xmax
                best = inside[np.argmax(yp[inside])]
                maxima.append((yp[best], xp[best]))
            else:
                maxima.append((0.0, 0.0))
        return maxima

    def multiple_folders_data(self, shape_space, abci_data_dir, request, save_excel, mon_interval=None,
                              dip_interval=None, parallel=False):
        """Extract results for every cavity of ``shape_space`` and save them to ``<save_excel>.xlsx``.

        Parameters
        ----------
        shape_space : dict
            Maps a cavity key to its description. The eight geometry values
            (A, B, a, b, Ri, L, Req, alpha) are read from ``value["IC"]``; a
            plain sequence is accepted as well.
        abci_data_dir : str
            Directory with one ABCI result folder per cavity.
        request : str
            ``'k_loss'``, ``'zmax'`` or ``'all'`` (case-insensitive); any
            other value does nothing.
        save_excel : str
            Output path without the ``.xlsx`` extension.
        mon_interval, dip_interval : list, optional
            Interval boundaries for the monopole / dipole impedance maxima,
            converted to consecutive pairs by ``process_interval``. By
            default a single interval ``[0.0, 2e10]`` is used.
        parallel : bool
            When True the overwrite question is skipped.

        Notes
        -----
        A cavity is written only if every quantity of the request could be
        computed; otherwise a message is printed and the cavity is skipped,
        so all columns always have the same length.
        """
        mon_interval = [[0.0, 2e10]] if mon_interval is None else self.process_interval(mon_interval)
        dip_interval = [[0.0, 2e10]] if dip_interval is None else self.process_interval(dip_interval)

        excel_file = f'{save_excel}.xlsx'
        if os.path.exists(excel_file) and not parallel:
            msg = QMessageBox()
            msg.setWindowTitle("File Exist")
            msg.setText("Hey Chief, seems you've already processed the data for this folder. Do you want to overwrite it?")
            msg.setIcon(QMessageBox.Question)
            msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
            msg.setDefaultButton(QMessageBox.No)
            if msg.exec_() != QMessageBox.Yes:
                return

        request = request.lower()
        if request not in ('k_loss', 'zmax', 'all'):
            return
        with_loss = request in ('k_loss', 'all')
        with_zmax = request in ('zmax', 'all')

        key_list = []
        geometry = {name: [] for name in self._GEOMETRY_COLUMNS}
        k_loss_M0, k_loss_array_longitudinal, k_loss_array_transverse = [], [], []
        Zmax_mon_list = [[] for _ in mon_interval]
        xmax_mon_list = [[] for _ in mon_interval]
        Zmax_dip_list = [[] for _ in dip_interval]
        xmax_dip_list = [[] for _ in dip_interval]

        for key, value in shape_space.items():
            print(f"Processing for Cavity {key}")
            # the 'all' request reads from folders named '<key>_'
            fid = f"{key}_" if request == 'all' else key
            abci_data_long = ABCIData(abci_data_dir, fid, 0)
            abci_data_trans = ABCIData(abci_data_dir, fid, 1)

            xr_mon, yr_mon, _ = abci_data_long.get_data('Real Part of Longitudinal Impedance')
            xr_dip, yr_dip, _ = abci_data_trans.get_data('Real Part of Transverse Impedance')

            if with_zmax:
                _, yi_mon, _ = abci_data_long.get_data('Imaginary Part of Longitudinal Impedance')
                _, yi_dip, _ = abci_data_trans.get_data('Imaginary Part of Transverse Impedance')

            if with_loss:
                k_loss_trans = abci_data_trans.loss_factor['Transverse']
                if math.isnan(k_loss_trans):
                    print_(f"Encountered an exception: Check shape {key}")
                    continue

                # the parser stores this frame under the spelling 'upto'
                abci_data_long.get_data('Loss Factor Spectrum Integrated upto F')
                if len(abci_data_long.y_peaks) == 0:
                    print_(f"Encountered an exception: Check shape {key}")
                    continue
                k_M0 = abci_data_long.y_peaks[0]
                k_loss_long = abs(abci_data_long.loss_factor['Longitudinal'])
                k_loss_HOM = k_loss_long - k_M0

            if with_zmax:
                mon_maxima = self._interval_maxima(xr_mon, yr_mon, yi_mon, mon_interval)
                dip_maxima = self._interval_maxima(xr_dip, yr_dip, yi_dip, dip_interval)
                if mon_maxima is None or dip_maxima is None:
                    print_(f"Encountered an exception: Check shape {key}")
                    continue

            # append only after a successful run so that all columns stay aligned
            shape = value["IC"] if isinstance(value, dict) else value
            for position, name in enumerate(self._GEOMETRY_COLUMNS):
                geometry[name].append(shape[position])
            key_list.append(key)

            if with_loss:
                k_loss_M0.append(k_M0)
                k_loss_array_longitudinal.append(k_loss_HOM)
                k_loss_array_transverse.append(k_loss_trans)

            if with_zmax:
                for i, (z_max, x_max) in enumerate(mon_maxima):
                    Zmax_mon_list[i].append(z_max)
                    xmax_mon_list[i].append(x_max)
                for i, (z_max, x_max) in enumerate(dip_maxima):
                    Zmax_dip_list[i].append(z_max)
                    xmax_dip_list[i].append(x_max)

        data = {'key': key_list}
        data.update(geometry)
        if with_loss:
            data['k_loss_M0'] = k_loss_M0
            data['k_loss_long'] = k_loss_array_longitudinal
            data['k_loss_trans'] = k_loss_array_transverse
        if with_zmax:
            # NOTE(review): the 'fmax[...]' column name is identical for the
            # monopole and dipole lists; when both use the same interval the
            # dipole values overwrite the monopole ones. Kept as is because
            # renaming the column would change the output schema -- confirm.
            for i, msk in enumerate(mon_interval):
                data[f'Z_long[max({msk[0]}<f<{msk[1]})]'] = Zmax_mon_list[i]
                if request == 'all':
                    data[f'fmax[max({msk[0]}<f<{msk[1]})]'] = xmax_mon_list[i]
            for i, msk in enumerate(dip_interval):
                data[f'Z_trans[max({msk[0]}<f<{msk[1]})]'] = Zmax_dip_list[i]
                if request == 'all':
                    data[f'fmax[max({msk[0]}<f<{msk[1]})]'] = xmax_dip_list[i]

        # saving is best-effort: report the problem instead of raising
        try:
            df = pd.DataFrame.from_dict(data)
            df.to_excel(excel_file, index=False)
        except Exception as e:
            print("Oops! Encountered some error trying to save file: ", e)

    def multiple_folders_data_parallel(self, shape_space, abci_data_folder, proc_count, request, save_excel,
                                       temp_folder, mon_interval=None, dip_interval=None):
        """Run ``multiple_folders_data`` on ``proc_count`` processes and merge the results.

        The keys of ``shape_space`` are split into ``proc_count`` consecutive
        chunks (the last process takes the remainder). Every process writes
        ``Proc_<p>.xlsx`` into ``temp_folder``; the files are then merged
        into ``<save_excel>.xlsx`` and ``temp_folder`` is deleted.
        """
        if not os.path.exists(temp_folder):
            os.mkdir(temp_folder)

        keys = list(shape_space.keys())
        share = round(len(keys) / proc_count)

        processes = []
        for p in range(proc_count):
            start = p * share
            proc_keys_list = keys[start:start + share] if p < proc_count - 1 else keys[start:]
            print(proc_keys_list)

            processor_shape_space = {key: shape_space[key] for key in proc_keys_list}

            service = mp.Process(target=self.multiple_folders_data,
                                 args=(processor_shape_space, abci_data_folder, request),
                                 kwargs={'save_excel': os.path.join(temp_folder, f'Proc_{p}'),
                                         'mon_interval': mon_interval,
                                         'dip_interval': dip_interval, 'parallel': True})
            service.start()
            processes.append(service)

        for process in processes:
            process.join()

        # join temporary files, then delete the temporary folder
        self.join_excel('Proc', proc_count, save_excel, temp_folder)
        shutil.rmtree(temp_folder)

    def process_interval(self, interval_list):
        """Turn boundaries ``[b0, b1, b2, ...]`` into consecutive pairs ``[[b0, b1], [b1, b2], ...]``."""
        return [[low, high] for low, high in zip(interval_list, interval_list[1:])]

    def join_excel(self, generic_name, proc_count, save_excel, temp_folder):
        """Merge ``<generic_name>_<p>.xlsx`` for ``p`` in ``range(proc_count)`` into ``<save_excel>.xlsx``.

        The sheet ``'Sheet1'`` of every file is read and combined with an
        outer merge. A failure while writing the result is printed, not
        raised.
        """
        df = fr.excel_reader(os.path.join(temp_folder, f'{generic_name}_{0}.xlsx'))['Sheet1']

        for p in range(1, proc_count):
            part = fr.excel_reader(os.path.join(temp_folder, f'{generic_name}_{p}.xlsx'))['Sheet1']
            df = pd.merge(df, part, how='outer')

        try:
            df.to_excel(f'{save_excel}.xlsx', index=False)
        except Exception as e:
            print("Oops! Encountered some error trying to save file: ", e)
if __name__ == '__main__':
    # Demo: read one result folder and plot a single curve.
    directory = r"D:\Dropbox\Projects\NewFolder\SimulationData\ABCI"
    fid = "Cavity0"  # folder name
    MROT = "1"  # 0 for monopole, 1 for dipole

    abci_data = ABCIData(directory, fid, MROT)

    # Plot titles by index; get_data accepts either the index or the title.
    keys = dict(enumerate([
        r'Cavity Shape Input',
        r'Cavity Shape Used',
        r'Wake Potentials',
        r'Real Part of Longitudinal Impedance',
        r'Imaginary Part of Longitudinal Impedance',
        'Frequency Spectrum of Loss Factor',
        r'Loss Factor Spectrum Integrated upto F',
        r'Real Part of Long. + Log Impedance',
        r'Imaginary Part of Long. + Log Impedance',
        r'Spectrum of Long. + Log Loss Factor',
        r'Long. + Log Factor Integrated up to F',
        r'Real Part of Azimuthal Impedance',
        r'Imaginary Part of Azimuthal Impedance',
        r'Real Part of Transverse Impedance',
        r'Imaginary Part of Transverse Impedance',
    ]))

    x, y, _ = abci_data.get_data(key=3)
    for series in (x, y):
        print(series)

    # plt is already imported at the top of the module
    plt.plot(x, y)
    plt.show()