# Standard library
import ipaddress
import json
import os
import re
import urllib.parse
from collections import Counter

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader, random_split
|
|
| |
|
|
| |
# Generic healthcare terms matched as plain substrings against a URL's
# domain (weight 3) and path (weight 1) by healthcare_relevance_score.
# NOTE(review): short entries such as 'med' and 'care' can fire on
# unrelated text (e.g. 'media', 'scared') — confirm this is intended.
HEALTHCARE_KEYWORDS = [
    'health', 'medical', 'hospital', 'clinic', 'pharma', 'patient', 'care', 'med',
    'doctor', 'physician', 'nurse', 'therapy', 'rehab', 'dental', 'cardio', 'neuro',
    'oncology', 'pediatric', 'orthopedic', 'surgery', 'diagnostic', 'wellbeing',
    'wellness', 'ehr', 'emr', 'mychart', 'medicare', 'medicaid', 'insurance'
]

# Well-known providers, labs, pharmacies, insurers, and health agencies;
# a domain hit scores higher (4) than a path hit (2).
HEALTHCARE_INSTITUTIONS = [
    'mayo', 'cleveland', 'hopkins', 'kaiser', 'mount sinai', 'cedars', 'baylor',
    'nhs', 'quest', 'labcorp', 'cvs', 'walgreens', 'aetna', 'cigna', 'unitedhealthcare',
    'bluecross', 'anthem', 'humana', 'va.gov', 'cdc', 'who', 'nih'
]

# Healthcare-flavored TLD fragments and hostname prefixes, matched against
# the domain only (weight 3 each).
HEALTHCARE_DOMAINS = ['.health', '.healthcare', '.medicine', '.hospital', '.clinic', 'mychart.']
|
|
| |
|
|
def url_length(url):
    """Total number of characters in the raw URL string."""
    return len(url)
|
|
def num_dots(url):
    """Count occurrences of '.' anywhere in the URL."""
    return len(url.split('.')) - 1
|
|
def num_hyphens(url):
    """Count occurrences of '-' anywhere in the URL."""
    return sum(ch == '-' for ch in url)
|
|
def num_at(url):
    """Count occurrences of '@' anywhere in the URL."""
    return sum(1 for ch in url if ch == '@')
|
|
def num_tilde(url):
    """Count occurrences of '~' anywhere in the URL."""
    return len([ch for ch in url if ch == '~'])
|
|
def num_underscore(url):
    """Count occurrences of '_' anywhere in the URL."""
    return sum(ch == '_' for ch in url)
|
|
def num_percent(url):
    """Count occurrences of '%' (percent-encoding markers) in the URL."""
    return len(url.split('%')) - 1
|
|
def num_ampersand(url):
    """Count occurrences of '&' (query-parameter separators) in the URL."""
    return sum(1 for ch in url if ch == '&')
|
|
def num_hash(url):
    """Count occurrences of '#' anywhere in the URL."""
    return len(url.split('#')) - 1
|
|
def has_https(url):
    """Return 1 when the URL begins with the literal 'https://' scheme, else 0."""
    return 1 if url.startswith('https://') else 0
|
|
def has_ip_address(url):
    """Return 1 if the URL's host is a literal IP address (IPv4 or IPv6), else 0.

    Uses urlparse().hostname, which strips any :port suffix and the square
    brackets around IPv6 literals, then validates with ipaddress.ip_address.
    The previous regex missed hosts carrying a port (e.g. 1.2.3.4:8080) and
    accepted out-of-range octets such as 999.999.999.999.
    """
    try:
        host = urllib.parse.urlparse(url).hostname
        if not host:
            # No network location at all (e.g. a relative or malformed URL).
            return 0
        ipaddress.ip_address(host)  # raises ValueError for non-IP hosts
        return 1
    except ValueError:
        # Raised by urlparse for invalid netlocs and by ip_address for names.
        return 0
|
|
def get_hostname_length(url):
    """Return the length of the URL's network location (host[:port]).

    urlparse can raise ValueError on malformed netlocs (e.g. an unbalanced
    IPv6 bracket); the previous bare `except:` also swallowed
    KeyboardInterrupt and SystemExit.
    """
    try:
        return len(urllib.parse.urlparse(url).netloc)
    except ValueError:
        return 0
