| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import copy |
| import json |
| import logging |
| import os |
| import pathlib |
| from typing import Dict, List |
|
|
| import numpy as np |
| from monai.apps.utils import tqdm |
| from monai.utils import optional_import |
|
|
| loadmat, _ = optional_import("scipy.io", name="loadmat") |
| PILImage, _ = optional_import("PIL.Image") |
|
|
|
|
| def consep_nuclei_dataset(datalist, output_dir, crop_size, min_area=80, min_distance=20, limit=0) -> List[Dict]: |
| """ |
| Utility to pre-process and create dataset list for Patches per Nuclei for training over ConSeP dataset. |
| |
| Args: |
| datalist: A list of data dictionary. Each entry should at least contain 'image_key': <image filename>. |
| For example, typical input data can be a list of dictionaries:: |
| |
| [{'image': <image filename>, 'label': <label filename>}] |
| |
| output_dir: target directory to store the training data after flattening |
| crop_size: Crop Size for each patch |
| min_area: Min Area for each nuclei to be included in dataset |
| min_distance: Min Distance from boundary for each nuclei to be included in dataset |
| limit: limit number of inputs for pre-processing. Defaults to 0 (no limit). |
| |
| Raises: |
| ValueError: When ``datalist`` is Empty |
| ValueError: When ``scipy.io.loadmat`` is Not available |
| |
| Returns: |
| A new datalist that contains path to the images/labels after pre-processing. |
| |
| Example:: |
| |
| datalist = consep_nuclei_dataset( |
| datalist=[{'image': 'img1.png', 'label': 'label1.mat'}], |
| output_dir=output, |
| crop_size=128, |
| limit=1, |
| ) |
| |
| print(datalist[0]["image"], datalist[0]["label"]) |
| """ |
|
|
| if not len(datalist): |
| raise ValueError("Input datalist is empty") |
|
|
| if not loadmat: |
| print("Please make sure scipy with loadmat function is correctly installed") |
| raise ValueError("scipy.io.loadmat module/function not found") |
|
|
| dataset_json: List[Dict] = [] |
| for d in tqdm(datalist): |
| logging.debug(f"Processing Image: {d['image']} => Label: {d['label']}") |
|
|
| |
| image = PILImage.open(d["image"]).convert("RGB") |
|
|
| |
| m = loadmat(d["label"]) |
| instances = m["inst_map"] |
|
|
| for nuclei_id, (class_id, (y, x)) in enumerate(zip(m["inst_type"], m["inst_centroid"]), start=1): |
| x, y = (int(x), int(y)) |
| class_id = int(class_id) |
| class_id = 3 if class_id in (3, 4) else 4 if class_id in (5, 6, 7) else class_id |
|
|
| if 0 < limit <= len(dataset_json): |
| return dataset_json |
|
|
| item = __prepare_patch( |
| d=d, |
| nuclei_id=nuclei_id, |
| output_dir=output_dir, |
| image=image, |
| instances=instances, |
| instance_idx=nuclei_id, |
| crop_size=crop_size, |
| class_id=class_id, |
| centroid=(x, y), |
| min_area=min_area, |
| min_distance=min_distance, |
| others_idx=255, |
| ) |
|
|
| if item: |
| dataset_json.append(item) |
|
|
| return dataset_json |
|
|
|
|
| def __prepare_patch( |
| d, |
| nuclei_id, |
| output_dir, |
| image, |
| instances, |
| instance_idx, |
| crop_size, |
| class_id, |
| centroid, |
| min_area, |
| min_distance, |
| others_idx=255, |
| ): |
| image_np = np.array(image) |
| image_size = image.size |
|
|
| bbox = __compute_bbox(crop_size, centroid, image_size) |
|
|
| cropped_label_np = instances[bbox[0] : bbox[2], bbox[1] : bbox[3]] |
| cropped_label_np = np.array(cropped_label_np) |
|
|
| this_label = np.where(cropped_label_np == instance_idx, class_id, 0) |
| if np.count_nonzero(this_label) < min_area: |
| return None |
|
|
| x, y = centroid |
| if x < min_distance or y < min_distance or (image_size[0] - x) < min_distance or (image_size[1] - y < min_distance): |
| return None |
|
|
| centroid = centroid[0] - bbox[0], centroid[1] - bbox[1] |
| others = np.where(np.logical_and(cropped_label_np > 0, cropped_label_np != instance_idx), others_idx, 0) |
| cropped_label_np = this_label + others |
| cropped_label = PILImage.fromarray(cropped_label_np.astype(np.uint8), None) |
|
|
| cropped_image_np = image_np[bbox[0] : bbox[2], bbox[1] : bbox[3], :] |
| cropped_image = PILImage.fromarray(cropped_image_np, "RGB") |
|
|
| images_dir = os.path.join(output_dir, "Images") if output_dir else "Images" |
| labels_dir = os.path.join(output_dir, "Labels") if output_dir else "Labels" |
| centroids_dir = os.path.join(output_dir, "Centroids") if output_dir else "Centroids" |
|
|
| os.makedirs(images_dir, exist_ok=True) |
| os.makedirs(labels_dir, exist_ok=True) |
| os.makedirs(centroids_dir, exist_ok=True) |
|
|
| image_id = pathlib.Path(d["image"]).stem |
| file_prefix = f"{image_id}_{class_id}_{str(instance_idx).zfill(4)}" |
| image_file = os.path.join(images_dir, f"{file_prefix}.png") |
| label_file = os.path.join(labels_dir, f"{file_prefix}.png") |
| centroid_file = os.path.join(centroids_dir, f"{file_prefix}.txt") |
|
|
| cropped_image.save(image_file) |
| cropped_label.save(label_file) |
| with open(centroid_file, "w") as fp: |
| json.dump([centroid], fp) |
|
|
| item = copy.deepcopy(d) |
| item["nuclei_id"] = nuclei_id |
| item["mask_value"] = class_id |
| item["image"] = image_file |
| item["label"] = label_file |
| item["centroid"] = centroid |
| return item |
|
|
|
|
| def __compute_bbox(patch_size, centroid, size): |
| x, y = centroid |
| m, n = size |
|
|
| x_start = int(max(x - patch_size / 2, 0)) |
| y_start = int(max(y - patch_size / 2, 0)) |
| x_end = x_start + patch_size |
| y_end = y_start + patch_size |
| if x_end > m: |
| x_end = m |
| x_start = m - patch_size |
| if y_end > n: |
| y_end = n |
| y_start = n - patch_size |
| return x_start, y_start, x_end, y_end |
|
|