Spaces:
No application file
No application file
| # -*- coding: utf-8 -*- | |
| """Untitled8.ipynb | |
| Automatically generated by Colaboratory. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1nXE8zcbIHJ-cVLamX0RVlAOgQOrUmFu- | |
| """ | |
| import os | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from tensorflow.keras.applications import ResNet50V2 | |
| from tensorflow.keras.applications.resnet_v2 import preprocess_input | |
| from tensorflow.keras.models import Sequential, Model | |
| from tensorflow.keras.layers import (Conv2D, MaxPool2D, Flatten, MaxPooling2D, PReLU, Dense, Dropout, BatchNormalization, | |
| GlobalAveragePooling2D, GaussianNoise) | |
| from tensorflow.keras.layers.experimental.preprocessing import RandomFlip, RandomRotation,RandomTranslation, RandomZoom, RandomContrast, RandomHeight, RandomWidth | |
| from tensorflow.keras.optimizers import Adam | |
| from tensorflow.keras.callbacks import EarlyStopping | |
| from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler | |
| import cv2 | |
| import matplotlib.pyplot as plt | |
# Constants
# Root folder that contains the three dataset splits. This is an absolute
# path on the original author's machine and must be adapted before running.
BASE_PATH = "C:/Users/101231186/Desktop/AML"
# One sub-folder per split, each joined onto BASE_PATH.
TRAIN_PATH = os.path.join(BASE_PATH, "train_data")
TEST_PATH = os.path.join(BASE_PATH, "test_data")
VAL_PATH = os.path.join(BASE_PATH, "val_data")
# Number of output classes used by the classification head and by the
# per-class metrics further down in this script.
num_classes = 4000
def count_directories(path):
    """Return how many immediate children of ``path`` are directories."""
    total = 0
    for entry in os.listdir(path):
        if os.path.isdir(os.path.join(path, entry)):
            total += 1
    return total
def create_labels_file(dataset_path, file_name):
    """Write one ``<image path> <class index>`` line per image file.

    Every sub-directory of ``dataset_path`` is treated as one class. Classes
    are numbered from 0 in sorted directory-name order, and the files inside
    each class are listed in sorted order as well, so the generated file is
    identical from run to run (``os.listdir`` itself returns entries in
    arbitrary order).

    Args:
        dataset_path: Directory that contains one sub-directory per class.
        file_name: Path of the text file to (over)write.
    """
    class_dirs = sorted(
        d for d in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, d))
    )

    lines = []
    for class_label, class_dir in enumerate(class_dirs):
        class_dir_path = os.path.join(dataset_path, class_dir)
        for image_file in sorted(os.listdir(class_dir_path)):
            image_file_path = os.path.join(class_dir_path, image_file)
            # Nested directories are skipped; only regular files are listed.
            if os.path.isfile(image_file_path):
                lines.append(f"{image_file_path} {class_label}\n")

    # The file is only opened once the whole listing succeeded.
    with open(file_name, 'w') as f:
        f.writelines(lines)
