AmlProject / SupervisedLearningCNN.py
Munaamullah's picture
Upload 10 files
c2bcda5
# -*- coding: utf-8 -*-
"""Untitled8.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1nXE8zcbIHJ-cVLamX0RVlAOgQOrUmFu-
"""
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications.resnet_v2 import preprocess_input
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Conv2D, MaxPool2D, Flatten, MaxPooling2D, PReLU, Dense, Dropout, BatchNormalization,
GlobalAveragePooling2D, GaussianNoise)
from tensorflow.keras.layers.experimental.preprocessing import RandomFlip, RandomRotation,RandomTranslation, RandomZoom, RandomContrast, RandomHeight, RandomWidth
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
import cv2
import matplotlib.pyplot as plt
# Constants
BASE_PATH = "C:/Users/101231186/Desktop/AML"
TRAIN_PATH = os.path.join(BASE_PATH, "train_data")
TEST_PATH = os.path.join(BASE_PATH, "test_data")
VAL_PATH = os.path.join(BASE_PATH, "val_data")
num_classes = 4000
def count_directories(path):
return len([name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))])
def create_labels_file(dataset_path, file_name):
image_paths, image_labels = [], []
class_dirs = sorted([d for d in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, d))])
for class_label, class_dir in enumerate(class_dirs):
class_dir_path = os.path.join(dataset_path, class_dir)
image_files = os.listdir(class_dir_path)
for image_file in image_files:
image_file_path = os.path.join(class_dir_path, image_file)
if os.path.isfile(image_file_path):
image_paths.append(image_file_path)
image_labels.append(class_label)
with open(file_name, 'w') as f:
for path, label in zip(image_paths, image_labels):
f.write(f"{path} {label}\n")
def load_labels(file_path, base_img_path):
list_IDs, labels = [], {}
with open(file_path, 'r') as file:
for line in file:
relative_image_path, label = line.strip().rsplit(' ', 1)
full_image_path = os.path.join(base_img_path, relative_image_path)
list_IDs.append(full_image_path)
labels[full_image_path] = int(label)
return list_IDs, labels
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, batch_size=32, dim=(224,224), n_channels=3,
n_classes=4000, shuffle=True):
'Initialization'
self.dim = dim
self.batch_size = batch_size
self.labels = labels
self.list_IDs = list_IDs
self.n_channels = n_channels
self.n_classes = n_classes
self.shuffle = shuffle
self.on_epoch_end()
def __len__(self):
'Denotes the number of batches per epoch'
return int(np.floor(len(self.list_IDs) / self.batch_size))
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
# Find list of IDs
list_IDs_temp = [self.list_IDs[k] for k in indexes]
# Generate data
X, y = self.__data_generation(list_IDs_temp)
return X, y
def on_epoch_end(self):
'Updates indexes after each epoch'
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle:
np.random.shuffle(self.indexes)
def __data_generation(self, list_IDs_temp):
'Generates data containing batch_size samples'
# Initialization
X = np.empty((self.batch_size, *self.dim, self.n_channels))
y = np.empty((self.batch_size), dtype=int)
# Generate data
for i, ID in enumerate(list_IDs_temp):
# Load and preprocess the image
img = self.load_and_preprocess_image(ID, self.dim)
X[i,] = img
# Store class
y[i] = self.labels[ID]
return X, keras.utils.to_categorical(y, num_classes=self.n_classes)
def load_and_preprocess_image(self, image_path, target_size):
# Load the image file
image = cv2.imread(image_path)
image = cv2.resize(image, target_size) # Resize image
# No need to convert to RGB as preprocess_input will handle it
image = preprocess_input(image) # Use ResNet50V2's preprocess_input
return image
# Count directories
train_classes_count = count_directories(TRAIN_PATH)
test_classes_count = count_directories(TEST_PATH)
val_classes_count = count_directories(VAL_PATH)
print('Total training classes:', train_classes_count)
print('Total testing classes:', test_classes_count)
print('Total validation classes:', val_classes_count)
# Create labels files
create_labels_file(TRAIN_PATH, 'labels_train.txt')
create_labels_file(TEST_PATH, 'labels_test.txt')
create_labels_file(VAL_PATH, 'labels_val.txt')
# Load labels and initialize generators
train_list_IDs, train_labels = load_labels('labels_train.txt', TRAIN_PATH)
test_list_IDs, test_labels = load_labels('labels_test.txt', TEST_PATH)
val_list_IDs, val_labels = load_labels('labels_val.txt', VAL_PATH)
training_generator = DataGenerator(train_list_IDs, train_labels)
testing_generator = DataGenerator(test_list_IDs, test_labels)
validation_generator = DataGenerator(val_list_IDs, val_labels)
# Load pre-trained ResNet50V2 model (excluding the top layer)
base_model = ResNet50V2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
# Freeze the layers of the pre-trained model
for layer in base_model.layers:
layer.trainable = False
# Unfreezing some layers of the base model for fine-tuning
for layer in base_model.layers[-30:]: # Unfreeze last 30 layers
layer.trainable = True
# Data Augmentation
data_augmentation = Sequential([
RandomFlip('horizontal_and_vertical'),
RandomRotation(0.1),
RandomZoom(0.1),
RandomTranslation(height_factor=0.1, width_factor=0.1),
RandomHeight(0.1),
RandomWidth(0.1)
])
# Unfreezing some layers of the base model for fine-tuning
for layer in base_model.layers[-30:]: # Unfreeze last 30 layers
layer.trainable = True
# Define the new top layers for embedding
embedding_layer = tf.keras.Sequential([
tf.keras.layers.Dropout(0.5),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dense(200, activation='relu'),
tf.keras.layers.Dropout(0.3),
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
# Define the model
model = tf.keras.Sequential([
base_model,
data_augmentation,
embedding_layer,
Dense(num_classes, activation='softmax') # This layer is for training purposes only
])
# Learning Rate Scheduling
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
initial_learning_rate=1e-3,
decay_steps=1000,
decay_rate=0.9)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
# Callbacks
checkpoint = ModelCheckpoint('best_model.h5', monitor='val_accuracy', save_best_only=True)
def scheduler(epoch, lr):
if epoch < 10:
return lr
else:
return lr * tf.math.exp(-0.1)
lr_scheduler = LearningRateScheduler(scheduler)
# Define metrics
top1 = tf.keras.metrics.TopKCategoricalAccuracy(k=1)
# Compile the model with a different optimizer or learning rate
model.compile(optimizer=Adam(learning_rate=1e-4), # Adjusted learning rate
loss='categorical_crossentropy',
metrics=['accuracy'])
# Train the model with callbacks and class weights if necessary
class_weights = {i: 1.0 for i in range(num_classes)} # Modify these weights if needed
history = model.fit(training_generator,
epochs=40,
validation_data=validation_generator,
callbacks=[EarlyStopping(verbose=1, patience=5), checkpoint, lr_scheduler],
class_weight=class_weights)
# Evaluate the model
evaluation_results = model.evaluate(validation_generator)
print(f"Validation Loss: {evaluation_results[0]}, Validation Accuracy: {evaluation_results[1]}")
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.metrics.pairwise import cosine_similarity
# After training and evaluation, compute the average accuracy per class
def average_accuracy_per_class(model, generator, num_classes):
# Initialize a list to store correct counts for each class
correct_counts = [0] * num_classes
total_counts = [0] * num_classes
# Loop through the batches in the generator
for images, labels in generator:
predictions = model.predict(images)
predicted_classes = tf.argmax(predictions, axis=1)
true_classes = tf.argmax(labels, axis=1)
# Update the correct count and total count for each class
for i in range(len(true_classes)):
true_class_index = true_classes[i]
total_counts[true_class_index] += 1
if predicted_classes[i] == true_class_index:
correct_counts[true_class_index] += 1
# Compute the average accuracy for each class
average_accuracies = [correct / total if total != 0 else 0 for correct, total in zip(correct_counts, total_counts)]
# Return the overall average accuracy across classes
return sum(average_accuracies) / len(average_accuracies)
# Compute ROC curve and AUC for each class
def roc_auc_per_class(model, generator, num_classes):
all_fpr = []
all_tpr = []
all_auc = []
for images, labels in generator:
predictions = model.predict(images)
# Compute ROC curve and AUC for each class
for i in range(num_classes):
try:
# Check if both positive and negative examples are present
if len(np.unique(labels[:, i])) > 1:
fpr, tpr, _ = roc_curve(labels[:, i], predictions[:, i])
roc_auc = auc(fpr, tpr)
else:
# If only one class is present, assign arbitrary values
fpr, tpr, roc_auc = [0], [0], 0.5
all_fpr.append(fpr)
all_tpr.append(tpr)
all_auc.append(roc_auc)
except Exception as e:
print(f"Error in computing ROC for class {i}: {e}")
fpr, tpr, roc_auc = [0], [0], 0.5 # Default values in case of an error
all_fpr.append(fpr)
all_tpr.append(tpr)
all_auc.append(roc_auc)
return all_fpr, all_tpr, all_auc
# Compute average cosine similarity between predicted embeddings and true embeddings
def average_cosine_similarity(model, generator):
cosine_similarities = []
for images, labels in generator:
predictions = model.predict(images)
# Ensure that labels are one-hot encoded
one_hot_labels = keras.utils.to_categorical(labels, num_classes=model.output_shape[-1])
# Compute cosine similarity for each pair of prediction and true label
for i in range(predictions.shape[0]):
sim = cosine_similarity([predictions[i]], [one_hot_labels[i]])[0][0]
cosine_similarities.append(sim)
return np.mean(cosine_similarities)
# Then call this function as before
train_avg_acc_per_class = average_accuracy_per_class(model, training_generator, num_classes)
val_avg_acc_per_class = average_accuracy_per_class(model, validation_generator, num_classes)
print("\nAverage Accuracy Per Class (Training): {:.2f}%".format(train_avg_acc_per_class * 100))
print("Average Accuracy Per Class (Validation): {:.2f}%".format(val_avg_acc_per_class * 100))
# Assess the performance on the train and validation sets
train_evaluation = model.evaluate(training_generator)
validation_evaluation = model.evaluate(validation_generator)
# Unpack the evaluation metrics for both train and validation
train_loss, train_acc = train_evaluation
val_loss, val_acc = validation_evaluation
# Displaying the performance metrics as percentages
print('\nTraining Metrics:')
print('Loss:', train_loss)
print('Accuracy: {:.2f}%'.format(train_acc * 100))
print('\nValidation Metrics:')
print('Loss:', val_loss)
print('Accuracy: {:.2f}%'.format(val_acc * 100))
# Retrieve the history object from model training
training_stats = history.history
# Derive accuracy and loss metrics from the history
train_acc_values = training_stats['accuracy']
val_acc_values = training_stats['val_accuracy']
train_loss_values = training_stats['loss']
val_loss_values = training_stats['val_loss']
# Visualize the training accuracy and loss
plt.figure(figsize=(12, 4))
# Accuracy subplot
plt.subplot(1, 2, 1)
plt.plot(train_acc_values, label='Training Accuracy')
plt.plot(val_acc_values, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Accuracy for Training vs. Validation')
# Loss subplot
plt.subplot(1, 2, 2)
plt.plot(train_loss_values, label='Training Loss')
plt.plot(val_loss_values, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Loss for Training vs. Validation')
plt.tight_layout()
plt.show()
# Compute ROC curves and plot them
train_fpr, train_tpr, train_auc = roc_auc_per_class(model, training_generator, num_classes)
val_fpr, val_tpr, val_auc = roc_auc_per_class(model, validation_generator, num_classes)
import random
def plot_subset_roc_curves(fpr, tpr, auc, num_classes, subset_size=10):
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'k--')
selected_classes = random.sample(range(num_classes), subset_size)
for i in selected_classes:
plt.plot(fpr[i], tpr[i], label=f'Class {i} (AUC = {auc[i]:.2f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve (Subset of Classes)')
plt.legend()
plt.show()
# Plot for Training and Validation Set
plot_subset_roc_curves(train_fpr, train_tpr, train_auc, num_classes=4000, subset_size=10)
plot_subset_roc_curves(val_fpr, val_tpr, val_auc, num_classes=4000, subset_size=10)
# Compute and print average cosine similarity
train_avg_cosine_similarity = average_cosine_similarity(model, training_generator)
val_avg_cosine_similarity = average_cosine_similarity(model, validation_generator)
print("\nAverage Cosine Similarity (Training): {:.4f}".format(train_avg_cosine_similarity))
print("Average Cosine Similarity (Validation): {:.4f}".format(val_avg_cosine_similarity))
plt.show()
# Save the model
model.save_weights("final_weights.h5")
model.save("final_model.h5")