|
|
def get_path_length(url):
    """Return the length of the URL's path component ('' -> 0).

    Narrowed the bare `except:` to ValueError (the only error urlparse
    raises on malformed input).
    """
    try:
        return len(urllib.parse.urlparse(url).path)
    except ValueError:
        return 0
|
|
def get_path_level(url):
    """Return the path depth, measured as the number of '/' in the path.

    Narrowed the bare `except:` to ValueError (the only error urlparse
    raises on malformed input).
    """
    try:
        return urllib.parse.urlparse(url).path.count('/')
    except ValueError:
        return 0
|
|
def get_subdomain_level(url):
    """Return the number of subdomain labels in the hostname.

    'a.b.example.com' -> 2, 'example.com' -> 0. IP-literal hosts return 0
    even though they contain dots. Narrowed the bare `except:` to
    ValueError.
    """
    try:
        parsed_url = urllib.parse.urlparse(url)
        hostname = parsed_url.netloc
        # Dotted IPv4 components are not subdomains.
        if has_ip_address(url):
            return 0

        parts = hostname.split('.')
        # The last two labels are assumed to be registered domain + TLD.
        return max(len(parts) - 2, 0)
    except ValueError:
        return 0
|
|
def has_double_slash_in_path(url):
    """Return 1 if the path contains '//' (a common obfuscation), else 0.

    The scheme's '//' is not part of the path, so it never triggers this.
    Narrowed the bare `except:` to ValueError.
    """
    try:
        return int('//' in urllib.parse.urlparse(url).path)
    except ValueError:
        return 0
|
|
def get_tld(url):
    """Return the lowercase top-level domain label, or '' if there is none.

    Uses urlparse().hostname (already lowercased, port stripped) instead of
    netloc: the previous code returned e.g. 'com:8080' for hosts with an
    explicit port. Bare `except:` narrowed to ValueError.
    """
    try:
        host = urllib.parse.urlparse(url).hostname or ''
        parts = host.split('.')
        return parts[-1] if len(parts) > 1 else ''
    except ValueError:
        return ''
|
|
def count_digits(url):
    """Number of decimal-digit characters in the URL."""
    return len([ch for ch in url if ch.isdigit()])
|
|
def digit_ratio(url):
    """Fraction of the URL's characters that are digits (0 for empty input)."""
    if not url:
        return 0
    return sum(ch.isdigit() for ch in url) / len(url)
|
|
def count_letters(url):
    """Number of alphabetic characters in the URL."""
    return len([ch for ch in url if ch.isalpha()])
|
|
def letter_ratio(url):
    """Fraction of the URL's characters that are letters (0 for empty input)."""
    if not url:
        return 0
    return sum(ch.isalpha() for ch in url) / len(url)
|
|
def count_special_chars(url):
    """Number of characters that are neither alphanumeric nor whitespace."""
    return len([ch for ch in url if not ch.isalnum() and not ch.isspace()])
|
|
def special_char_ratio(url):
    """Fraction of the URL's characters that are special (0 for empty input)."""
    if not url:
        return 0
    return sum(not ch.isalnum() and not ch.isspace() for ch in url) / len(url)
|
|
def get_query_length(url):
    """Return the length of the query string (without the leading '?').

    Narrowed the bare `except:` to ValueError.
    """
    try:
        return len(urllib.parse.urlparse(url).query)
    except ValueError:
        return 0
|
|
def get_fragment_length(url):
    """Return the length of the fragment (without the leading '#').

    Narrowed the bare `except:` to ValueError.
    """
    try:
        return len(urllib.parse.urlparse(url).fragment)
    except ValueError:
        return 0
