ldcast_code / ldcast /features /transform.py
weatherforecast1024's picture
Upload folder using huggingface_hub
d2f661a verified
import concurrent.futures
import multiprocessing
from numba import njit, prange
import numpy as np
from scipy.ndimage import convolve
def quick_cast(x, y):
num_threads = multiprocessing.cpu_count()
with concurrent.futures.ThreadPoolExecutor(num_threads) as executor:
futures = {}
limits = np.linspace(0, x.shape[0], num_threads+1).round().astype(int)
def _cast(k0,k1):
y[k0:k1,...] = x[k0:k1,...]
for k in range(len(limits)-1):
args = (_cast, limits[k], limits[k+1])
futures[executor.submit(*args)] = k
concurrent.futures.wait(futures)
def cast(dtype=np.float16):
xc = None
def transform(raw):
nonlocal xc
if (xc is None) or (xc.shape != raw.shape):
xc = np.empty_like(raw, dtype=dtype)
quick_cast(raw, xc)
return xc
return transform
@njit(parallel=True)
def scale_array(in_arr, out_arr, scale):
in_arr = in_arr.ravel()
out_arr = out_arr.ravel()
for i in prange(in_arr.shape[0]):
out_arr[i] = scale[in_arr[i]]
# NumPy version
#def scale_array(in_arr, out_arr, scale):
# out_arr[:] = scale[in_arr]
def normalize(mean=0.0, std=1.0, dtype=np.float32):
scaled = scaled_dt = None
def transform(raw):
nonlocal scaled, scaled_dt
if (scaled is None) or (scaled.shape != raw.shape):
scaled = np.empty_like(raw, dtype=np.float32)
scaled_dt = np.empty_like(raw, dtype=dtype)
normalize_array(raw, scaled, mean, std)
if dtype == np.float32:
return scaled
else:
quick_cast(scaled, scaled_dt)
return scaled_dt
return transform
def normalize_threshold(mean=0.0, std=1.0, threshold=0.0, fill_value=0.0, log=False):
scaled = None
def transform(raw):
nonlocal scaled
if (scaled is None) or (scaled.shape != raw.shape):
scaled = np.empty_like(raw, dtype=np.float32)
normalize_threshold_array(raw, scaled, mean, std, threshold, fill_value, log=log)
return scaled
return transform
def scale_log_norm(scale, threshold=None, missing_value=None,
fill_value=0, mean=0.0, std=1.0, dtype=np.float32):
log_scale = np.log10(scale, where=scale>0).astype(np.float32)
if threshold is not None:
log_scale[log_scale < np.log10(threshold)] = np.log10(fill_value)
if missing_value is not None:
log_scale[missing_value] = np.log10(fill_value)
log_scale[~np.isfinite(log_scale)] = np.log10(fill_value)
log_scale -= mean
log_scale /= std
scaled = scaled_dt = None
def transform(raw):
nonlocal scaled, scaled_dt
if (scaled is None) or (scaled.shape != raw.shape):
scaled = np.empty_like(raw, dtype=np.float32)
scaled_dt = np.empty_like(raw, dtype=dtype)
scale_array(raw, scaled, log_scale)
if dtype == np.float32:
return scaled
else:
quick_cast(scaled, scaled_dt)
return scaled_dt
return transform
def combine(transforms, memory_format="channels_first", dim=3):
#combined = None
channels_axis = 1 if (memory_format == "channels_first") else -1
def transform(*raw):
#nonlocal combined
transformed = [t(r) for (t, r) in zip(transforms, raw)]
for i in range(len(transformed)):
if transformed[i].ndim == dim + 1:
transformed[i] = np.expand_dims(transformed[i], channels_axis)
return np.concatenate(transformed, axis=channels_axis)
return transform
class Antialiasing:
def __init__(self):
(x,y) = np.mgrid[-2:3,-2:3]
self.kernel = np.exp(-0.5*(x**2+y**2)/(0.5**2))
self.kernel /= self.kernel.sum()
self.edge_factors = {}
self.img_smooth = {}
num_threads = multiprocessing.cpu_count()
self.executor = concurrent.futures.ThreadPoolExecutor(num_threads)
def __call__(self, img):
img_shape = img.shape[-2:]
if img_shape not in self.edge_factors:
s = convolve(np.ones(img_shape, dtype=np.float32),
self.kernel, mode="constant")
s = 1.0/s
self.edge_factors[img_shape] = s
else:
s = self.edge_factors[img_shape]
if img.shape not in self.img_smooth:
img_smooth = np.empty_like(img)
self.img_smooth[img_shape] = img_smooth
else:
img_smooth = self.img_smooth[img_shape]
def _convolve_frame(i,j):
convolve(img[i,j,:,:], self.kernel,
mode="constant", output=img_smooth[i,j,:,:])
img_smooth[i,j,:,:] *= s
futures = []
for i in range(img.shape[0]):
for j in range(img.shape[1]):
args = (_convolve_frame, i, j)
futures.append(self.executor.submit(*args))
concurrent.futures.wait(futures)
return img_smooth
def default_rainrate_transform(scale):
scaling = scale_log_norm(
scale, threshold=0.1, fill_value=0.02,
mean=-0.051, std=0.528, dtype=np.float32
)
antialiasing = Antialiasing()
def transform(raw):
x = scaling(raw)
return antialiasing(x)
return transform
def scale_norm(scale, threshold=None, missing_value=None,
fill_value=0, mean=0.0, std=1.0, dtype=np.float32):
scale = scale.astype(np.float32).copy()
scale[np.isnan(scale)] = fill_value
if threshold is not None:
scale[scale < threshold] = fill_value
if missing_value is not None:
missing_value = np.atleast_1d(missing_value)
for m in missing_value:
scale[m] = fill_value
scale -= mean
scale /= std
scaled = scaled_dt = None
def transform(raw):
nonlocal scaled, scaled_dt
if (scaled is None) or (scaled.shape != raw.shape):
scaled = np.empty_like(raw, dtype=np.float32)
scaled_dt = np.empty_like(raw, dtype=dtype)
scale_array(raw, scaled, scale)
if dtype == np.float32:
return scaled
else:
quick_cast(scaled, scaled_dt)
return scaled_dt
return transform
@njit(parallel=True)
def threshold_array(in_arr, out_arr, threshold):
in_arr = in_arr.ravel()
out_arr = out_arr.ravel()
for i in prange(in_arr.shape[0]):
out_arr[i] = np.float32(in_arr[i] >= threshold)
def one_hot(values):
translation = np.zeros(max(values)+1, dtype=int)
num_categories = len(values)
for (i,v) in enumerate(values):
translation[v] = i
onehot = onehot_dt = None
def transform(raw):
nonlocal onehot, onehot_dt
if (onehot is None) or (onehot.shape[:-1] != raw.shape):
onehot = np.empty(raw.shape+(num_categories,),
dtype=np.float32)
onehot = np.empty(raw.shape+(num_categories,),
dtype=np.uint8)
onehot_transform(raw, onehot, translation)
quick_cast(onehot, onehot_dt)
return onehot
return transform
@njit(parallel=True)
def onehot_transform(in_arr, out_arr, translation):
for k in prange(in_arr.shape[0]):
out_arr[k,...] = 0.0
for t in range(in_arr.shape[1]):
for i in range(in_arr.shape[2]):
for j in range(in_arr.shape[3]):
ind = np.uint64(in_arr[k,t,i,j])
c = translation[ind]
out_arr[k,t,i,j,c] = 1.0
@njit(parallel=True)
def normalize_array(in_arr, out_arr, mean, std):
mean = np.float32(mean)
inv_std = np.float32(1.0/std)
in_arr = in_arr.ravel()
out_arr = out_arr.ravel()
for i in prange(in_arr.shape[0]):
out_arr[i] = (in_arr[i]-mean)*inv_std
@njit(parallel=True)
def normalize_threshold_array(
in_arr, out_arr,
mean, std,
threshold, fill_value, log=False
):
mean = np.float32(mean)
inv_std = np.float32(1.0/std)
threshold = np.float32(threshold)
fill_value = np.float32(fill_value)
in_arr = in_arr.ravel()
out_arr = out_arr.ravel()
for i in prange(in_arr.shape[0]):
x = in_arr[i]
if x < threshold:
x = fill_value
if log:
x = np.log10(x)
out_arr[i] = (x-mean)*inv_std
# NumPy version
#def threshold_array(in_arr, out_arr, threshold):
# out_arr[:] = (in_arr >= threshold).astype(np.float32)
def R_threshold(scale, threshold):
thresholded = None
scale_treshold = np.nanargmax(scale > threshold)
def transform(rzc_raw):
nonlocal thresholded
if (thresholded is None) or (thresholded.shape != rzc_raw.shape):
thresholded = np.empty_like(rzc_raw, dtype=np.float32)
threshold_array(rzc_raw, thresholded, scale_treshold)
return thresholded
return transform