| | import json |
| | import logging |
| | import os |
| | from collections import namedtuple |
| | from datetime import datetime, timedelta |
| |
|
| | import matplotlib.pyplot as plt |
| | import numpy as np |
| | from .detect_peaks import detect_peaks |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| |
|
| | def extract_picks( |
| | preds, |
| | file_names=None, |
| | begin_times=None, |
| | station_ids=None, |
| | dt=0.01, |
| | phases=["P", "S"], |
| | config=None, |
| | waveforms=None, |
| | use_amplitude=False, |
| | upload_waveform=False, |
| | ): |
| | """Extract picks from prediction results. |
| | Args: |
| | preds ([type]): [Nb, Nt, Ns, Nc] "batch, time, station, channel" |
| | file_names ([type], optional): [Nb]. Defaults to None. |
| | station_ids ([type], optional): [Ns]. Defaults to None. |
| | t0 ([type], optional): [Nb]. Defaults to None. |
| | config ([type], optional): [description]. Defaults to None. |
| | |
| | Returns: |
| | picks [type]: {file_name, station_id, pick_time, pick_prob, pick_type} |
| | """ |
| |
|
| | mph = {} |
| | if config is None: |
| | for x in phases: |
| | mph[x] = 0.3 |
| | mpd = 50 |
| | |
| | pre_idx = int(1 / dt) |
| | post_idx = int(4 / dt) |
| | else: |
| | mph["P"] = config.min_p_prob |
| | mph["S"] = config.min_s_prob |
| | mph["PS"] = 0.3 |
| | mpd = config.mpd |
| | pre_idx = int(config.pre_sec / dt) |
| | post_idx = int(config.post_sec / dt) |
| |
|
| | Nb, Nt, Ns, Nc = preds.shape |
| |
|
| | if file_names is None: |
| | file_names = [f"{i:04d}" for i in range(Nb)] |
| | elif not (isinstance(file_names, np.ndarray) or isinstance(file_names, list)): |
| | if isinstance(file_names, bytes): |
| | file_names = file_names.decode() |
| | file_names = [file_names] * Nb |
| | else: |
| | file_names = [x.decode() if isinstance(x, bytes) else x for x in file_names] |
| |
|
| | if begin_times is None: |
| | begin_times = ["1970-01-01T00:00:00.000+00:00"] * Nb |
| | else: |
| | begin_times = [x.decode() if isinstance(x, bytes) else x for x in begin_times] |
| |
|
| | picks = [] |
| | for i in range(Nb): |
| |
|
| | file_name = file_names[i] |
| | begin_time = datetime.fromisoformat(begin_times[i]) |
| |
|
| | for j in range(Ns): |
| | if (station_ids is None) or (len(station_ids[i]) == 0): |
| | station_id = f"{j:04d}" |
| | else: |
| | station_id = station_ids[i].decode() if isinstance(station_ids[i], bytes) else station_ids[i] |
| |
|
| | if (waveforms is not None) and use_amplitude: |
| | amp = np.max(np.abs(waveforms[i, :, j, :]), axis=-1) |
| | for k in range(Nc - 1): |
| | idxs, probs = detect_peaks(preds[i, :, j, k + 1], mph=mph[phases[k]], mpd=mpd, show=False) |
| | for l, (phase_index, phase_prob) in enumerate(zip(idxs, probs)): |
| | pick_time = begin_time + timedelta(seconds=phase_index * dt) |
| | pick = { |
| | "file_name": file_name, |
| | "station_id": station_id, |
| | "begin_time": begin_time.isoformat(timespec="milliseconds"), |
| | "phase_index": int(phase_index), |
| | "phase_time": pick_time.isoformat(timespec="milliseconds"), |
| | "phase_score": round(phase_prob, 3), |
| | "phase_type": phases[k], |
| | "dt": dt, |
| | } |
| |
|
| | |
| | if waveforms is not None: |
| | tmp = np.zeros((pre_idx + post_idx, 3)) |
| | lo = phase_index - pre_idx |
| | hi = phase_index + post_idx |
| | insert_idx = 0 |
| | if lo < 0: |
| | lo = 0 |
| | insert_idx = -lo |
| | if hi > Nt: |
| | hi = Nt |
| | tmp[insert_idx : insert_idx + hi - lo, :] = waveforms[i, lo:hi, j, :] |
| | if upload_waveform: |
| | pick["waveform"] = tmp.tolist() |
| | pick["_id"] = f"{pick['station_id']}_{pick['timestamp']}_{pick['type']}" |
| | if use_amplitude: |
| | next_pick = idxs[l + 1] if l < len(idxs) - 1 else (phase_index + post_idx * 3) |
| | pick["phase_amp"] = np.max( |
| | amp[phase_index : min(phase_index + post_idx * 3, next_pick)] |
| | ).item() |
| |
|
| | picks.append(pick) |
| |
|
| | return picks |
| |
|
| |
|
| | def extract_amplitude(data, picks, window_p=10, window_s=5, config=None): |
| | record = namedtuple("amplitude", ["p_amp", "s_amp"]) |
| | dt = 0.01 if config is None else config.dt |
| | window_p = int(window_p / dt) |
| | window_s = int(window_s / dt) |
| | amps = [] |
| | for i, (da, pi) in enumerate(zip(data, picks)): |
| | p_amp, s_amp = [], [] |
| | for j in range(da.shape[1]): |
| | amp = np.max(np.abs(da[:, j, :]), axis=-1) |
| | |
| | |
| | tmp = [] |
| | for k in range(len(pi.p_idx[j]) - 1): |
| | tmp.append(np.max(amp[pi.p_idx[j][k] : min(pi.p_idx[j][k] + window_p, pi.p_idx[j][k + 1])])) |
| | if len(pi.p_idx[j]) >= 1: |
| | tmp.append(np.max(amp[pi.p_idx[j][-1] : pi.p_idx[j][-1] + window_p])) |
| | p_amp.append(tmp) |
| | tmp = [] |
| | for k in range(len(pi.s_idx[j]) - 1): |
| | tmp.append(np.max(amp[pi.s_idx[j][k] : min(pi.s_idx[j][k] + window_s, pi.s_idx[j][k + 1])])) |
| | if len(pi.s_idx[j]) >= 1: |
| | tmp.append(np.max(amp[pi.s_idx[j][-1] : pi.s_idx[j][-1] + window_s])) |
| | s_amp.append(tmp) |
| | amps.append(record(p_amp, s_amp)) |
| | return amps |
| |
|
| |
|
| | def save_picks(picks, output_dir, amps=None, fname=None): |
| | if fname is None: |
| | fname = "picks.csv" |
| |
|
| | int2s = lambda x: ",".join(["[" + ",".join(map(str, i)) + "]" for i in x]) |
| | flt2s = lambda x: ",".join(["[" + ",".join(map("{:0.3f}".format, i)) + "]" for i in x]) |
| | sci2s = lambda x: ",".join(["[" + ",".join(map("{:0.3e}".format, i)) + "]" for i in x]) |
| | if amps is None: |
| | if hasattr(picks[0], "ps_idx"): |
| | with open(os.path.join(output_dir, fname), "w") as fp: |
| | fp.write("fname\tt0\tp_idx\tp_prob\ts_idx\ts_prob\tps_idx\tps_prob\n") |
| | for pick in picks: |
| | fp.write( |
| | f"{pick.fname}\t{pick.t0}\t{int2s(pick.p_idx)}\t{flt2s(pick.p_prob)}\t{int2s(pick.s_idx)}\t{flt2s(pick.s_prob)}\t{int2s(pick.ps_idx)}\t{flt2s(pick.ps_prob)}\n" |
| | ) |
| | fp.close() |
| | else: |
| | with open(os.path.join(output_dir, fname), "w") as fp: |
| | fp.write("fname\tt0\tp_idx\tp_prob\ts_idx\ts_prob\n") |
| | for pick in picks: |
| | fp.write( |
| | f"{pick.fname}\t{pick.t0}\t{int2s(pick.p_idx)}\t{flt2s(pick.p_prob)}\t{int2s(pick.s_idx)}\t{flt2s(pick.s_prob)}\n" |
| | ) |
| | fp.close() |
| | else: |
| | with open(os.path.join(output_dir, fname), "w") as fp: |
| | fp.write("fname\tt0\tp_idx\tp_prob\ts_idx\ts_prob\tp_amp\ts_amp\n") |
| | for pick, amp in zip(picks, amps): |
| | fp.write( |
| | f"{pick.fname}\t{pick.t0}\t{int2s(pick.p_idx)}\t{flt2s(pick.p_prob)}\t{int2s(pick.s_idx)}\t{flt2s(pick.s_prob)}\t{sci2s(amp.p_amp)}\t{sci2s(amp.s_amp)}\n" |
| | ) |
| | fp.close() |
| |
|
| | return 0 |
| |
|
| |
|
| | def calc_timestamp(timestamp, sec): |
| | timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + timedelta(seconds=sec) |
| | return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] |
| |
|
| |
|
| | def save_picks_json(picks, output_dir, dt=0.01, amps=None, fname=None): |
| | if fname is None: |
| | fname = "picks.json" |
| |
|
| | picks_ = [] |
| | if amps is None: |
| | for pick in picks: |
| | for idxs, probs in zip(pick.p_idx, pick.p_prob): |
| | for idx, prob in zip(idxs, probs): |
| | picks_.append( |
| | { |
| | "id": pick.station_id, |
| | "timestamp": calc_timestamp(pick.t0, float(idx) * dt), |
| | "prob": prob.astype(float), |
| | "type": "p", |
| | } |
| | ) |
| | for idxs, probs in zip(pick.s_idx, pick.s_prob): |
| | for idx, prob in zip(idxs, probs): |
| | picks_.append( |
| | { |
| | "id": pick.station_id, |
| | "timestamp": calc_timestamp(pick.t0, float(idx) * dt), |
| | "prob": prob.astype(float), |
| | "type": "s", |
| | } |
| | ) |
| | else: |
| | for pick, amplitude in zip(picks, amps): |
| | for idxs, probs, amps in zip(pick.p_idx, pick.p_prob, amplitude.p_amp): |
| | for idx, prob, amp in zip(idxs, probs, amps): |
| | picks_.append( |
| | { |
| | "id": pick.station_id, |
| | "timestamp": calc_timestamp(pick.t0, float(idx) * dt), |
| | "prob": prob.astype(float), |
| | "amp": amp.astype(float), |
| | "type": "p", |
| | } |
| | ) |
| | for idxs, probs, amps in zip(pick.s_idx, pick.s_prob, amplitude.s_amp): |
| | for idx, prob, amp in zip(idxs, probs, amps): |
| | picks_.append( |
| | { |
| | "id": pick.station_id, |
| | "timestamp": calc_timestamp(pick.t0, float(idx) * dt), |
| | "prob": prob.astype(float), |
| | "amp": amp.astype(float), |
| | "type": "s", |
| | } |
| | ) |
| | with open(os.path.join(output_dir, fname), "w") as fp: |
| | json.dump(picks_, fp) |
| |
|
| | return 0 |
| |
|
| |
|
| | def convert_true_picks(fname, itp, its, itps=None): |
| | true_picks = [] |
| | if itps is None: |
| | record = namedtuple("phase", ["fname", "p_idx", "s_idx"]) |
| | for i in range(len(fname)): |
| | true_picks.append(record(fname[i].decode(), itp[i], its[i])) |
| | else: |
| | record = namedtuple("phase", ["fname", "p_idx", "s_idx", "ps_idx"]) |
| | for i in range(len(fname)): |
| | true_picks.append(record(fname[i].decode(), itp[i], its[i], itps[i])) |
| |
|
| | return true_picks |
| |
|
| |
|
| | def calc_metrics(nTP, nP, nT): |
| | """ |
| | nTP: true positive |
| | nP: number of positive picks |
| | nT: number of true picks |
| | """ |
| | precision = nTP / nP |
| | recall = nTP / nT |
| | f1 = 2 * precision * recall / (precision + recall) |
| | return [precision, recall, f1] |
| |
|
| |
|
| | def calc_performance(picks, true_picks, tol=3.0, dt=1.0): |
| | assert len(picks) == len(true_picks) |
| | logging.info("Total records: {}".format(len(picks))) |
| |
|
| | count = lambda picks: sum([len(x) for x in picks]) |
| | metrics = {} |
| | for phase in true_picks[0]._fields: |
| | if phase == "fname": |
| | continue |
| | true_positive, positive, true = 0, 0, 0 |
| | residual = [] |
| | for i in range(len(true_picks)): |
| | true += count(getattr(true_picks[i], phase)) |
| | positive += count(getattr(picks[i], phase)) |
| | |
| | diff = dt * ( |
| | np.array(getattr(picks[i], phase))[:, np.newaxis, :] |
| | - np.array(getattr(true_picks[i], phase))[:, :, np.newaxis] |
| | ) |
| | residual.extend(list(diff[np.abs(diff) <= tol])) |
| | true_positive += np.sum(np.abs(diff) <= tol) |
| | metrics[phase] = calc_metrics(true_positive, positive, true) |
| |
|
| | logging.info(f"{phase}-phase:") |
| | logging.info(f"True={true}, Positive={positive}, True Positive={true_positive}") |
| | logging.info(f"Precision={metrics[phase][0]:.3f}, Recall={metrics[phase][1]:.3f}, F1={metrics[phase][2]:.3f}") |
| | logging.info(f"Residual mean={np.mean(residual):.4f}, std={np.std(residual):.4f}") |
| |
|
| | return metrics |
| |
|
| |
|
| | def save_prob_h5(probs, fnames, output_h5): |
| | if fnames is None: |
| | fnames = [f"{i:04d}" for i in range(len(probs))] |
| | elif type(fnames[0]) is bytes: |
| | fnames = [f.decode().rstrip(".npz") for f in fnames] |
| | else: |
| | fnames = [f.rstrip(".npz") for f in fnames] |
| | for prob, fname in zip(probs, fnames): |
| | output_h5.create_dataset(fname, data=prob, dtype="float32") |
| | return 0 |
| |
|
| |
|
| | def save_prob(probs, fnames, prob_dir): |
| | if fnames is None: |
| | fnames = [f"{i:04d}" for i in range(len(probs))] |
| | elif type(fnames[0]) is bytes: |
| | fnames = [f.decode().rstrip(".npz") for f in fnames] |
| | else: |
| | fnames = [f.rstrip(".npz") for f in fnames] |
| | for prob, fname in zip(probs, fnames): |
| | np.savez(os.path.join(prob_dir, fname + ".npz"), prob=prob) |
| | return 0 |
| |
|