|
|
def healthcare_relevance_score(url):
    """
    Calculate a heuristic healthcare relevance score in [0, 1].
    Higher scores indicate stronger relation to healthcare.

    Matching is plain substring matching against the lowercased domain and
    path; domain hits weigh more than path hits. Raw points are divided by
    10 and capped at 1.0. Added a ValueError guard around urlparse so this
    extractor degrades to 0.0 on malformed URLs, consistent with the other
    feature extractors.
    """
    url_lower = url.lower()
    try:
        parsed_url = urllib.parse.urlparse(url_lower)
    except ValueError:
        # Unparseable URL -> neutral score, like the other extractors.
        return 0.0
    domain = parsed_url.netloc
    path = parsed_url.path

    score = 0

    # Generic terms: domain hit = 3, path hit = 1.
    for keyword in HEALTHCARE_KEYWORDS:
        if keyword in domain:
            score += 3
        elif keyword in path:
            score += 1

    # Known institutions: domain hit = 4, path hit = 2.
    for institution in HEALTHCARE_INSTITUTIONS:
        if institution in domain:
            score += 4
        elif institution in path:
            score += 2

    # Healthcare TLD fragments / hostname prefixes: domain only, 3 points.
    for healthcare_domain in HEALTHCARE_DOMAINS:
        if healthcare_domain in domain:
            score += 3

    # Patient-portal signals.
    if 'portal' in domain or 'portal' in path:
        score += 2
    if 'patient' in domain or 'mychart' in domain:
        score += 3
    if 'ehr' in domain or 'emr' in domain:
        score += 3

    # Normalize to [0, 1].
    return min(score / 10.0, 1.0)
|
|
def extract_features(url):
    """Build the 22-element feature vector for one URL.

    The element order matches get_feature_names() exactly.
    """
    structural = [
        num_dots(url),
        get_subdomain_level(url),
        get_path_level(url),
        url_length(url),
        num_hyphens(url),
        num_at(url),
        num_tilde(url),
        num_underscore(url),
        num_percent(url),
        num_ampersand(url),
        num_hash(url),
        has_https(url),
        has_ip_address(url),
        get_hostname_length(url),
        get_path_length(url),
        has_double_slash_in_path(url),
    ]
    statistical = [
        digit_ratio(url),
        letter_ratio(url),
        special_char_ratio(url),
        get_query_length(url),
        get_fragment_length(url),
        healthcare_relevance_score(url),
    ]
    return structural + statistical
|
|
def get_feature_names():
    """Feature names, in the exact order produced by extract_features()."""
    structural = [
        'num_dots', 'subdomain_level', 'path_level', 'url_length',
        'num_hyphens', 'num_at', 'num_tilde', 'num_underscore',
        'num_percent', 'num_ampersand', 'num_hash', 'has_https',
        'has_ip_address', 'hostname_length', 'path_length',
        'double_slash_in_path',
    ]
    statistical = [
        'digit_ratio', 'letter_ratio',
        'special_char_ratio', 'query_length', 'fragment_length',
        'healthcare_relevance',
    ]
    return structural + statistical