def load_labels(file_path, base_img_path):
    """Read a labels file made of ``<image path> <class index>`` lines.

    Each line is split on its last space, so image paths may contain spaces.
    The path is joined onto ``base_img_path`` with ``os.path.join``; an
    absolute path in the file is therefore returned unchanged. Blank lines
    are ignored.

    Args:
        file_path: Path of the labels text file.
        base_img_path: Directory that relative image paths are resolved against.

    Returns:
        A ``(list_IDs, labels)`` tuple: the image paths in file order and a
        dict mapping each of those paths to its integer class index.

    Raises:
        ValueError: If a non-blank line has no space-separated label or the
            label is not an integer.
    """
    list_IDs, labels = [], {}
    with open(file_path, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            entry = line.strip()
            if not entry:
                # A trailing or stray empty line is not an error.
                continue
            try:
                relative_image_path, label = entry.rsplit(' ', 1)
                class_index = int(label)
            except ValueError as err:
                raise ValueError(
                    f"{file_path}:{line_number}: expected '<path> <label>', got {entry!r}"
                ) from err
            full_image_path = os.path.join(base_img_path, relative_image_path)
            list_IDs.append(full_image_path)
            labels[full_image_path] = class_index
    return list_IDs, labels
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that reads image batches from disk with OpenCV.

    Each batch is an ``(X, y)`` pair: ``X`` has shape
    ``(batch, *dim, n_channels)`` and holds images run through ResNetV2's
    ``preprocess_input``; ``y`` is the one-hot encoding of the integer labels.

    Args:
        list_IDs: Image file paths; also the keys of ``labels``.
        labels: Mapping from image path to integer class index.
        batch_size: Number of images per batch.
        dim: Spatial size ``(height, width)`` every image is resized to.
        n_channels: Number of image channels stored per sample.
        n_classes: Width of the one-hot label vectors.
        shuffle: Whether to reshuffle the sample order after every epoch.
    """

    def __init__(self, list_IDs, labels, batch_size=32, dim=(224, 224), n_channels=3,
                 n_classes=4000, shuffle=True):
        """Store the configuration and build the first epoch's sample order."""
        # Newer Keras versions expect Sequence subclasses to call the parent
        # initializer; on older versions this is a harmless no-op.
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch.

        A trailing partial batch is dropped, so up to ``batch_size - 1``
        samples are not visited in any given epoch.
        """
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(X, y)`` tuple.

        Raises:
            IndexError: If ``index`` is outside ``range(len(self))``. Without
                this check an out-of-range index would silently produce a
                batch of uninitialised data.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range")
        # Positions of this batch inside the (possibly shuffled) sample order.
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load the images in ``list_IDs_temp`` and one-hot encode their labels."""
        # Sized by the actual number of IDs so no row is left uninitialised.
        # float32 is Keras' default float type and halves the memory of
        # NumPy's float64 default.
        X = np.empty((len(list_IDs_temp), *self.dim, self.n_channels), dtype=np.float32)
        y = np.empty((len(list_IDs_temp)), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            X[i,] = self.load_and_preprocess_image(ID, self.dim)
            y[i] = self.labels[ID]

        return X, keras.utils.to_categorical(y, num_classes=self.n_classes)

    def load_and_preprocess_image(self, image_path, target_size):
        """Read one image and return it resized and preprocessed for ResNetV2.

        Args:
            image_path: Path of the image file.
            target_size: ``(height, width)`` of the returned array.

        Raises:
            ValueError: If OpenCV cannot read or decode the file.
        """
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"Could not read image: {image_path}")
        # cv2.resize expects (width, height), the reverse of ``target_size``.
        height, width = target_size
        image = cv2.resize(image, (width, height))
        # OpenCV decodes to BGR. ResNetV2's preprocess_input only rescales the
        # pixel values and does not reorder channels, so convert to the RGB
        # order the ImageNet weights were trained with.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return preprocess_input(image)
# Count the class folders of every split (informational output only).
train_classes_count = count_directories(TRAIN_PATH)
test_classes_count = count_directories(TEST_PATH)
val_classes_count = count_directories(VAL_PATH)
print('Total training classes:', train_classes_count)
print('Total testing classes:', test_classes_count)
print('Total validation classes:', val_classes_count)

# Create one labels file per split in the current working directory.
create_labels_file(TRAIN_PATH, 'labels_train.txt')
create_labels_file(TEST_PATH, 'labels_test.txt')
create_labels_file(VAL_PATH, 'labels_val.txt')

# Load labels and initialize generators
train_list_IDs, train_labels = load_labels('labels_train.txt', TRAIN_PATH)
test_list_IDs, test_labels = load_labels('labels_test.txt', TEST_PATH)
val_list_IDs, val_labels = load_labels('labels_val.txt', VAL_PATH)

# n_classes is passed explicitly so the one-hot width always follows
# num_classes instead of relying on the generator's own default.
training_generator = DataGenerator(train_list_IDs, train_labels, n_classes=num_classes)
# Evaluation data does not need reshuffling; a fixed order keeps per-batch
# results reproducible between runs.
testing_generator = DataGenerator(test_list_IDs, test_labels, n_classes=num_classes,
                                  shuffle=False)
validation_generator = DataGenerator(val_list_IDs, val_labels, n_classes=num_classes,
                                     shuffle=False)
# Load pre-trained ResNet50V2 model (excluding the top layer)
base_model = ResNet50V2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze the whole backbone, then unfreeze only its last 30 layers for
# fine-tuning.
for layer in base_model.layers:
    layer.trainable = False
for layer in base_model.layers[-30:]:
    layer.trainable = True

# Data augmentation. These layers are random only while training.
data_augmentation = Sequential([
    RandomFlip('horizontal_and_vertical'),
    RandomRotation(0.1),
    RandomZoom(0.1),
    RandomTranslation(height_factor=0.1, width_factor=0.1),
    RandomHeight(0.1),
    RandomWidth(0.1),
    # RandomHeight/RandomWidth change the spatial size; bring the images back
    # to the fixed input size the backbone was built with.
    tf.keras.layers.experimental.preprocessing.Resizing(224, 224),
])

# Top layers that turn the backbone's feature map into a 200-dimensional
# embedding. No softmax here: the classifier is added once, in `model` below.
embedding_layer = tf.keras.Sequential([
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(200, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.GlobalAveragePooling2D(),
])

# Define the model. Augmentation must see the input images, so it comes
# before the backbone rather than after it.
model = tf.keras.Sequential([
    tf.keras.Input(shape=(224, 224, 3)),
    data_augmentation,
    base_model,
    embedding_layer,
    Dense(num_classes, activation='softmax')  # This layer is for training purposes only
])
# Learning Rate Scheduling
# NOTE: `lr_schedule` and `optimizer` are defined here but are not passed to
# model.compile() below; training uses Adam(1e-4) together with the
# LearningRateScheduler callback instead.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,
    decay_steps=1000,
    decay_rate=0.9)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Callbacks
checkpoint = ModelCheckpoint('best_model.h5', monitor='val_accuracy', save_best_only=True)


def scheduler(epoch, lr):
    """Return the learning rate for ``epoch``.

    The rate is left unchanged for the first 10 epochs and multiplied by
    ``exp(-0.1)`` on every later epoch. A plain Python float is returned,
    which is the return type LearningRateScheduler documents for its
    schedule function.
    """
    if epoch < 10:
        return float(lr)
    return float(lr * np.exp(-0.1))


lr_scheduler = LearningRateScheduler(scheduler)

# Define metrics
# NOTE: `top1` is not passed to model.compile(); only 'accuracy' is tracked.
top1 = tf.keras.metrics.TopKCategoricalAccuracy(k=1)

# Compile the model with a different optimizer or learning rate
model.compile(optimizer=Adam(learning_rate=1e-4),  # Adjusted learning rate
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Train the model with callbacks and class weights if necessary
class_weights = {i: 1.0 for i in range(num_classes)}  # Modify these weights if needed
history = model.fit(training_generator,
                    epochs=40,
                    validation_data=validation_generator,
                    callbacks=[EarlyStopping(verbose=1, patience=5), checkpoint, lr_scheduler],
                    class_weight=class_weights)

# Evaluate the model
evaluation_results = model.evaluate(validation_generator)
print(f"Validation Loss: {evaluation_results[0]}, Validation Accuracy: {evaluation_results[1]}")
| import tensorflow as tf | |
| import matplotlib.pyplot as plt | |
| from sklearn.metrics import roc_curve, auc | |
| from sklearn.metrics.pairwise import cosine_similarity | |
# After training and evaluation, compute the average accuracy per class
def average_accuracy_per_class(model, generator, num_classes):
    """Return the mean of the per-class accuracies over ``generator``.

    Args:
        model: Object with a ``predict(images)`` method returning one score
            row per sample.
        generator: Indexable collection of ``(images, one_hot_labels)``
            batches that supports ``len()``.
        num_classes: Number of classes the mean is taken over.

    Returns:
        The unweighted mean of the per-class accuracies as a float. A class
        with no samples in ``generator`` contributes an accuracy of 0, and
        0.0 is returned when ``num_classes`` is not positive.
    """
    if num_classes <= 0:
        return 0.0

    correct_counts = np.zeros(num_classes, dtype=np.int64)
    total_counts = np.zeros(num_classes, dtype=np.int64)

    # Index the batches explicitly so the loop always stops after
    # len(generator) batches, whether or not the object defines __iter__.
    for batch_index in range(len(generator)):
        images, labels = generator[batch_index]
        predictions = model.predict(images)
        predicted_classes = np.argmax(predictions, axis=1)
        true_classes = np.argmax(labels, axis=1)

        # np.add.at accumulates correctly when a class repeats within a batch.
        np.add.at(total_counts, true_classes, 1)
        hits = true_classes[predicted_classes == true_classes]
        np.add.at(correct_counts, hits, 1)

    per_class_accuracy = np.divide(
        correct_counts,
        total_counts,
        out=np.zeros(num_classes, dtype=np.float64),
        where=total_counts != 0,
    )
    return float(per_class_accuracy.mean())
# Compute ROC curve and AUC for each class
def roc_auc_per_class(model, generator, num_classes):
    """Compute a one-vs-rest ROC curve and AUC for every class.

    Labels and predictions are collected over the whole generator first, so
    each class gets exactly one curve built from all of its samples and
    entry ``i`` of every returned list belongs to class ``i``.

    Args:
        model: Object with a ``predict(images)`` method returning one score
            row per sample.
        generator: Indexable collection of ``(images, one_hot_labels)``
            batches that supports ``len()``.
        num_classes: Number of classes (columns) to evaluate.

    Returns:
        ``(all_fpr, all_tpr, all_auc)``, three lists of length
        ``num_classes``. A class whose labels are all identical, or whose
        curve cannot be computed, gets the placeholder ``[0], [0], 0.5``.

    Note:
        All labels and predictions are held in memory at once.
    """
    label_batches, prediction_batches = [], []
    for batch_index in range(len(generator)):
        images, labels = generator[batch_index]
        label_batches.append(np.asarray(labels))
        prediction_batches.append(np.asarray(model.predict(images)))

    if label_batches:
        y_true = np.concatenate(label_batches, axis=0)
        y_score = np.concatenate(prediction_batches, axis=0)
    else:
        # No data at all: every class falls through to the placeholder.
        y_true = y_score = np.zeros((0, num_classes))

    all_fpr, all_tpr, all_auc = [], [], []
    for i in range(num_classes):
        fpr, tpr, roc_auc = [0], [0], 0.5
        try:
            # A ROC curve needs both positive and negative examples.
            if len(np.unique(y_true[:, i])) > 1:
                fpr, tpr, _ = roc_curve(y_true[:, i], y_score[:, i])
                roc_auc = auc(fpr, tpr)
        except (ValueError, IndexError) as e:
            print(f"Error in computing ROC for class {i}: {e}")
            fpr, tpr, roc_auc = [0], [0], 0.5  # Default values in case of an error
        all_fpr.append(fpr)
        all_tpr.append(tpr)
        all_auc.append(roc_auc)
    return all_fpr, all_tpr, all_auc
# Compute average cosine similarity between predictions and one-hot labels
def average_cosine_similarity(model, generator):
    """Return the mean cosine similarity between predictions and labels.

    Each prediction row is compared with the label row of the same sample.
    Labels that are already one-hot (2-D) are used as they are; 1-D integer
    labels are one-hot encoded to the width of the predictions first.

    Args:
        model: Object with a ``predict(images)`` method returning one score
            row per sample.
        generator: Indexable collection of ``(images, labels)`` batches that
            supports ``len()``.

    Returns:
        The mean similarity over all samples as a float, or ``nan`` when the
        generator yields no samples. A pair that contains an all-zero vector
        counts as similarity 0.
    """
    batch_similarities = []
    for batch_index in range(len(generator)):
        images, labels = generator[batch_index]
        predictions = np.asarray(model.predict(images), dtype=np.float64)
        targets = np.asarray(labels)
        if targets.ndim == 1:
            # Integer class indices: expand to one-hot rows.
            targets = np.eye(predictions.shape[-1])[targets.astype(int)]
        targets = targets.astype(np.float64)

        dots = np.sum(predictions * targets, axis=1)
        norms = np.linalg.norm(predictions, axis=1) * np.linalg.norm(targets, axis=1)
        batch_similarities.append(
            np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)
        )

    if not batch_similarities:
        return float('nan')
    similarities = np.concatenate(batch_similarities)
    if similarities.size == 0:
        return float('nan')
    return float(similarities.mean())
# Mean per-class accuracy on the training and validation generators.
train_avg_acc_per_class = average_accuracy_per_class(model, training_generator, num_classes)
val_avg_acc_per_class = average_accuracy_per_class(model, validation_generator, num_classes)
print("\nAverage Accuracy Per Class (Training): {:.2f}%".format(train_avg_acc_per_class * 100))
print("Average Accuracy Per Class (Validation): {:.2f}%".format(val_avg_acc_per_class * 100))

# Loss and accuracy as reported by Keras for both generators.
train_evaluation = model.evaluate(training_generator)
validation_evaluation = model.evaluate(validation_generator)
train_loss, train_acc = train_evaluation
val_loss, val_acc = validation_evaluation

# Print one small report per split, with accuracy shown as a percentage.
for report_heading, report_loss, report_acc in (
        ('\nTraining Metrics:', train_loss, train_acc),
        ('\nValidation Metrics:', val_loss, val_acc),
):
    print(report_heading)
    print('Loss:', report_loss)
    print('Accuracy: {:.2f}%'.format(report_acc * 100))
# Per-epoch metrics recorded by model.fit().
training_stats = history.history
train_acc_values = training_stats['accuracy']
val_acc_values = training_stats['val_accuracy']
train_loss_values = training_stats['loss']
val_loss_values = training_stats['val_loss']

# One figure with two side-by-side panels: accuracy on the left, loss on the
# right. Each row below describes one panel.
plt.figure(figsize=(12, 4))
panel_specs = (
    (train_acc_values, 'Training Accuracy', val_acc_values, 'Validation Accuracy',
     'Accuracy', 'Accuracy for Training vs. Validation'),
    (train_loss_values, 'Training Loss', val_loss_values, 'Validation Loss',
     'Loss', 'Loss for Training vs. Validation'),
)
for panel_number, panel in enumerate(panel_specs, start=1):
    train_series, train_label, val_series, val_label, y_label, panel_title = panel
    plt.subplot(1, 2, panel_number)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(panel_title)
plt.tight_layout()
plt.show()

# ROC curves and AUC values for both generators; plotted further below.
train_fpr, train_tpr, train_auc = roc_auc_per_class(model, training_generator, num_classes)
val_fpr, val_tpr, val_auc = roc_auc_per_class(model, validation_generator, num_classes)
| import random | |
def plot_subset_roc_curves(fpr, tpr, auc, num_classes, subset_size=10):
    """Plot the ROC curves of a random subset of classes.

    Args:
        fpr: Per-class false-positive-rate arrays, indexed by class.
        tpr: Per-class true-positive-rate arrays, indexed by class.
        auc: Per-class AUC values, indexed by class.
        num_classes: Number of classes to sample from.
        subset_size: How many classes to draw. It is reduced when fewer
            classes are available, so the call never fails on a small input.
    """
    # Only sample indices that exist in all three lists.
    available = min(num_classes, len(fpr), len(tpr), len(auc))
    sample_size = max(0, min(subset_size, available))

    plt.figure(figsize=(8, 6))
    plt.plot([0, 1], [0, 1], 'k--')  # Chance-level diagonal for reference.
    selected_classes = random.sample(range(available), sample_size)
    for i in selected_classes:
        plt.plot(fpr[i], tpr[i], label=f'Class {i} (AUC = {auc[i]:.2f})')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve (Subset of Classes)')
    plt.legend()
    plt.show()
# Plot for Training and Validation Set
# num_classes is reused here so the plots stay in step with the ROC lists.
plot_subset_roc_curves(train_fpr, train_tpr, train_auc, num_classes=num_classes, subset_size=10)
plot_subset_roc_curves(val_fpr, val_tpr, val_auc, num_classes=num_classes, subset_size=10)

# Compute and print average cosine similarity
train_avg_cosine_similarity = average_cosine_similarity(model, training_generator)
val_avg_cosine_similarity = average_cosine_similarity(model, validation_generator)
print("\nAverage Cosine Similarity (Training): {:.4f}".format(train_avg_cosine_similarity))
print("Average Cosine Similarity (Validation): {:.4f}".format(val_avg_cosine_similarity))

# Save the weights alone and the complete model.
model.save_weights("final_weights.h5")
model.save("final_model.h5")