Commit 8cbf91f4 authored by AjUm-HEIDI

Restructure code

parent 6bedb4fb
Showing with 245 additions and 490 deletions
@@ -5,3 +5,7 @@ owlGraphs
experimentalResults
*.pyc
torchGeometricDatasets
tests/generatedGraphs
visualizations
evaluation_results
.env
\ No newline at end of file
"""Methods and classes that inherit the parent explainer class"""
import torch
from ConceptLearner.GNN import GNN
from ConceptLearner.ConvertToOWL import ConvertToOWL
from ConceptLearner.Utils import find_classes_with_y_labels
from torch_geometric.data import HeteroData
from Utils.ConvertToOWL import convert_to_owl
from Utils.Utils import find_classes_with_y_labels
from torch_geometric.data import HeteroData, Data
from ontolearn.owlapy.model import OWLClassExpression
from ontolearn.knowledge_base import KnowledgeBase
from ontolearn.concept_learner import EvoLearner
@@ -11,14 +10,36 @@ from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.owlapy.model import OWLNamedIndividual, IRI
from ontolearn.metrics import Accuracy, F1
from ontolearn.abstracts import AbstractScorer
from typing import Optional
from typing import Optional, Union
import os
from ontolearn.owlapy.model import OWLNamedIndividual, IRI, OWLObjectIntersectionOf, \
OWLClassExpression, OWLObjectUnionOf, OWLObjectComplementOf, OWLObjectOneOf, \
OWLObjectMaxCardinality, OWLObjectMinCardinality, OWLObjectAllValuesFrom, \
OWLObjectSomeValuesFrom, OWLClass, OWLNothing
import random
import re
from typing import Final
from ontolearn.abstracts import AbstractFitness
from ontolearn.ea_utils import Tree
class LinearPressureFitness(AbstractFitness):
"""Linear parametric parsimony pressure."""
__slots__ = 'gain', 'penalty'
name: Final = 'Linear_Pressure_Fitness'
def __init__(self, gain: float = 2048.0, penalty: float = 1.0):
self.gain = gain
self.penalty = penalty
def apply(self, individual: Tree):
quality = individual.quality.values[0]
fitness = self.gain*quality - self.penalty*len(individual)
print(individual)
print(self.gain, quality, self.gain*quality, len(individual), fitness)
individual.fitness.values = (round(fitness, 5),)
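For intuition, a standalone restatement of the formula in apply() with illustrative numbers (the helper and the values below are not from this repo):

def linear_pressure(quality: float, length: int,
                    gain: float = 2048.0, penalty: float = 5.0) -> float:
    # Same trade-off as LinearPressureFitness: reward quality, penalize size.
    return gain * quality - penalty * length

# Equal quality, different lengths: the shorter concept scores higher.
assert round(linear_pressure(0.90, 5), 1) == 1818.2   # 1843.2 - 25
assert round(linear_pressure(0.90, 40), 1) == 1643.2  # 1843.2 - 200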
class DiscriminativeExplainer:
""" An abstract class which represent an explainer. An explainer should be able to use a label to generate a
@@ -66,7 +87,7 @@ class DiscriminativeExplainer:
return OWLObjectMinCardinality(ce.get_cardinality(), ce.get_property(), ce2)
return ce
def __init__(self, gnn, data: HeteroData, namespace = "http://example.org/", owl_graph_path = "./owlGraphs/example.owl", generate_new_owl_file: bool = False, create_nominals: bool = False, add_edge_counts: bool = False, create_data_properties_as_object: bool = False, full_edge_name: bool = False) -> None:
def __init__(self, gnn, data: Union[HeteroData, Data], namespace = "http://example.org/", owl_graph_path = "./owlGraphs/example.owl", generate_new_owl_file: bool = False, create_nominals: bool = False, add_edge_counts: bool = False, create_data_properties_as_object: bool = False, full_edge_name: bool = False, ignore_nodes: bool = False, high_level_concepts: dict = None) -> None:
"""Initializes the explainer based on the given GNN and the Dataset. After the initialization the object should
be able to produce explanations of single labels.
@@ -86,12 +107,14 @@ class DiscriminativeExplainer:
self.namespace = namespace
self.owl_graph_path = owl_graph_path
self.create_nominals = create_nominals
self.ignore_nodes = ignore_nodes
self.is_multi_graph = not isinstance(self.data, HeteroData)
self.classNames = find_classes_with_y_labels(self.data, first_only=False) if not self.is_multi_graph else []
self.high_level_concepts = high_level_concepts
if generate_new_owl_file and os.path.isfile(self.owl_graph_path):
os.remove(self.owl_graph_path)
if not os.path.isfile(self.owl_graph_path):
self.owlGraph = ConvertToOWL(data=self.data, namespace=self.namespace, owlGraphPath=self.owl_graph_path, create_nominals=create_nominals, add_edge_counts=add_edge_counts, create_data_properties_as_object = create_data_properties_as_object, full_edge_name=full_edge_name)
self.owlGraph = convert_to_owl(data=self.data, namespace=self.namespace, owlGraphPath=self.owl_graph_path, high_level_concepts=self.high_level_concepts, create_nominals=create_nominals, add_edge_counts=add_edge_counts, create_data_properties_as_object = create_data_properties_as_object, full_edge_name=full_edge_name, ignore_nodes=self.ignore_nodes)
self.owlGraph.buildGraph()
self.knowledge_base = KnowledgeBase(path=self.owl_graph_path)
@@ -102,7 +125,8 @@ class DiscriminativeExplainer:
debug: Optional[bool] = False,
max_runtime: Optional[int] = 60,
num_generations: Optional[int] = 600,
quality_func: Optional[AbstractScorer] = None) -> OWLClassExpression:
quality_func: Optional[AbstractScorer] = None,
length_penalty: Optional[int] = 5,) -> OWLClassExpression:
"""Explains based on the GNN a given label. The explanation is in the form of a Class Expression.
Args:
@@ -119,15 +143,14 @@ class DiscriminativeExplainer:
if quality_func is None:
quality_func = F1()
self.model = EvoLearner(knowledge_base=self.knowledge_base, use_data_properties=use_data_properties, max_runtime=max_runtime, num_generations=num_generations, quality_func=quality_func)
self.model = EvoLearner(knowledge_base=self.knowledge_base, use_data_properties=use_data_properties, max_runtime=max_runtime, num_generations=num_generations, quality_func=quality_func, population_size=1000, fitness_func=LinearPressureFitness(penalty=length_penalty))
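# population_size together with the length-penalized fitness above biases
# EvoLearner toward short class expressions (see LinearPressureFitness).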
positive_examples = []
negative_examples = []
if self.is_multi_graph:
# Handle multi-graph dataset while keeping original prediction logic
predictions = {} if not debug else None
if not debug and self.gnn:
predictions = {idx: self.gnn.predict(graph) for idx, graph in enumerate(self.data)}
predictions = {idx: self.gnn.predict(idx) for idx in range(len(self.data))}
for idx in range(len(self.data)):
graph = self.data[idx]
@@ -136,7 +159,7 @@ class DiscriminativeExplainer:
if debug:
is_positive = graph.y.item() == label
else:
is_positive = predictions[idx].item() == label if predictions else graph.y.item() == label
is_positive = predictions[idx].item() == label
if is_positive:
positive_examples.append(graph_uri)
@@ -146,7 +169,7 @@ class DiscriminativeExplainer:
# Original HeteroData handling (unchanged)
predictions = {} if not debug else None
if not debug and self.gnn:
predictions = self.gnn.predict_all(new_data=self.data)
predictions = self.gnn.predict_all()
for node_type in self.data.node_types:
if debug:
if node_type in self.classNames:
@@ -157,16 +180,16 @@ class DiscriminativeExplainer:
positive_examples.append(node)
else:
negative_examples.append(node)
else:
if "x" in self.data[node_type]:
noOfNodes = self.data[node_type].x.size()[0]
for idx in range(noOfNodes):
node = f"{self.namespace}{node_type}#{idx+1}"
negative_examples.append(node)
elif "num_nodes" in self.data[node_type]:
for idx in range(self.data[node_type].num_nodes):
node = f"{self.namespace}{node_type}#{idx+1}"
negative_examples.append(node)
# else:
# if "x" in self.data[node_type]:
# noOfNodes = self.data[node_type].x.size()[0]
# for idx in range(noOfNodes):
# node = f"{self.namespace}{node_type}#{idx+1}"
# negative_examples.append(node)
# elif "num_nodes" in self.data[node_type]:
# for idx in range(self.data[node_type].num_nodes):
# node = f"{self.namespace}{node_type}#{idx+1}"
# negative_examples.append(node)
else:
if node_type in predictions:
nodeTypePredictions = predictions[node_type]
@@ -190,6 +213,7 @@ class DiscriminativeExplainer:
typed_neg = set(map(OWLNamedIndividual, map(IRI.create, set(negative_examples))))
lp = PosNegLPStandard(pos=typed_pos, neg=typed_neg)
print(len(positive_examples), len(negative_examples))
accepted_hypotheses = []
# cross-check that the stated quality is within the threshold of the actual quality
while True:
......
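A hedged usage sketch of the restructured explainer. The keyword arguments below appear in this diff; the module path and the assumption that explain() takes the target label as its first positional argument are not confirmed by it:

explainer = DiscriminativeExplainer(
    gnn=model,                                 # a trained GNN (see GNN.py below)
    data=data,                                 # HeteroData or Data
    owl_graph_path="./owlGraphs/example.owl",
    generate_new_owl_file=True,
)
expression = explainer.explain(
    1,                                         # assumed: target label first
    max_runtime=60,
    num_generations=600,
    length_penalty=5,
)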
import torch
from torch.nn import Module, Linear, ReLU, Dropout
import torch.nn.functional as F
from torch.optim import Adam
from torch_geometric.nn import HeteroConv, SAGEConv
from torch_geometric.data import HeteroData
from ConceptLearner.Utils import find_classes_with_y_labels, get_feature_sizes_and_edge_config
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import numpy as np
from typing import Dict, Tuple, Optional
class GNN(Module):
def __init__(self, data: HeteroData, out_features: int = 128) -> None:
"""
Initialize GNN for heterogeneous graph data
Args:
data: HeteroData graph with node features and edge indices
out_features: Dimension of output embeddings
"""
super(GNN, self).__init__()
self.data = data
feature_sizes, edge_config = get_feature_sizes_and_edge_config(data)
self.convs = HeteroConv({
(src, rel, dst): SAGEConv((feature_sizes[src], feature_sizes[dst]), out_features)
for src, rel, dst in edge_config
}, aggr='mean')
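# One SAGEConv per edge type; with aggr='mean', HeteroConv averages the
# messages that reach the same destination node type via different relations.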
self.transforms = torch.nn.ModuleDict({
node: Linear(out_features, out_features)
for node in feature_sizes
})
self.outputs = torch.nn.ModuleDict()
self.label_classes = find_classes_with_y_labels(data, first_only=False)
for node_type in self.label_classes:
num_classes = len(torch.unique(data[node_type].y))
self.outputs[node_type] = Linear(out_features, num_classes)
self.dropout = Dropout(p=0.3)
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.to(self.device)
self.best_model_state = None
def forward(self, x_dict: Dict[str, torch.Tensor],
edge_index_dict: Dict[Tuple[str, str, str], torch.Tensor]) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
"""
Forward pass of GNN model
Args:
x_dict: Dictionary mapping node types to feature matrices
edge_index_dict: Dictionary mapping edge types to edge indices
Returns:
tuple: (logits, embeddings)
- logits: Dict mapping node types to prediction logits
- embeddings: Dict mapping node types to node embeddings
"""
x_dict = self.convs(x_dict, edge_index_dict)
for node_type in x_dict:
x_dict[node_type] = self.dropout(F.relu(self.transforms[node_type](x_dict[node_type])))
logits = {node_type: self.outputs[node_type](x_dict[node_type])
for node_type in self.outputs if node_type in x_dict}
return logits, x_dict
def train_model(self, epochs: int = 100, lr: float = 0.01,
show_progress: bool = False) -> Dict[str, Dict[str, float]]:
"""
Train GNN model on graph data
Args:
epochs: Number of training epochs
lr: Learning rate for optimizer
show_progress: Whether to print training progress
Returns:
dict: Best metrics per node type containing:
- accuracy, precision, recall, f1 scores
- epoch number of best model
"""
self.data = self.data.to(self.device)
optimizer = Adam(self.parameters(), lr=lr)
best_metrics = {node_type: {'accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0, 'epoch': 0}
for node_type in self.label_classes}
class_weights = {}
for node_type in self.label_classes:
labels = self.data[node_type].y.cpu().numpy()
counts = np.bincount(labels)
class_weights[node_type] = torch.FloatTensor(1.0 / counts).to(self.device)
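# Inverse-frequency weighting: e.g. counts [900, 100] -> weights [1/900, 1/100],
# so each rare-class example contributes ~9x more to the loss below.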
self.train()
for epoch in range(epochs):
optimizer.zero_grad()
logits, _ = self(self.data.x_dict, self.data.edge_index_dict)
total_loss = 0
current_metrics = {node_type: {} for node_type in self.label_classes}
for node_type in self.label_classes:
node_logits = logits[node_type]
loss = F.cross_entropy(node_logits, self.data[node_type].y,
weight=class_weights[node_type])
total_loss += loss
_, predictions = torch.max(node_logits, dim=1)
y_true = self.data[node_type].y.cpu().numpy()
y_pred = predictions.cpu().numpy()
current_metrics[node_type] = {
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, average='weighted',
zero_division=1),
'recall': recall_score(y_true, y_pred, average='weighted',
zero_division=1),
'f1': f1_score(y_true, y_pred, average='weighted')
}
if current_metrics[node_type]['f1'] > best_metrics[node_type]['f1']:
best_metrics[node_type] = {**current_metrics[node_type], 'epoch': epoch}
self.best_model_state = {k: v.detach().clone() for k, v in self.state_dict().items()}  # snapshot, not a live reference
total_loss.backward()
optimizer.step()
if show_progress and epoch % 10 == 0:
print(f"\nEpoch {epoch}:")
print(f"Total Loss = {total_loss.item():.4f}")
for node_type in self.label_classes:
print(f"\n{node_type.capitalize()} Metrics:")
for metric, value in current_metrics[node_type].items():
print(f"{metric.capitalize()} = {value:.4f}")
if self.best_model_state is not None:
self.load_state_dict(self.best_model_state)
return best_metrics
def predict(self, node_type: str, idx: Optional[int] = None) -> torch.Tensor:
"""
Predict class for node(s) of given type
Args:
node_type: Type of node (e.g. 'author', 'paper')
idx: Optional index for single node prediction
Returns:
torch.Tensor: Single class prediction if idx provided,
otherwise predictions for all nodes of type
Raises:
ValueError: If node_type has no labels or idx out of bounds
"""
if node_type not in self.label_classes:
raise ValueError(f"Node type {node_type} has no labels")
self.eval()
with torch.no_grad():
logits, _ = self(self.data.x_dict, self.data.edge_index_dict)
predictions = torch.argmax(logits[node_type], dim=1)
return predictions[idx] if idx is not None else predictions
def predict_all(self) -> Dict[str, torch.Tensor]:
"""
Predict classes for all labeled node types
Returns:
dict: Mapping from node types to tensors of class predictions
for all nodes of that type
"""
self.eval()
with torch.no_grad():
logits, _ = self(self.data.x_dict, self.data.edge_index_dict)
return {node_type: torch.argmax(logits[node_type], dim=1)
for node_type in self.label_classes}
# You can run this script directly to train and evaluate the model:
if __name__ == "__main__":
from torch_geometric.datasets import DBLP
epochs=50
lr=0.01
show_progress=True
# 1) Load the DBLP heterogeneous dataset
# This will have node types: 'author', 'paper', 'term', 'conference'.
# The 'author' node type has labels (y) for classification.
dataset = DBLP(root='rawData/dblp')
data = dataset[0] # The dataset has a single large HeteroData object
print(data)
# 2) Initialize the GNN model for heterogeneous data
model = GNN(data=data)
# 3) Train the model; this uses the train_model() method from your GNN class
best_metrics = model.train_model(epochs=epochs, lr=lr, show_progress=show_progress)
# 4) Print the best metrics (accuracy, precision, recall, f1) for each node type
print("\nBest metrics across node types:")
for node_type, metrics in best_metrics.items():
print(f"\nNode type: {node_type}")
for metric_name, metric_value in metrics.items():
if metric_name != 'epoch':
print(f" {metric_name.capitalize()}: {metric_value:.4f}")
print(f" Best Epoch: {metrics['epoch']}")
# 5) Example predictions
# If you want to predict labels on all "movie" nodes (or another labeled node type)
if 'movie' in best_metrics: # 'movie' is the usual labeled node in IMDB
movie_predictions = model.predict(node_type='movie')
print(f"\nPredictions for 'movie' node type: {movie_predictions.shape}")
print(movie_predictions)
model.eval() # Ensure model is in eval mode
for node_type in model.label_classes: # model.label_classes has all node types with labels
# Get ground truth and predictions
y_true = data[node_type].y.cpu().numpy()
y_pred = model.predict(node_type=node_type).cpu().numpy()
# Identify incorrectly predicted indices
incorrect_indices = np.where(y_true != y_pred)[0]
# Print summary
print(f"\nNode type '{node_type}' - Total incorrect predictions: {len(incorrect_indices)} of {len(y_pred)}")
# Print details for each mismatch
for idx in incorrect_indices:
print(f" Node index: {idx}, Predicted: {y_pred[idx]}, Actual: {y_true[idx]}")
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_add_pool
from torch.optim import Adam
from torch.nn import Module, BatchNorm1d, Linear
from torch_geometric.data import Data
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import numpy as np
from typing import Dict, Tuple, Optional, List
from torch_geometric.loader import DataLoader
from torch_geometric.datasets import BAMultiShapesDataset
from torch_geometric.data import Data, Dataset
class GNN(Module):
def __init__(self, dataset: Dataset, hidden_channels: int = 64) -> None:
super(GNN, self).__init__()
self.dataset = dataset
self.conv1 = GCNConv(dataset.num_node_features, hidden_channels)
self.conv2 = GCNConv(hidden_channels, hidden_channels)
self.conv3 = GCNConv(hidden_channels, hidden_channels)
self.bn1 = BatchNorm1d(hidden_channels)
self.bn2 = BatchNorm1d(hidden_channels)
self.bn3 = BatchNorm1d(hidden_channels)
self.lin1 = Linear(hidden_channels, hidden_channels)
self.lin2 = Linear(hidden_channels, dataset.num_classes)
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.to(self.device)
self.best_model_state = None
def forward(self, x: torch.Tensor, edge_index: torch.Tensor,
batch: torch.Tensor) -> torch.Tensor:
x = F.relu(self.bn1(self.conv1(x, edge_index)))
x = F.dropout(x, p=0.3, training=self.training)
x = F.relu(self.bn2(self.conv2(x, edge_index)))
x = F.dropout(x, p=0.3, training=self.training)
x = F.relu(self.bn3(self.conv3(x, edge_index)))
x = F.dropout(x, p=0.3, training=self.training)
x = global_add_pool(x, batch)
x = F.relu(self.lin1(x))
x = F.dropout(x, p=0.5, training=self.training)
x = self.lin2(x)
return F.log_softmax(x, dim=1)
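# log_softmax pairs with F.nll_loss in train_model below; the combination
# is equivalent to F.cross_entropy on raw logits.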
def train_model(self, train_loader, test_loader, epochs: int = 200,
lr: float = 0.005, show_progress: bool = False) -> Dict[str, float]:
optimizer = Adam(self.parameters(), lr=lr, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='max', factor=0.5, patience=10, min_lr=1e-5)
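# mode='max' because the scheduler is stepped on F1 (higher is better);
# the LR halves after 10 stagnant epochs and is floored at 1e-5.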
best_metrics = {'accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0, 'epoch': 0}
for epoch in range(epochs):
# Training
self.train()
total_loss = 0
for data in train_loader:
data = data.to(self.device)
optimizer.zero_grad()
output = self(data.x, data.edge_index, data.batch)
loss = F.nll_loss(output, data.y)
loss.backward()
optimizer.step()
total_loss += float(loss) * data.num_graphs
# Evaluation
self.eval()
predictions, labels = [], []
with torch.no_grad():
for data in test_loader:
data = data.to(self.device)
pred = self(data.x, data.edge_index, data.batch).max(1)[1]
predictions.extend(pred.cpu().numpy())
labels.extend(data.y.cpu().numpy())
current_metrics = {
'accuracy': accuracy_score(labels, predictions),
'precision': precision_score(labels, predictions, average='weighted', zero_division=1),
'recall': recall_score(labels, predictions, average='weighted', zero_division=1),
'f1': f1_score(labels, predictions, average='weighted')
}
scheduler.step(current_metrics['f1'])
if current_metrics['f1'] > best_metrics['f1']:
best_metrics = {**current_metrics, 'epoch': epoch}
self.best_model_state = {k: v.detach().clone() for k, v in self.state_dict().items()}  # snapshot, not a live reference
if show_progress and (epoch + 1) % 10 == 0:
print(f"\nEpoch {epoch+1:03d}, Loss: {total_loss/len(train_loader):.4f}")
for metric, value in current_metrics.items():
print(f"{metric.capitalize()}: {value:.4f}")
if self.best_model_state:
self.load_state_dict(self.best_model_state)
return best_metrics
def predict(self, idx: int) -> torch.Tensor:
"""Predict class for single graph by index"""
data = self.dataset[idx].to(self.device)
batch = torch.zeros(data.x.size(0), dtype=torch.long, device=self.device)
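# The all-zeros batch vector assigns every node to graph 0, so
# global_add_pool collapses the whole graph into a single embedding.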
self.eval()
with torch.no_grad():
logits = self(data.x, data.edge_index, batch)
return torch.argmax(logits, dim=1)
def predict_all(self, indices=None) -> torch.Tensor:
"""Predict for multiple graphs by indices"""
if indices is None:
indices = range(len(self.dataset))
self.eval()
predictions = []
for idx in indices:
pred = self.predict(idx)
predictions.append(pred)
return torch.cat(predictions)
def main():
"""Main training and evaluation pipeline"""
# Load dataset
dataset = BAMultiShapesDataset(root='../rawData/BAMultiShapes')
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
model = GNN(dataset=dataset)
best_metrics = model.train_model(train_loader, test_loader, epochs=200, show_progress=True)
print("\nBest Model Metrics:", {k:f"{v:.4f}" for k,v in best_metrics.items()})
# Example predictions
sample_idx = 0
pred = model.predict(sample_idx)
print(f"\nPrediction for graph {sample_idx}: {pred}")
test_indices = test_dataset.indices  # indices of the held-out split, not simply range(test_size)
all_preds = model.predict_all(test_indices)
print(f"Test set predictions shape: {all_preds.shape}")
if __name__ == "__main__":
main()
\ No newline at end of file
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from torch_geometric.datasets import BA2MotifDataset
from customDBs.StructuredDataset import StructuredDataset
from CustomDataset.Structured.Base import Base
import networkx as nx
class BA2Motif(StructuredDataset):
class BA2Motif(Base):
"""
A class to process the BA2Motif dataset, find frequent patterns, and visualize graphs and patterns.
"""
......
@@ -2,11 +2,11 @@ from collections import Counter
import numpy as np
import torch
from torch_geometric.utils import to_networkx
from ConceptLearner.PatternFinder import PatternFinder
from ConceptLearner.Visualiser import Visualiser
from Utils.PatternFinder import PatternFinder
from Utils.Visualiser import Visualiser
class StructuredDataset:
class Base:
"""
Base class for processing graph datasets, finding patterns, and visualizing graphs.
"""
......
from torch_geometric.datasets import TUDataset
from customDBs.StructuredDataset import StructuredDataset
from CustomDataset.Structured.Base import Base
class MUTAG(StructuredDataset):
class MUTAG(Base):
"""
A class to process the MUTAG dataset, find frequent patterns, and visualize graphs and patterns.
"""
......
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from torch_geometric.datasets import BAMultiShapesDataset
from customDBs.StructuredDataset import StructuredDataset
from CustomDataset.Structured.Base import Base
import networkx as nx
class MultiShape(StructuredDataset):
class MultiShape(Base):
"""
A class to process the BAMultiShapes dataset, find frequent patterns, and visualize graphs and patterns.
"""
@@ -40,8 +40,6 @@ class MultiShape(StructuredDataset):
return [house, grid, wheel]
if __name__ == "__main__":
ms = MultiShape(path='../rawData/BAMultiShapes')
......
File moved
from torch_geometric.data import HeteroData
from collections import defaultdict
import os
import re
import torch
from nltk.corpus import stopwords
import nltk
from ConceptLearner.Utils import group_themes
import csv
from datetime import datetime
# Ensure NLTK stopwords are available
try:
@@ -16,15 +9,12 @@ except LookupError:
nltk.download('stopwords')
STOP_WORDS = set(stopwords.words('english'))
class TextDataset:
class Base:
def __init__(self, path, bag_of_words_size=100, remove_all_false_values=True):
self.path = path
self.bag_of_words_size = bag_of_words_size
self.remove_all_false_values = remove_all_false_values
self.dataset = HeteroData()
def load_dataset(self, path):
raise NotImplementedError("This method should be implemented by subclasses.")
def fetch_themes(self, num_groups, grouped_keywords_path=''):
raise NotImplementedError("This method should be implemented by subclasses.")
@@ -2,17 +2,16 @@ from collections import defaultdict
import os
import re
import torch
from torch_geometric.data import HeteroData
from ConceptLearner.Utils import group_themes
from Utils.Utils import group_themes
from nltk.corpus import stopwords
import nltk
from customDBs.TextDataset import TextDataset
from CustomDataset.Text.Base import Base
# Define label categories as constants
class DBLP(TextDataset):
class DBLP(Base):
def __init__(self, path='rawData/dblp', bag_of_words_size=100, remove_all_false_values=True):
def __init__(self, path='rawData/dblp', bag_of_words_size=100, remove_all_false_values=False):
"""
Loads the DBLP dataset and constructs a HeteroData object for PyTorch Geometric.
......
@@ -2,22 +2,19 @@ from collections import defaultdict
import torch
from torch_geometric.data import HeteroData
import csv
import os
from Utils.Utils import group_themes
from CustomDataset.Text.Base import Base
from ConceptLearner.Utils import group_themes
class IMDB:
def __init__(self, path='rawData/imdb/movie_metadata.csv', bag_of_words_size=100, remove_all_false_values=True):
class IMDB(Base):
def __init__(self, path='rawData/imdb/movie_metadata.csv', bag_of_words_size=100, remove_all_false_values=False):
"""
Initialize the IMDB dataset.
Args:
path (str): Path to the IMDB directory.
"""
self.path = path
self.bag_of_words_size = bag_of_words_size
self.remove_all_false_values = remove_all_false_values
self.dataset = HeteroData()
super().__init__(path=path, bag_of_words_size=bag_of_words_size, remove_all_false_values=remove_all_false_values)
self._initialize()
def _initialize(self):
@@ -60,13 +57,13 @@ class IMDB:
# Add director and actor nodes and edges
movie_to_director_edges = self._add_directors_and_edges(movie_to_director)
self.dataset['movie', 'to', 'director'].edge_index = movie_to_director_edges
self.dataset['director', 'to', 'movie'].edge_index = movie_to_director_edges.flip(0)
self.dataset['movie', 'directed_by', 'director'].edge_index = movie_to_director_edges
self.dataset['director', 'directs', 'movie'].edge_index = movie_to_director_edges.flip(0)
movie_to_actors_edges = self._add_actors_and_edges(movie_to_actors)
self.dataset['movie', 'to', 'actor'].edge_index = movie_to_actors_edges
self.dataset['actor', 'to', 'movie'].edge_index = movie_to_actors_edges.flip(0)
self.dataset['movie', 'acted_by', 'actor'].edge_index = movie_to_actors_edges
self.dataset['actor', 'acts_in', 'movie'].edge_index = movie_to_actors_edges.flip(0)
return self.dataset
@@ -179,7 +176,6 @@ class IMDB:
return edge_index
def fetch_themes(self, num_groups, groupedKeywordsPath=''):
"""
Groups tensor features into themes based on vocabulary words.
@@ -233,5 +229,5 @@ class IMDB:
# Example usage
if __name__ == "__main__":
dataset = IMDB( path='rawData/imdb/sample.csv')
dataset = IMDB()
print(dataset.dataset)
import torch
import torch.nn.functional as F
from typing import Dict, List, Optional, Tuple
class BaseGNN(torch.nn.Module):
def __init__(self, device: Optional[str] = None):
"""
Base class for Graph Neural Networks, supporting both homogeneous
and heterogeneous GNNs.
Args:
device (Optional[str]): Device to use ('cuda' or 'cpu'). If None, auto-detect.
"""
super(BaseGNN, self).__init__()
self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
self.best_model_state = None
def forward(self, *args, **kwargs):
"""
Forward pass to be implemented by child classes.
"""
raise NotImplementedError("The `forward` method must be implemented in a subclass.")
def predict(self, *args, **kwargs) -> torch.Tensor:
"""
Predict labels for a single graph or node, depending on the type of GNN.
To be implemented in child classes if customization is required.
Returns:
torch.Tensor: Predicted label(s).
"""
raise NotImplementedError("The method must be implemented in a subclass.")
def predict_all(self, *args, **kwargs) -> torch.Tensor:
"""
Predict labels for all graphs or nodes, depending on the type of GNN.
To be implemented in child classes if customization is required.
Returns:
torch.Tensor: Predicted labels.
"""
raise NotImplementedError("The method must be implemented in a subclass.")
import torch
import torch.nn.functional as F
from typing import Dict, List, Optional, Tuple
class GNN(torch.nn.Module):
def __init__(self, device: Optional[str] = None):
"""
Base class for Graph Neural Networks, supporting both homogeneous
and heterogeneous GNNs.
Args:
device (Optional[str]): Device to use ('cuda' or 'cpu'). If None, auto-detect.
"""
super(GNN, self).__init__()
self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
self.best_model_state = None
def forward(self, *args, **kwargs):
"""
Forward pass to be implemented by child classes.
"""
raise NotImplementedError("The `forward` method must be implemented in a subclass.")
def predict(self, *args, **kwargs) -> torch.Tensor:
"""
Predict labels for a single graph or node, depending on the type of GNN.
To be implemented in child classes if customization is required.
Returns:
torch.Tensor: Predicted label(s).
"""
self.eval()
with torch.no_grad():
return self._predict(*args, **kwargs)
def predict_all(self, *args, **kwargs) -> torch.Tensor:
"""
Predict labels for all graphs or nodes, depending on the type of GNN.
To be implemented in child classes if customization is required.
Returns:
torch.Tensor: Predicted labels.
"""
self.eval()
with torch.no_grad():
return self._predict_all(*args, **kwargs)
def _predict(self, *args, **kwargs) -> torch.Tensor:
"""
Internal predict method for subclasses to override.
"""
raise NotImplementedError("The `_predict` method must be implemented in a subclass.")
def _predict_all(self, *args, **kwargs) -> torch.Tensor:
"""
Internal predict_all method for subclasses to override.
"""
raise NotImplementedError("The `_predict_all` method must be implemented in a subclass.")
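A minimal sketch of a subclass wired into this template (ToyGNN is illustrative, not part of the repo). The public predict/predict_all already handle eval() and torch.no_grad(), so a subclass only implements the underscore hooks:

class ToyGNN(GNN):
    def __init__(self, num_features: int, num_classes: int):
        super().__init__()
        self.lin = torch.nn.Linear(num_features, num_classes).to(self.device)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.lin(x)

    def _predict(self, x: torch.Tensor) -> torch.Tensor:
        # Runs inside predict(), i.e. after eval() and under no_grad().
        return torch.argmax(self(x), dim=1)

    def _predict_all(self, xs: List[torch.Tensor]) -> torch.Tensor:
        return torch.cat([self._predict(x) for x in xs])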
@@ -3,16 +3,17 @@ import torch.nn.functional as F
from torch.nn import Module, Linear, BatchNorm1d, Dropout
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch_geometric.nn import HeteroConv, GraphConv
from torch_geometric.nn import HeteroConv, GraphConv, global_mean_pool, global_add_pool
from torch_geometric.data import HeteroData
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score, precision_score, recall_score
import numpy as np
from typing import Dict, Optional, List, Tuple
from torch_geometric.loader import DataLoader
from tabulate import tabulate
from ConceptLearner.Utils import get_feature_sizes_and_edge_config, find_classes_with_y_labels
from Utils.Utils import get_feature_sizes_and_edge_config, find_classes_with_y_labels
class GNN(Module):
class GNN(torch.nn.Module):
def __init__(self, data: HeteroData, hidden_channels: int = 256,
num_hidden_layers: int = 4) -> None:
"""Initialize the Heterogeneous Graph Neural Network.
@@ -134,14 +135,17 @@ class GNN(Module):
# Multi-head pooling and classification for each labeled node type
final_out = {}
for node_type in self.label_nodes:
# Apply multi-head pooling
mean_pooled = self.pooling[node_type]['mean'](out_dict[node_type])
add_pooled = self.pooling[node_type]['add'](out_dict[node_type])
# Concatenate pooled features
pooled = torch.cat([mean_pooled, add_pooled], dim=1)
# Use global pooling functions
mean_pooled = global_mean_pool(out_dict[node_type], batch=None)  # batch=None treats all nodes of this type as one graph
add_pooled = global_add_pool(out_dict[node_type], batch=None)
# Apply classification head
# Post-pooling transformations
mean_pooled = self.pooling[node_type]['mean'](mean_pooled)
add_pooled = self.pooling[node_type]['add'](add_pooled)
# Concatenate and classify
pooled = torch.cat([mean_pooled, add_pooled], dim=1)
final_out[node_type] = self.classifiers[node_type](pooled)
# Apply log softmax to outputs
@@ -297,22 +301,22 @@ class GNN(Module):
def main():
"""Example usage with DBLP dataset"""
from torch_geometric.datasets import DBLP
from CustomDataset.Text.DBLP import DBLP
# Load DBLP dataset
dataset = DBLP(root='rawData/DBLP')
data = dataset[0]
dblp = DBLP(path='rawData/dblp')  # the custom DBLP wrapper takes path, not root
dataset = dblp.dataset
print("\nDataset Information:")
print(data)
print(dataset)
# Test configurations
num_layers_to_test = [2, 4, 6]
num_layers_to_test = [2]
all_results = []
for num_layers in num_layers_to_test:
print(f"\nTraining with {num_layers} hidden layers:")
model = GNN(data=data, num_hidden_layers=num_layers)
model = GNN(data=dataset, num_hidden_layers=num_layers)
metrics = model.train_model(epochs=300, show_progress=True)
# Calculate average metrics across node types
......
@@ -11,7 +11,8 @@ from typing import Dict, Optional, List, Tuple
from torch_geometric.loader import DataLoader
from tabulate import tabulate
class GNN(Module):
class GNN(torch.nn.Module):
def __init__(self, dataset: Dataset, hidden_channels: int = 256,
num_hidden_layers: int = 4) -> None:
super(GNN, self).__init__()
@@ -32,8 +33,10 @@ class GNN(Module):
self.bns.append(BatchNorm1d(hidden_channels))
# Multi-head pooling
self.lin_mean = Linear(hidden_channels, hidden_channels)
self.lin_add = Linear(hidden_channels, hidden_channels)
self.pooling = torch.nn.ModuleDict({
'mean': Linear(hidden_channels, hidden_channels),
'add': Linear(hidden_channels, hidden_channels)
})
# Classification layers
self.classifier = torch.nn.Sequential(
@@ -72,8 +75,8 @@ class GNN(Module):
x_mean = global_mean_pool(x, batch)
x_add = global_add_pool(x, batch)
x_mean = self.lin_mean(x_mean)
x_add = self.lin_add(x_add)
x_mean = self.pooling['mean'](x_mean)
x_add = self.pooling['add'](x_add)
# Concatenate and classify
x = torch.cat([x_mean, x_add], dim=1)
@@ -233,14 +236,13 @@ class GNN(Module):
def main():
"""Enhanced testing with improved configurations"""
from torch_geometric.datasets import BAMultiShapesDataset
# Load dataset
dataset = BAMultiShapesDataset(root='rawData/BAMultiShapes')
from CustomDataset.Structured.MultiShape import MultiShape
ms = MultiShape(path='rawData/BAMultiShapes')  # MultiShape takes path, not root
dataset = ms.dataset
original_labels = [data.y.item() for data in dataset]
# Enhanced test configurations
num_layers_to_test = [2, 4, 6]
num_layers_to_test = [2]
all_results = []
# Test all configurations with improved logging
......
import os
from typing import Union
from rdflib import XSD, Graph, Literal, RDF, RDFS, Namespace, OWL
import torch
from torch_geometric.data import HeteroData, Data
@@ -17,7 +19,10 @@ class BaseOWLConverter(ABC):
self._build()
print("Serializing Graph...")
start_time = time.time()
directory = os.path.dirname(self.owlGraphPath)
os.makedirs(directory, exist_ok=True)
self.graph.serialize(self.owlGraphPath, format="xml")
print(f"Graph serialized in {time.time() - start_time:.2f} seconds.")
@abstractmethod
@@ -94,6 +99,7 @@ class SingleGraphOWLConverter(BaseOWLConverter):
# Builds OWL datatype properties (attributes) for each node type in the heterodata.
def _buildDataProperties(self):
classNamespace = Namespace(self.namespace)
xsdRange = XSD.boolean if self.create_data_properties_as_object else XSD.double
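# Hoisted out of the per-feature loop: boolean when feature values are
# modeled as has_* presence flags, double when kept as numeric properties.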
for node in self.dataset.node_types:
if "x" in self.dataset[node]:
n = self.dataset[node].x.size(1)
@@ -104,7 +110,6 @@ class SingleGraphOWLConverter(BaseOWLConverter):
if self.create_data_properties_as_object:
propertyObjectPropertyName = "has_" + propertyObjectPropertyName
propertyObjectProperty = classNamespace[propertyObjectPropertyName]
xsdRange = XSD.boolean if self.create_data_properties_as_object else XSD.double
self.graph.add((propertyObjectProperty, RDF.type, OWL.DatatypeProperty))
self.graph.add((propertyObjectProperty, RDFS.domain, classNamespace[node]))
self.graph.add((propertyObjectProperty, RDFS.range, xsdRange))
@@ -112,10 +117,13 @@ class SingleGraphOWLConverter(BaseOWLConverter):
# Add high-level concepts if available for this node type
if node in self.high_level_concepts:
for theme in self.high_level_concepts[node].get('themes', []):
theme_namespace = classNamespace[f'has_theme_{theme}']
propertyObjectPropertyName = f'has_theme_{theme}'
theme_namespace = classNamespace[propertyObjectPropertyName]
self.graph.add((theme_namespace, RDF.type, OWL.DatatypeProperty))
self.graph.add((theme_namespace, RDFS.domain, classNamespace[node]))
self.graph.add((theme_namespace, RDFS.range, XSD.boolean))
self.graph.add((theme_namespace, RDFS.range, XSD.double))
def _buildObjectProperties(self):
classNamespace = Namespace(self.namespace)
@@ -160,12 +168,26 @@ class SingleGraphOWLConverter(BaseOWLConverter):
if node_type in self.high_level_concepts:
high_level_concept = self.high_level_concepts[node_type]
presence_matrix = high_level_concept.presence_matrix # m x n array
themes = high_level_concept.themes # List of n themes
presence_matrix = high_level_concept["presence_matrix"] # m x n array
themes = high_level_concept["themes"] # List of n themes
for theme_idx, theme in enumerate(themes): # Iterate over columns (n columns)
if presence_matrix[row_idx, theme_idx] == 1: # Check if presence is 1
self.graph.add((newNode, classNamespace[f'has_theme_{theme}'], Literal(True)))
val = presence_matrix[row_idx, theme_idx].item()
if val != 0:
self.graph.add((newNode, classNamespace[f'has_theme_{theme}'], Literal(val)))
# if node_type in self.high_level_concepts:
# high_level_concept = self.high_level_concepts[node_type]
# presence_matrix = high_level_concept["presence_matrix"] # m x n array
# themes = high_level_concept["themes"] # List of n themes
# for row_idx, presence_matrix_row in enumerate(presence_matrix):
# for theme_idx, theme_value in enumerate(presence_matrix_row): # Iterate over columns (n columns)
# val = theme_value.item()
# if self.create_data_properties_as_object:
# val = True if val != 0 else False
# if self.add_false_values or val or val != 0:
# self.graph.add((newNode, classNamespace[f'has_theme_{themes[theme_idx]}'], Literal(val)))
if "num_nodes" in self.dataset[node_type]:
num_nodes = self.dataset[node_type].num_nodes
@@ -387,7 +409,7 @@ class MultiGraphOWLConverter(BaseOWLConverter):
target_uri = ns[f'node_{idx}_{target.item()}']
self.graph.add((source_uri, ns.connectedTo, target_uri))
def convert_to_owl(data, namespace: str, owlGraphPath: str, high_level_concepts: dict = None, **kwargs):
def convert_to_owl(data: Union[HeteroData, Data], namespace: str, owlGraphPath: str, high_level_concepts: dict = None, **kwargs):
"""
Factory function to create the appropriate converter based on input data type
@@ -409,8 +431,8 @@ def convert_to_owl(data, namespace: str, owlGraphPath: str, high_level_concepts:
if __name__ == "__main__":
# Example usage with MUTAG dataset
from customDBs.MUTAG import MUTAG
from customDBs.MultiShape import MultiShape
from CustomDataset.Structured.MUTAG import MUTAG
from CustomDataset.Structured.MultiShape import MultiShape
mutag = MUTAG(path='../rawData/MUTAG')
ms = MultiShape(path='./rawData/BAMultiShapes')
......
File moved
File moved