|
|
| |
|
|
class URLDataset(Dataset):
    """PyTorch Dataset over pre-extracted URL feature vectors.

    Args:
        features (numpy.ndarray): per-URL feature vectors
        labels (numpy.ndarray): per-URL labels (0 = benign, 1 = malicious)
    """

    def __init__(self, features, labels):
        # float32 for model inputs, long for classification targets.
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]
|
|
def load_huggingface_data(file_path):
    """
    Read the Hugging Face URL dataset (a JSON list of {'text', 'label'} records).

    Args:
        file_path: Path to the JSON file

    Returns:
        List of (url, label) tuples; records with an empty URL or a missing
        label (defaulted to -1) are dropped.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        records = json.load(f)

    url_data = [
        (record.get('text', ''), record.get('label', -1))
        for record in records
    ]
    url_data = [(url, label) for url, label in url_data if url and label != -1]

    print(f"Loaded {len(url_data)} URLs from Hugging Face dataset")
    return url_data
|
|
def load_phiusiil_data(file_path):
    """
    Load the PhiUSIIL dataset from a CSV file with 'URL' and 'label' columns.

    Args:
        file_path: Path to the CSV file

    Returns:
        List of (url, label) tuples; rows with a non-string/blank URL or a
        NaN label are skipped.
    """
    df = pd.read_csv(file_path)

    url_data = []
    # Iterate the two columns directly: df.iterrows() builds a Series per
    # row and is dramatically slower on large files, with no benefit here.
    for url, label in zip(df['URL'], df['label']):
        if isinstance(url, str) and url.strip() and not pd.isna(label):
            url_data.append((url, label))

    print(f"Loaded {len(url_data)} URLs from PhiUSIIL dataset")
    return url_data
|
|
def load_kaggle_data(file_path):
    """
    Load the Kaggle malicious_phish.csv dataset ('url' and 'type' columns).

    The multi-class 'type' column is collapsed to binary: 'benign' -> 0,
    everything else (phishing, defacement, malware, missing) -> 1.

    Args:
        file_path: Path to the CSV file

    Returns:
        List of (url, label) tuples; rows with a non-string/blank URL are skipped.
    """
    df = pd.read_csv(file_path)

    url_data = []
    # zip over columns instead of the much slower df.iterrows().
    for url, type_val in zip(df['url'], df['type']):
        if not isinstance(url, str) or not url.strip():
            continue

        # Guard the isinstance check: a missing 'type' is read as float NaN
        # and the previous unconditional .lower() call crashed on it.
        label = 0 if isinstance(type_val, str) and type_val.lower() == 'benign' else 1
        url_data.append((url, label))

    print(f"Loaded {len(url_data)} URLs from Kaggle dataset")
    return url_data
|
|
def combine_and_deduplicate(datasets):
    """
    Merge several (url, label) datasets into one, deduplicating by URL.

    Args:
        datasets: List of datasets, each an iterable of (url, label) tuples

    Returns:
        Tuple of (urls, labels) with duplicates removed; insertion order
        of first appearance is preserved.
    """
    merged = {}

    for dataset in datasets:
        for url, label in dataset:
            # A URL seen with conflicting labels is kept as malicious
            # (max of the 0/1 labels).
            merged[url] = label if url not in merged else max(merged[url], label)

    urls = list(merged)
    labels = [merged[u] for u in urls]

    print(f"After deduplication: {len(urls)} unique URLs")

    label_counts = Counter(labels)
    print(f"Class distribution - Benign (0): {label_counts[0]}, Malicious (1): {label_counts[1]}")

    return urls, labels
|
|
def extract_all_features(urls):
    """
    Extract the feature matrix for a list of URLs.

    Args:
        urls: List of URL strings

    Returns:
        float32 numpy array of shape (len(urls), n_features); URLs whose
        extraction raises are logged and zero-filled (best effort).
    """
    n_features = len(get_feature_names())
    vectors = []

    for url in tqdm.tqdm(urls, desc="Extracting features"):
        try:
            vectors.append(extract_features(url))
        except Exception as e:
            # Deliberate best-effort: log and keep a zero row so the matrix
            # stays aligned with the input URL list.
            print(f"Error extracting features from {url}: {str(e)}")
            vectors.append([0] * n_features)

    return np.array(vectors, dtype=np.float32)
|
|
| |
class PhishingMLP(nn.Module):
    """Multilayer Perceptron for phishing-URL detection.

    Stacks Linear+ReLU hidden layers and ends with Linear+Sigmoid, so the
    forward pass emits a probability in (0, 1) suitable for nn.BCELoss.
    """

    def __init__(self, input_size=22, hidden_sizes=None, output_size=1):
        """
        Args:
            input_size: Number of input features (default: 22)
            hidden_sizes: List of neurons in each hidden layer
                (default: [22, 30, 10])
            output_size: Number of output units (1 for binary classification)
        """
        super(PhishingMLP, self).__init__()

        # Default resolved inside the body: a mutable default argument
        # ([22, 30, 10] in the signature) is shared across all calls.
        if hidden_sizes is None:
            hidden_sizes = [22, 30, 10]

        self.layers = nn.ModuleList()

        # Input layer.
        self.layers.append(nn.Linear(input_size, hidden_sizes[0]))
        self.layers.append(nn.ReLU())

        # Hidden layers.
        for i in range(len(hidden_sizes) - 1):
            self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i + 1]))
            self.layers.append(nn.ReLU())

        # Output layer with sigmoid for probabilities.
        self.layers.append(nn.Linear(hidden_sizes[-1], output_size))
        self.layers.append(nn.Sigmoid())

    def forward(self, x):
        """Forward pass through the network."""
        for layer in self.layers:
            x = layer(x)
        return x
|
|
| |
def train_mlp(model, train_loader, val_loader, epochs=25, learning_rate=0.001, device="cpu"):
    """
    Train the MLP with BCE loss and Adam, validating after every epoch.

    Args:
        model: The MLP model (must emit sigmoid probabilities of shape (batch, 1))
        train_loader: DataLoader for training data
        val_loader: DataLoader for validation data
        epochs: Number of training epochs
        learning_rate: Adam learning rate
        device: Device to train on (cpu or cuda)

    Returns:
        Tuple of (trained_model, train_losses, val_losses, val_accuracies)
    """
    model.to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    train_losses, val_losses, val_accuracies = [], [], []

    print(f"Training on {device}...")
    for epoch in range(epochs):
        # --- Training pass ---
        model.train()
        loss_sum = 0.0

        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            optimizer.zero_grad()
            probs = model(batch_x)
            # BCELoss wants float targets shaped like the output: (batch, 1).
            loss = criterion(probs, batch_y.unsqueeze(1).float())
            loss.backward()
            optimizer.step()

            loss_sum += loss.item()

        epoch_train_loss = loss_sum / len(train_loader)
        train_losses.append(epoch_train_loss)

        # --- Validation pass ---
        model.eval()
        val_loss_sum = 0.0
        n_correct = 0
        n_seen = 0

        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                probs = model(batch_x)

                val_loss_sum += criterion(probs, batch_y.unsqueeze(1).float()).item()

                # 0.5 decision threshold on the sigmoid output.
                hard = (probs > 0.5).float()
                n_seen += batch_y.size(0)
                n_correct += (hard.squeeze() == batch_y.float()).sum().item()

        epoch_val_loss = val_loss_sum / len(val_loader)
        val_losses.append(epoch_val_loss)

        val_accuracy = 100 * n_correct / n_seen
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

    return model, train_losses, val_losses, val_accuracies
|
|
def evaluate_model(model, test_loader, device):
    """
    Evaluate the trained model on test data.

    Args:
        model: Trained model emitting sigmoid probabilities of shape (batch, 1)
        test_loader: DataLoader for test data
        device: Device to evaluate on

    Returns:
        Tuple of (accuracy, precision, recall, f1, healthcare_accuracy).
        (The previous docstring claimed a 4-tuple; five values are returned.)
    """
    model.to(device)
    model.eval()

    correct = 0
    total = 0
    true_positives = 0
    false_positives = 0
    false_negatives = 0
    healthcare_correct = 0
    healthcare_total = 0

    # Column of the healthcare-relevance feature in the input vectors.
    feature_idx = get_feature_names().index('healthcare_relevance')
    # NOTE(review): inputs are typically standardized upstream, so this 0.5
    # cutoff may no longer map to the raw 0-1 relevance scale — verify.
    healthcare_threshold = 0.5

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            # view(-1) rather than squeeze(): squeeze() on a final batch of
            # size 1 produces a 0-d tensor and predicted[i] below would
            # raise IndexError.
            predicted = (outputs > 0.5).float().view(-1)

            total += labels.size(0)
            correct += (predicted == labels.float()).sum().item()

            # Per-sample confusion-matrix tallies and healthcare-subset stats.
            for i in range(labels.size(0)):
                if labels[i] == 1 and predicted[i] == 1:
                    true_positives += 1
                elif labels[i] == 0 and predicted[i] == 1:
                    false_positives += 1
                elif labels[i] == 1 and predicted[i] == 0:
                    false_negatives += 1

                if inputs[i, feature_idx] >= healthcare_threshold:
                    healthcare_total += 1
                    if predicted[i] == labels[i]:
                        healthcare_correct += 1

    accuracy = 100 * correct / total
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0.0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    healthcare_accuracy = 100 * healthcare_correct / healthcare_total if healthcare_total > 0 else 0.0

    print(f"Overall Test Accuracy: {accuracy:.2f}%")
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1:.4f}")
    print(f"Healthcare URLs identified: {healthcare_total} ({healthcare_total/total*100:.2f}%)")
    print(f"Healthcare URL Detection Accuracy: {healthcare_accuracy:.2f}%")

    return accuracy, precision, recall, f1, healthcare_accuracy
|
|
def plot_training_results(train_losses, val_losses, val_accuracies):
    """
    Plot the loss and accuracy curves side by side, save them to
    'training_results.png', and display the figure.

    Args:
        train_losses: Per-epoch training losses
        val_losses: Per-epoch validation losses
        val_accuracies: Per-epoch validation accuracies (percent)
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # Left panel: loss curves.
    loss_ax.plot(train_losses, label='Training Loss')
    loss_ax.plot(val_losses, label='Validation Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Training and Validation Loss')
    loss_ax.legend()

    # Right panel: accuracy curve.
    acc_ax.plot(val_accuracies, label='Validation Accuracy')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.set_title('Validation Accuracy')
    acc_ax.legend()

    fig.tight_layout()
    fig.savefig('training_results.png')
    plt.show()
