Commit 6bedb4fb authored by AjUm-HEIDI

Add the updated files related to GNN

parent 3fe3a1fd
import torch
import torch.nn.functional as F
from torch.nn import Module, Linear, BatchNorm1d, Dropout
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch_geometric.nn import HeteroConv, GraphConv
from torch_geometric.data import HeteroData
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score, precision_score, recall_score
import numpy as np
import copy
from typing import Dict, Optional, Tuple
from tabulate import tabulate
from ConceptLearner.Utils import get_feature_sizes_and_edge_config, find_classes_with_y_labels
class GNN(Module):
def __init__(self, data: HeteroData, hidden_channels: int = 256,
num_hidden_layers: int = 4) -> None:
"""Initialize the Heterogeneous Graph Neural Network.
Args:
data (HeteroData): The heterogeneous graph data
hidden_channels (int, optional): Number of hidden channels. Defaults to 256.
num_hidden_layers (int, optional): Number of hidden GNN layers. Defaults to 4.
"""
super(GNN, self).__init__()
# Get feature sizes and edge configuration
self.feature_sizes, self.edge_config = get_feature_sizes_and_edge_config(data)
self.num_hidden_layers = num_hidden_layers
self.data = data
# Initial normalization layers for each node type
self.norm_layers = torch.nn.ModuleDict({
node_type: BatchNorm1d(feature_size)
for node_type, feature_size in self.feature_sizes.items()
})
# Separate conv layers and normalization for heterogeneous graphs
self.convs = torch.nn.ModuleList()
self.bns = torch.nn.ModuleList()
# Stack HeteroConv layers; layer 0 maps raw feature sizes, later layers map hidden_channels
for layer in range(num_hidden_layers):
conv_dict = {}
for (src, rel, dst) in self.edge_config:
in_channels = ((self.feature_sizes[src] if layer == 0 else hidden_channels),
(self.feature_sizes[dst] if layer == 0 else hidden_channels))
conv_dict[(src, rel, dst)] = GraphConv(
in_channels=in_channels,
out_channels=hidden_channels
)
self.convs.append(HeteroConv(conv_dict, aggr='mean'))
# Add batch norm for each node type
self.bns.append(torch.nn.ModuleDict({
node_type: BatchNorm1d(hidden_channels)
for node_type in self.feature_sizes
}))
# Two parallel projection heads per node type ('mean' and 'add' branches)
self.pooling = torch.nn.ModuleDict({
node_type: torch.nn.ModuleDict({
'mean': Linear(hidden_channels, hidden_channels),
'add': Linear(hidden_channels, hidden_channels)
}) for node_type in self.feature_sizes
})
# Find nodes with labels
self.label_nodes = find_classes_with_y_labels(self.data, first_only=False)
# Classification heads for each labeled node type
self.classifiers = torch.nn.ModuleDict()
for node_type in self.label_nodes:
num_classes = len(torch.unique(data[node_type].y))
self.classifiers[node_type] = torch.nn.Sequential(
Linear(hidden_channels * 2, hidden_channels),
torch.nn.ELU(),
Dropout(p=0.2),
Linear(hidden_channels, hidden_channels // 2),
torch.nn.ELU(),
Dropout(p=0.2),
Linear(hidden_channels // 2, num_classes)
)
self.dropout = Dropout(p=0.2)
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.to(self.device)
self.best_model_state = None
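# A minimal construction sketch (hypothetical usage; mirrors main() below):
#
#   from torch_geometric.datasets import DBLP
#   data = DBLP(root='rawData/DBLP')[0]
#   model = GNN(data=data, hidden_channels=256, num_hidden_layers=4)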
def forward(self, x_dict: Dict[str, torch.Tensor],
edge_index_dict: Dict[Tuple[str, str, str], torch.Tensor]) -> Dict[str, torch.Tensor]:
"""Forward pass of the heterogeneous GNN.
Args:
x_dict (Dict[str, torch.Tensor]): Dictionary of node features for each node type
edge_index_dict (Dict[Tuple[str, str, str], torch.Tensor]): Dictionary of edge indices
for each edge type, where the tuple key is (source_type, edge_type, target_type)
Returns:
Dict[str, torch.Tensor]: Dictionary of predictions for each labeled node type
"""
# Initial feature normalization
out_dict = {
node_type: self.norm_layers[node_type](x)
for node_type, x in x_dict.items()
}
# Process through conv layers
for i, conv in enumerate(self.convs):
# Store for residual connection
identity = out_dict
# Apply convolution
conv_out = conv(out_dict, edge_index_dict)
# Apply batch norm, activation, and dropout for each node type
conv_out = {
node_type: self.dropout(
F.elu(self.bns[i][node_type](features))
)
for node_type, features in conv_out.items()
}
# Scaled (0.1) residual connection after the first layer
if i > 0:
conv_out = {
node_type: features + 0.1 * identity[node_type]
for node_type, features in conv_out.items()
}
out_dict = conv_out
# Multi-head pooling and classification for each labeled node type
final_out = {}
for node_type in self.label_nodes:
# Project node embeddings through both heads
mean_pooled = self.pooling[node_type]['mean'](out_dict[node_type])
add_pooled = self.pooling[node_type]['add'](out_dict[node_type])
# Concatenate pooled features
pooled = torch.cat([mean_pooled, add_pooled], dim=1)
# Apply classification head
final_out[node_type] = self.classifiers[node_type](pooled)
# Apply log softmax to outputs
return {
node_type: F.log_softmax(out, dim=1)
for node_type, out in final_out.items()
}
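# Forward sketch (assumes model.data has been moved to model.device,
# which train_model() does before calling forward):
#
#   log_probs = model(model.data.x_dict, model.data.edge_index_dict)
#   preds = {nt: lp.argmax(dim=1) for nt, lp in log_probs.items()}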
def train_model(self, epochs: int = 300, lr: float = 0.001,
show_progress: bool = False) -> Dict[str, Dict[str, float]]:
"""Train the heterogeneous GNN model.
Args:
epochs (int, optional): Number of training epochs. Defaults to 300.
lr (float, optional): Learning rate. Defaults to 0.001.
show_progress (bool, optional): Whether to display training progress. Defaults to False.
Returns:
Dict[str, Dict[str, float]]: Dictionary of best metrics for each node type
"""
"""
Train with enhanced learning schedule and early stopping
"""
self.data = self.data.to(self.device)
optimizer = Adam(self.parameters(), lr=lr, weight_decay=1e-4)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.7,
patience=20, min_lr=1e-5)
# Initialize best metrics for each node type
best_metrics = {
node_type: {'accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0, 'epoch': 0}
for node_type in self.label_nodes
}
# Calculate class weights for each node type
class_weights = {}
for node_type in self.label_nodes:
labels = self.data[node_type].y.cpu().numpy()
counts = np.bincount(labels)
class_weights[node_type] = torch.FloatTensor(1.0 / counts).to(self.device)
patience = 150
no_improve = 0
best_avg_f1 = 0
for epoch in range(epochs):
# Training phase
self.train()
optimizer.zero_grad()
# Forward pass
out_dict = self(self.data.x_dict, self.data.edge_index_dict)
total_loss = 0
current_metrics = {node_type: {} for node_type in self.label_nodes}
# Calculate loss and metrics for each node type
for node_type in self.label_nodes:
# forward() returns log-probabilities, so use NLL loss here;
# F.cross_entropy would apply log_softmax a second time
loss = F.nll_loss(
out_dict[node_type],
self.data[node_type].y,
weight=class_weights[node_type]
)
total_loss += loss
# Calculate metrics
pred = torch.argmax(out_dict[node_type], dim=1)
y_true = self.data[node_type].y.cpu().numpy()
y_pred = pred.cpu().numpy()
current_metrics[node_type] = {
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, average='weighted',
zero_division=1),
'recall': recall_score(y_true, y_pred, average='weighted',
zero_division=1),
'f1': f1_score(y_true, y_pred, average='weighted')
}
# Backward pass and optimization
total_loss.backward()
torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)
optimizer.step()
# Calculate average F1 score across all node types
avg_f1 = np.mean([metrics['f1'] for metrics in current_metrics.values()])
scheduler.step(avg_f1)
# Update best metrics and model state
if avg_f1 > best_avg_f1:
best_avg_f1 = avg_f1
best_metrics = {
node_type: {**metrics, 'epoch': epoch}
for node_type, metrics in current_metrics.items()
}
# snapshot a detached copy; state_dict() returns live references
# that later optimizer steps would overwrite
self.best_model_state = copy.deepcopy(self.state_dict())
no_improve = 0
else:
no_improve += 1
if show_progress and (epoch + 1) % 10 == 0:
print(f"\nEpoch {epoch+1:03d}, Loss: {total_loss:.4f}")
for node_type in self.label_nodes:
print(f"\n{node_type.capitalize()} Metrics:")
for metric, value in current_metrics[node_type].items():
print(f"{metric.capitalize()}: {value:.4f}")
if no_improve >= patience:
print(f"\nEarly stopping triggered after {epoch + 1} epochs")
break
if self.best_model_state:
self.load_state_dict(self.best_model_state)
return best_metrics
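# Training sketch; 'author' is an example labeled node type from DBLP:
#
#   best = model.train_model(epochs=300, lr=0.001, show_progress=True)
#   print(best['author']['f1'], best['author']['epoch'])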
def predict(self, node_type: str, idx: Optional[int] = None) -> torch.Tensor:
"""Make predictions for a specific node type.
Args:
node_type (str): Type of node to make predictions for
idx (Optional[int], optional): Specific node index. If None, predicts for all nodes.
Defaults to None.
Returns:
torch.Tensor: Predicted class indices
Raises:
ValueError: If node_type has no labels
"""
"""Prediction with optional indexing"""
if node_type not in self.label_nodes:
raise ValueError(f"Node type {node_type} has no labels")
self.eval()
with torch.no_grad():
predictions = self(self.data.x_dict, self.data.edge_index_dict)
pred = torch.argmax(predictions[node_type], dim=1)
return pred[idx] if idx is not None else pred
def predict_all(self) -> Dict[str, torch.Tensor]:
"""Make predictions for all labeled node types.
Returns:
Dict[str, torch.Tensor]: Dictionary of predictions for each labeled node type
"""
"""Predict for all labeled node types"""
self.eval()
with torch.no_grad():
predictions = self(self.data.x_dict, self.data.edge_index_dict)
return {
node_type: torch.argmax(predictions[node_type], dim=1)
for node_type in self.label_nodes
}
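# Prediction sketch (node type names are dataset-specific):
#
#   all_preds = model.predict_all()            # {node_type: LongTensor}
#   first_author = model.predict('author', 0)  # class index of node 0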
def main():
"""Example usage with DBLP dataset"""
from torch_geometric.datasets import DBLP
# Load DBLP dataset
dataset = DBLP(root='rawData/DBLP')
data = dataset[0]
print("\nDataset Information:")
print(data)
# Test configurations
num_layers_to_test = [2, 4, 6]
all_results = []
for num_layers in num_layers_to_test:
print(f"\nTraining with {num_layers} hidden layers:")
model = GNN(data=data, num_hidden_layers=num_layers)
metrics = model.train_model(epochs=300, show_progress=True)
# Calculate average metrics across node types
avg_metrics = {
metric: np.mean([
node_metrics[metric]
for node_metrics in metrics.values()
])
for metric in ['accuracy', 'precision', 'recall', 'f1']
}
layer_results = [
num_layers,
avg_metrics['accuracy'],
avg_metrics['precision'],
avg_metrics['recall'],
avg_metrics['f1']
]
all_results.append(layer_results)
# Print confusion matrices
predictions = model.predict_all()
for node_type in model.label_nodes:
print(f"\nConfusion Matrix for {node_type}:")
print(confusion_matrix(data[node_type].y.cpu(), predictions[node_type].cpu()))
# Print comparison table
headers = ['Num Layers', 'Accuracy', 'Precision', 'Recall', 'F1']
print("\nResults Comparison:")
print(tabulate(all_results, headers=headers, floatfmt='.4f', tablefmt='grid'))
# Find best configuration
best_idx = max(range(len(all_results)), key=lambda i: all_results[i][4])
best_config = all_results[best_idx]
print("\nBest Configuration:")
print(f"Num Layers: {best_config[0]}")
print(f"Accuracy: {best_config[1]:.4f}")
print(f"Precision: {best_config[2]:.4f}")
print(f"Recall: {best_config[3]:.4f}")
print(f"F1 Score: {best_config[4]:.4f}")
if __name__ == "__main__":
main()
\ No newline at end of file
import torch
import torch.nn.functional as F
from torch_geometric.nn import GraphConv, global_mean_pool, global_add_pool
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn import Module, BatchNorm1d, Linear, Dropout
from torch_geometric.data import Dataset
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score, precision_score, recall_score
import numpy as np
import copy
from typing import Dict, Optional, List, Tuple
from torch_geometric.loader import DataLoader
from tabulate import tabulate
class GNN(Module):
def __init__(self, dataset: Dataset, hidden_channels: int = 256,
num_hidden_layers: int = 4) -> None:
super(GNN, self).__init__()
self.num_hidden_layers = num_hidden_layers
# Separate conv layers and normalization
self.convs = torch.nn.ModuleList()
self.bns = torch.nn.ModuleList()
# First layer: input to hidden
self.convs.append(GraphConv(dataset.num_node_features, hidden_channels))
self.bns.append(BatchNorm1d(hidden_channels))
# Hidden layers
for _ in range(num_hidden_layers - 1):
self.convs.append(GraphConv(hidden_channels, hidden_channels))
self.bns.append(BatchNorm1d(hidden_channels))
# Multi-head pooling
self.lin_mean = Linear(hidden_channels, hidden_channels)
self.lin_add = Linear(hidden_channels, hidden_channels)
# Classification layers
self.classifier = torch.nn.Sequential(
Linear(hidden_channels * 2, hidden_channels),
torch.nn.ELU(),
Dropout(p=0.2),
Linear(hidden_channels, hidden_channels // 2),
torch.nn.ELU(),
Dropout(p=0.2),
Linear(hidden_channels // 2, dataset.num_classes)
)
self.dropout = Dropout(p=0.2)
self.dataset = dataset
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.to(self.device)
self.best_model_state = None
def forward(self, x: torch.Tensor, edge_index: torch.Tensor,
batch: torch.Tensor) -> torch.Tensor:
# Pass through convolution layers
for i in range(self.num_hidden_layers):
identity = x
# Apply convolution
x = self.convs[i](x, edge_index)
# Apply batch norm
x = self.bns[i](x)
# Apply activation and dropout
x = F.elu(x)
x = self.dropout(x)
# Scaled (0.1) residual connection for hidden layers
if i > 0:
x = x + 0.1 * identity
# Multi-head pooling
x_mean = global_mean_pool(x, batch)
x_add = global_add_pool(x, batch)
x_mean = self.lin_mean(x_mean)
x_add = self.lin_add(x_add)
# Concatenate and classify
x = torch.cat([x_mean, x_add], dim=1)
x = self.classifier(x)
return F.log_softmax(x, dim=1)
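# Forward sketch for one mini-batch (assumes `loader` is a PyG DataLoader
# over the dataset, as built by _prepare_data below):
#
#   batch = next(iter(loader)).to(model.device)
#   log_probs = model(batch.x, batch.edge_index, batch.batch)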
def train_model(self, epochs: int = 300, lr: float = 0.001,
train_split: float = 0.8, show_progress: bool = False) -> Dict[str, float]:
"""
Train with enhanced learning schedule and early stopping
"""
train_loader, test_loader = self._prepare_data(train_split)
optimizer = Adam(self.parameters(), lr=lr, weight_decay=1e-4)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.7,
patience=20, min_lr=1e-5)
best_metrics = {'accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0, 'epoch': 0}
patience = 150
no_improve = 0
# Inverse-frequency class weights to counter label imbalance
if train_loader is not None:
all_labels = []
for data in train_loader:
all_labels.extend(data.y.cpu().numpy())
label_counts = np.bincount(all_labels)
class_weights = torch.FloatTensor(1.0 / label_counts).to(self.device)
for epoch in range(epochs):
self.train()
total_loss = 0
train_predictions, train_labels = [], []
for data in train_loader:
data = data.to(self.device)
optimizer.zero_grad()
output = self(data.x, data.edge_index, data.batch)
# Class-weighted NLL loss (forward returns log-probabilities)
loss = F.nll_loss(output, data.y, weight=class_weights)
loss.backward()
# Clip gradients to stabilize training
torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)
optimizer.step()
total_loss += float(loss) * data.num_graphs
pred = output.max(1)[1]
train_predictions.extend(pred.cpu().numpy())
train_labels.extend(data.y.cpu().numpy())
# Calculate training metrics
train_metrics = {
'accuracy': accuracy_score(train_labels, train_predictions),
'f1': f1_score(train_labels, train_predictions, average='weighted'),
'precision': precision_score(train_labels, train_predictions,
average='weighted', zero_division=1),
'recall': recall_score(train_labels, train_predictions,
average='weighted', zero_division=1)
}
# Evaluate on the held-out test split
if test_loader is not None:
self.eval()
test_predictions, test_labels = [], []
with torch.no_grad():
for data in test_loader:
data = data.to(self.device)
pred = self(data.x, data.edge_index, data.batch).max(1)[1]
test_predictions.extend(pred.cpu().numpy())
test_labels.extend(data.y.cpu().numpy())
current_metrics = {
'accuracy': accuracy_score(test_labels, test_predictions),
'precision': precision_score(test_labels, test_predictions,
average='weighted', zero_division=1),
'recall': recall_score(test_labels, test_predictions,
average='weighted', zero_division=1),
'f1': f1_score(test_labels, test_predictions, average='weighted')
}
else:
current_metrics = train_metrics
scheduler.step(current_metrics['f1'])
# Track the best epoch by test F1
if current_metrics['f1'] > best_metrics['f1']:
best_metrics = {**current_metrics, 'epoch': epoch}
# snapshot a detached copy; state_dict() returns live references
# that later optimizer steps would overwrite
self.best_model_state = copy.deepcopy(self.state_dict())
no_improve = 0
else:
no_improve += 1
if show_progress and (epoch + 1) % 10 == 0:
print(f"\nEpoch {epoch+1:03d}, Loss: {total_loss/len(train_loader):.4f}")
print("Training Metrics:")
for metric, value in train_metrics.items():
print(f"Train {metric.capitalize()}: {value:.4f}")
if test_loader is not None:
print("Test Metrics:")
for metric, value in current_metrics.items():
print(f"Test {metric.capitalize()}: {value:.4f}")
# Stop early after `patience` epochs without F1 improvement
if no_improve >= patience:
print(f"\nEarly stopping triggered after {epoch + 1} epochs")
break
if self.best_model_state:
self.load_state_dict(self.best_model_state)
return best_metrics
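# Training sketch with the default 80/20 split (mirrors main() below):
#
#   model = GNN(dataset=dataset, num_hidden_layers=4)
#   best = model.train_model(epochs=300, train_split=0.8)
#   print(best['f1'], best['epoch'])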
def predict(self, idx: int) -> torch.Tensor:
"""Enhanced prediction for a single graph"""
data = self.dataset[idx].to(self.device)
batch = torch.zeros(data.x.size(0), dtype=torch.long, device=self.device)
self.eval()
with torch.no_grad():
logits = self(data.x, data.edge_index, batch)
return torch.argmax(logits, dim=1)
def predict_all(self, indices: Optional[List[int]] = None) -> torch.Tensor:
"""Enhanced batch prediction"""
if indices is None:
indices = range(len(self.dataset))
self.eval()
predictions = []
for idx in indices:
pred = self.predict(idx)
predictions.append(pred)
return torch.cat(predictions)
def _prepare_data(self, train_split: float = 0.8) -> Tuple[DataLoader, Optional[DataLoader]]:
"""Enhanced data preparation with improved splitting"""
if train_split == 1.0:
train_loader = DataLoader(self.dataset, batch_size=32, shuffle=True)
return train_loader, None
else:
train_size = int(train_split * len(self.dataset))
test_size = len(self.dataset) - train_size
# Reproducible split via a fixed seed
train_dataset, test_dataset = torch.utils.data.random_split(
self.dataset, [train_size, test_size],
generator=torch.Generator().manual_seed(42)
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
return train_loader, test_loader
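# Example: with train_split=0.8 and 1000 graphs this yields an 800/200
# split; the fixed generator seed (42) keeps the split identical across
# runs, so metrics stay comparable between layer configurations.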
def main():
"""Enhanced testing with improved configurations"""
from torch_geometric.datasets import BAMultiShapesDataset
# Load dataset
dataset = BAMultiShapesDataset(root='rawData/BAMultiShapes')
original_labels = [data.y.item() for data in dataset]
# Layer depths to compare
num_layers_to_test = [2, 4, 6]
all_results = []
# Train and evaluate each configuration
for num_layers in num_layers_to_test:
print(f"\nTraining with {num_layers} hidden layers:")
model = GNN(dataset=dataset, num_hidden_layers=num_layers)
metrics = model.train_model(epochs=300, show_progress=True)
predicted_labels = model.predict_all().cpu()  # move off GPU for sklearn
print("Confusion Matrix:")
print(confusion_matrix(original_labels, predicted_labels))
layer_results = [
num_layers,
float(metrics['accuracy']),
float(metrics['precision']),
float(metrics['recall']),
float(metrics['f1'])
]
all_results.append(layer_results)
# Summarize results across configurations
headers = ['Num Layers', 'Accuracy', 'Precision', 'Recall', 'F1']
print("\nResults Comparison:")
print(tabulate(all_results, headers=headers,
floatfmt='.4f', tablefmt='grid'))
# Find best configuration
best_idx = max(range(len(all_results)),
key=lambda i: all_results[i][4]) # F1 score index
best_config = all_results[best_idx]
print("\nBest Configuration:")
print(f"Num Layers: {best_config[0]}")
print(f"Accuracy: {best_config[1]:.4f}")
print(f"Precision: {best_config[2]:.4f}")
print(f"Recall: {best_config[3]:.4f}")
print(f"F1 Score: {best_config[4]:.4f}")
if __name__ == "__main__":
main()
\ No newline at end of file
from typing import Dict, List, Set, Tuple, Optional
import networkx as nx
from networkx.algorithms import isomorphism
from collections import defaultdict
from ConceptLearner.Visualiser import Visualiser
class PatternFinder:
"""
@@ -21,7 +18,7 @@ class PatternFinder:
                  max_size: int = 12,
                  min_frequency: int = 2,
                  max_frequency: Optional[int] = None,
-                 min_density: float = 0.21,
+                 min_density: float = 0.3,
                  min_degree: float = 2,
                  community_detection: bool = True):
         """
@@ -313,8 +313,7 @@ class Visualiser:
             f"Pattern Size: {pattern_graph.number_of_nodes()}\n"
             f"Pattern Edges: {pattern_graph.number_of_edges()}\n"
             f"Instance #{idx} of {len(instances)}\n"
-            f"Matched Nodes: {sorted(instance_nodes)}\n\n"
-            f"Edge Types in Instance:\n{edge_type_info}"
+            f"Matched Nodes: {sorted(instance_nodes)}"
         )
         plt.text(0.02, 0.98, props,
@@ -54,7 +54,6 @@ class DBLP(TextDataset):
         self.dataset['author'].num_nodes = len(author_labels)
         self.dataset['author'].y = torch.tensor(author_labels)
         self.dataset['author'].yLabel = ["Database", "Data Mining", "Artificial Intelligence", "Information Retrieval"]
-        self.dataset['author'].x = torch.zeros((self.dataset['author'].num_nodes, 1), dtype=torch.float32)  # Dummy feature matrix
         # Paper Nodes
         self.dataset['paper'].x = paper_tensor.float()
@@ -64,7 +63,7 @@ class DBLP(TextDataset):
         self.dataset['author', 'writes', 'paper'].edge_index = author_paper_mappings.t()
         self.dataset['paper', 'written_by', 'author'].edge_index = author_paper_mappings.t()[[1, 0], :]
-        self.dataset['conference'].x = torch.tensor(len(conf_ids))
+        self.dataset['conference'].num_nodes = torch.tensor(len(conf_ids))
         self.dataset['paper', 'published_in', 'conference'].edge_index = paper_conference_mappings.t()
         return self.dataset
@@ -44,7 +44,7 @@ class MUTAG(StructuredDataset):
             min_size=3,
             max_size=20,
             min_frequency=2,
-            min_density=0.4,
+            min_density=0.2,
             min_degree=1
         )
@@ -68,7 +68,13 @@ if __name__ == "__main__":
     print("Confusion Matrix:")
     print(metrics['confusion_matrix'])
-    ms.visualize_graphs([4, 8, 12, 456, 567])
+    graphs_to_visualise = [4, 8, 12, 456, 567]
+    ms.visualize_graphs(graphs_to_visualise)
+    for graph_idx in graphs_to_visualise:
+        for pattern_idx, pattern_present in enumerate(presence_matrix[graph_idx]):
+            if pattern_present == 1:
+                ms.visualize_pattern_in_graph(pattern_idx, graph_idx)
+
     incorrect_indices = [
         i for i, (true, pred) in enumerate(zip(original_labels, predicted_labels))
@@ -78,11 +84,6 @@ if __name__ == "__main__":
     print(f"Incorrectly predicted graph indices: {incorrect_indices}")
     if incorrect_indices:
         print("\nSuperclasses of incorrectly predicted graphs:")
-        for idx in incorrect_indices:
-            super_classes = dataset[idx].super_classes
-            print(f"Graph {idx}: Superclasses: {super_classes}")
-        print("\nVisualizing incorrectly predicted graphs...")
-        incorrect_visualization_dir = ms.visualize_graphs(incorrect_indices)
     if incorrect_visualization_dir:
@@ -7,7 +7,7 @@ from customDBs.MultiShape import MultiShape
 from customDBs.MUTAG import MUTAG
 from customDBs.BA2Motif import BA2Motif
 from customDBs.StructuredDataset import StructuredDataset
-from ConceptLearner.GNN4 import GNN
+from ConceptLearner.HomogeneousGNN import GNN
 from ontolearn.owlapy.render import DLSyntaxObjectRenderer
 from ontolearn.metrics import Accuracy, Precision, Recall, F1
 from pathlib import Path
@@ -21,7 +21,7 @@ def explain_gnn(model, dataset, datasetName, explanations_dict, high_level_concepts
         "http://example.org/",
         owl_graph_path=f"./owlGraphs/{datasetName}_experiment{ '_with_motif' if high_level_concepts is not None else '_without_motif'}.owl",
         generate_new_owl_file=True,
-        ignore_nodes=True,
+        ignore_nodes=False,
         high_level_concepts=high_level_concepts
     )
@@ -69,7 +69,8 @@ def experiment(structuredDataset: StructuredDataset, datasetName: str):
     # Initialize JSON structure
     evaluations = {
         "gnn": {},
-        "explanation": {}
+        "explanation": {},
+        "confusion_matrix": {}
     }
     # Initialize GNN model
@@ -82,8 +83,10 @@ def experiment(structuredDataset: StructuredDataset, datasetName: str):
     original_labels = [data.y.item() for data in structuredDataset.dataset]
     predicted_labels = model.predict_all()
+    cm = confusion_matrix(original_labels, predicted_labels)
     print("Confusion Matrix:")
-    print(confusion_matrix(original_labels, predicted_labels))
+    print(cm)
+    evaluations["confusion_matrix"] = cm.tolist()
     # Save GNN training metrics
     evaluations["gnn"] = metrics
@@ -109,7 +112,7 @@ def experiment(structuredDataset: StructuredDataset, datasetName: str):
     print("\nDetected motifs...")
     print(patterns)
     patterns_path = structuredDataset.visualize_patterns()
-    timeStamp = patterns_path.split(1)
+    timeStamp = patterns_path.split("/")[1]
     evaluations["path"] = patterns_path
File moved