The v7 dataset improves upon the Dipole datasets v6 with the following changes:
removed SFTdy_AWAKE1_MD3___SFT_AWAKE1_MD3.parquet for being too short
In addition, we remove the eddy-current contribution from the column B_meas_T_filtered and add a new column B_meas_T_filtered_eddy_corrected, computed with the following script:
from __future__ import annotations
import argparse
import pathlib
import sys
import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.fft
C1 = -3.98e-7
TAU1 = 1.8e-2 # s
C2 = -1.23e-7
TAU2 = 0.37 # s
C3 = -5.41e-9
TAU3 = 1.57 # s
TIME_COL = "time_ms"
CURRENT_DOT_COL = "I_meas_A_filtered_dot"
FIELD_COL = "B_meas_T_filtered"
def exponential_convolution(
I: npt.NDArray[np.float64], t: npt.NDArray[np.float64], tau: float
) -> npt.NDArray[np.float64]:
"""
Compute the convolution of I(t) with exp(-t/tau) using FFT.
Parameters:
I (numpy array): The input function sampled at discrete times.
t (numpy array): The time array corresponding to I.
tau (float): The decay time constant of the exponential.
Returns:
J (numpy array): The result of the convolution.
"""
dt = t[1] - t[0] # Assuming uniform time spacing
N = len(t)
# Construct the exponential kernel
K = np.exp(-t / tau)
# Zero-pad both arrays to avoid circular convolution effects
pad_length = 2 * N
I_padded = np.pad(I, (0, N), mode="constant")
K_padded = np.pad(K, (0, N), mode="constant")
# Perform FFT-based convolution
I_f = scipy.fft.fft(I_padded)
K_f = scipy.fft.fft(K_padded)
J_f = I_f * K_f
J_padded = scipy.fft.ifft(J_f).real # Inverse FFT to get convolution result
# Extract the valid part of the convolution
J = J_padded[:N] * dt # Multiply by dt to match integral scaling
return J
def remove_eddy_currents(
dataset_path: pathlib.Path,
) -> None:
print(f"Processing {dataset_path}")
dataset = pd.read_parquet(dataset_path)
field = dataset[FIELD_COL].to_numpy()
time = dataset[TIME_COL].to_numpy()
current_dot = dataset[CURRENT_DOT_COL].to_numpy()
time = time - time[0] # Normalize time to start from 0
time = time / 1000 # Convert to seconds
offset = (
C1 * exponential_convolution(current_dot, time, TAU1)
+ C2 * exponential_convolution(current_dot, time, TAU2)
+ C3 * exponential_convolution(current_dot, time, TAU3)
)
dataset[f"{FIELD_COL}_eddy_corrected"] = field - offset
dataset.to_parquet(dataset_path)
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"datasets",
nargs="*",
type=pathlib.Path,
help="Path to the datasets (may be directories).",
)
return parser.parse_args(argv)
def main(argv: list[str]) -> None:
args = parse_args(argv)
datasets = (
[args.datasets] if isinstance(args.datasets, pathlib.Path) else args.datasets
)
datasets = [dataset for dataset in datasets if dataset.exists()]
for dataset_path in datasets:
if dataset_path.suffix == ".parquet" and "preprocessed" in dataset_path.name:
remove_eddy_currents(dataset_path)
continue
elif dataset_path.is_dir():
for path in dataset_path.rglob("*"):
if path.suffix == ".parquet" and "preprocessed" in path.name:
remove_eddy_currents(path)
continue
if __name__ == "__main__":
main(sys.argv[1:])