|
|
def analyze_healthcare_features(features, labels, pred_labels):
    """
    Analyze how the model performs on healthcare-related URLs.

    Prints, for several healthcare-relevance thresholds, the subset size,
    accuracy, precision/recall/F1, and false positive/negative rates.

    Args:
        features: Feature vectors (2-d numpy array, one row per URL)
        labels: True labels (numpy array of 0/1)
        pred_labels: Predicted labels (numpy array of 0/1)
    """
    # Column holding the healthcare relevance score.
    healthcare_idx = get_feature_names().index('healthcare_relevance')
    healthcare_scores = features[:, healthcare_idx]

    # NOTE(review): these cutoffs assume raw 0-1 relevance scores; if the
    # caller passes standardized features the buckets are skewed — verify.
    thresholds = [0.1, 0.3, 0.5, 0.7, 0.9]

    print("\n=== Healthcare URL Analysis ===")
    print("Healthcare relevance score distribution:")
    for threshold in thresholds:
        count = np.sum(healthcare_scores >= threshold)
        percent = (count / len(healthcare_scores)) * 100
        print(f" Score >= {threshold}: {count} URLs ({percent:.2f}%)")

    # Per-threshold classification metrics on the healthcare subset.
    for threshold in thresholds:
        mask = healthcare_scores >= threshold
        if np.sum(mask) == 0:
            # No URLs reach this threshold; nothing to report.
            continue

        h_labels = labels[mask]
        h_preds = pred_labels[mask]
        h_accuracy = np.mean(h_labels == h_preds) * 100

        benign_count = np.sum(h_labels == 0)
        malicious_count = np.sum(h_labels == 1)

        print(f"\nFor healthcare relevance >= {threshold}:")
        print(f" URLs: {np.sum(mask)} ({benign_count} benign, {malicious_count} malicious)")
        print(f" Accuracy: {h_accuracy:.2f}%")

        # Confusion-matrix counts within the subset.
        tp = np.sum((h_labels == 1) & (h_preds == 1))
        fp = np.sum((h_labels == 0) & (h_preds == 1))
        fn = np.sum((h_labels == 1) & (h_preds == 0))

        # Guard each ratio against a zero denominator.
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        print(f" Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        # Rates are only defined when the corresponding class is present.
        if benign_count > 0:
            h_fpr = np.sum((h_labels == 0) & (h_preds == 1)) / benign_count
            print(f" False Positive Rate: {h_fpr:.4f}")

        if malicious_count > 0:
            h_fnr = np.sum((h_labels == 1) & (h_preds == 0)) / malicious_count
            print(f" False Negative Rate: {h_fnr:.4f}")
|
|
| |
def main():
    """Run the end-to-end pipeline: load the three datasets, merge and
    featurize them, train the MLP, then evaluate and report
    healthcare-specific metrics."""
    # --- Hyperparameters ---
    batch_size = 32
    learning_rate = 0.001
    epochs = 20
    test_size = 0.2    # fraction of all data held out for testing
    val_size = 0.2     # fraction of all data used for validation
    random_seed = 42
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # --- Input files (expected in the working directory) ---
    huggingface_file = "urls.json"
    phiusiil_file = "PhiUSIIL_Phishing_URL_Dataset.csv"
    kaggle_file = "malicious_phish.csv"

    # --- Load the three source datasets ---
    print("Loading datasets...")
    huggingface_data = load_huggingface_data(huggingface_file)
    phiusiil_data = load_phiusiil_data(phiusiil_file)
    kaggle_data = load_kaggle_data(kaggle_file)

    # --- Merge and deduplicate by URL ---
    print("Combining and deduplicating datasets...")
    urls, labels = combine_and_deduplicate([huggingface_data, phiusiil_data, kaggle_data])

    # --- Feature extraction (one 22-element vector per URL) ---
    print("Extracting features...")
    features = extract_all_features(urls)

    # --- Stratified train/val/test split ---
    print("Splitting data...")
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        features, labels, test_size=test_size, random_state=random_seed, stratify=labels
    )

    # val_size is rescaled because it is carved out of the remaining data.
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_size/(1-test_size),
        random_state=random_seed, stratify=y_train_val
    )

    # --- Standardize features (scaler fitted on train only) ---
    print("Standardizing features...")
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)

    # --- Wrap the splits in DataLoaders ---
    print("Creating DataLoaders...")
    train_dataset = URLDataset(X_train, y_train)
    val_dataset = URLDataset(X_val, y_val)
    test_dataset = URLDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # --- Build and train the model ---
    print("Initializing model...")
    input_size = features.shape[1]
    model = PhishingMLP(input_size=input_size)

    print("Training model...")
    trained_model, train_losses, val_losses, val_accuracies = train_mlp(
        model, train_loader, val_loader, epochs=epochs,
        learning_rate=learning_rate, device=device
    )

    # --- Persist trained weights ---
    print("Saving model...")
    model_path = "phishing_mlp_model.pth"
    torch.save(trained_model.state_dict(), model_path)
    print(f"Model saved to {model_path}")

    # --- Test-set evaluation ---
    print("\nEvaluating model on test set...")
    acc, prec, rec, f1, healthcare_acc = evaluate_model(trained_model, test_loader, device)

    # --- Training-curve plots ---
    plot_training_results(train_losses, val_losses, val_accuracies)

    # --- Collect hard predictions for the healthcare analysis ---
    y_pred = []
    trained_model.eval()
    with torch.no_grad():
        for inputs, _ in test_loader:
            inputs = inputs.to(device)
            outputs = trained_model(inputs)
            # NOTE(review): if the final batch has exactly one sample,
            # squeeze() yields a 0-d array whose tolist() is a scalar and
            # extend() would raise TypeError — confirm the test-set size.
            predicted = (outputs > 0.5).float().squeeze().cpu().numpy()
            y_pred.extend(predicted.tolist())

    # NOTE(review): X_test is standardized here, so the 0-1 relevance
    # thresholds inside analyze_healthcare_features may not line up with
    # the raw score scale — verify.
    analyze_healthcare_features(X_test, np.array(y_test), np.array(y_pred))

    # --- Show a few high-relevance healthcare URLs (raw features) ---
    feature_names = get_feature_names()
    healthcare_idx = feature_names.index('healthcare_relevance')
    healthcare_scores = features[:, healthcare_idx]
    high_healthcare = healthcare_scores >= 0.5

    print("\n=== Healthcare URL Examples ===")
    high_healthcare_indices = np.where(high_healthcare)[0][:5]
    for idx in high_healthcare_indices:
        print(f"URL: {urls[idx]}")
        print(f"Healthcare Score: {healthcare_scores[idx]:.2f}")
        print(f"Label: {'Malicious' if labels[idx] == 1 else 'Benign'}")
        print()

    # --- Final summary ---
    print("\n=== Summary ===")
    print(f"Total URLs processed: {len(urls)}")
    print(f"Training set: {len(X_train)} URLs")
    print(f"Validation set: {len(X_val)} URLs")
    print(f"Test set: {len(X_test)} URLs")
    print(f"Model input features: {input_size}")
    print(f"Test Accuracy: {acc:.2f}%")
    print(f"Healthcare URL Accuracy: {healthcare_acc:.2f}%")
    print(f"Precision: {prec:.4f}, Recall: {rec:.4f}, F1-Score: {f1:.4f}")
    print("\nTraining complete!")
|
|
| if __name__ == "__main__": |
| main() |