Commit b421fe46 authored by AjUm-HEIDI

Add conditions to summarize the results

parent 0a7136bb
@@ -36,6 +36,8 @@ class LinearPressureFitness(AbstractFitness):
quality = individual.quality.values[0]
fitness = self.gain*quality - self.penalty*len(individual)
# print(self.gain, quality, self.gain*quality, len(individual))
individual.fitness.values = (round(fitness, 5),)
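
For reference, the fitness update above scores a hypothesis as gain*quality minus a linear penalty on the length of the class expression, rounded to five decimals. A minimal standalone sketch of that computation (the function and argument names here are illustrative, not the repository's API):

def linear_pressure_fitness(quality: float, length: int, gain: float, penalty: float) -> float:
    # Reward hypothesis quality, subtract a linear penalty per unit of expression length.
    return round(gain * quality - penalty * length, 5)

# Example: quality 0.9, length 7, gain 1.0, penalty 1.0 -> round(0.9 - 7.0, 5) == -6.1
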
class DiscriminativeExplainer:
......@@ -125,7 +127,7 @@ class DiscriminativeExplainer:
max_runtime: Optional[int] = 60,
num_generations: Optional[int] = 600,
quality_func: Optional[AbstractScorer] = None,
length_penalty: Optional[int] = 0.5) -> OWLClassExpression:
length_penalty: Optional[float] = 1.0) -> OWLClassExpression:
"""Explains based on the GNN a given label. The explanation is in the form of a Class Expression.
Args:
......@@ -198,12 +200,12 @@ class DiscriminativeExplainer:
positive_examples.append(node)
else:
negative_examples.append(node)
else:
if "x" in self.data[node_type]:
noOfNodes = self.data[node_type].x.size()[0]
for idx in range(noOfNodes):
node = f"{self.namespace}{node_type}#{idx+1}"
negative_examples.append(node)
# else:
# if "x" in self.data[node_type]:
# noOfNodes = self.data[node_type].x.size()[0]
# for idx in range(noOfNodes):
# node = f"{self.namespace}{node_type}#{idx+1}"
# negative_examples.append(node)
if len(positive_examples) == 0:
return [OWLNothing]
@@ -212,7 +214,8 @@
typed_neg = set(map(OWLNamedIndividual, map(IRI.create, set(negative_examples))))
lp = PosNegLPStandard(pos=typed_pos, neg=typed_neg)
print(len(positive_examples), len(negative_examples))
print("Positive Examples: ", len(positive_examples))
print("\nNegative Examples: ", len(negative_examples))
accepted_hypotheses = []
# cross check if the quality stated is within the threshold of the actual quality
while True:
......
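
For context, the hunk above turns node identifiers into owlapy named individuals and wraps them into a positive/negative learning problem. A minimal sketch of that setup, assuming an ontolearn/owlapy installation (import paths differ between releases, and the namespace and node IDs below are invented):

from owlapy.model import IRI, OWLNamedIndividual      # newer owlapy releases split these into separate modules
from ontolearn.learning_problem import PosNegLPStandard

namespace = "http://example.org/"                      # illustrative namespace
positive_examples = [f"{namespace}Paper#{i}" for i in (1, 2, 3)]
negative_examples = [f"{namespace}Paper#{i}" for i in (4, 5)]

typed_pos = set(map(OWLNamedIndividual, map(IRI.create, set(positive_examples))))
typed_neg = set(map(OWLNamedIndividual, map(IRI.create, set(negative_examples))))
lp = PosNegLPStandard(pos=typed_pos, neg=typed_neg)    # learning problem handed to the concept learner
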
@@ -500,7 +500,7 @@ def get_feature_sizes_and_edge_config(data: HeteroData):
num_nodes = node_store.num_nodes
if num_nodes is not None and num_nodes > 0:
# Create a 1D dummy feature
dummy_feat = torch.ones((num_nodes, 1), dtype=torch.float)
dummy_feat = torch.zeros((num_nodes, 1), dtype=torch.float)
data[node_type].x = dummy_feat
feature_sizes[node_type] = 1
print(f"Created dummy feature for '{node_type}' with shape {dummy_feat.shape}")
......
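
The change above switches the dummy node features from ones to zeros. A small sketch of the same fallback on a toy heterogeneous graph, assuming torch and torch_geometric are installed (the node types below are illustrative):

import torch
from torch_geometric.data import HeteroData

data = HeteroData()
data["author"].num_nodes = 4          # featureless node type
data["paper"].x = torch.randn(3, 16)  # node type that already has features

for node_type in data.node_types:
    if "x" not in data[node_type]:
        num_nodes = data[node_type].num_nodes
        if num_nodes is not None and num_nodes > 0:
            # Create a 1-dimensional all-zero dummy feature per node
            data[node_type].x = torch.zeros((num_nodes, 1), dtype=torch.float)
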
import argparse
import json
from pathlib import Path
from structured_datasets_experiment import experiment as structured_datasets_experiment # Make sure to import your experiment function correctly
from text_based_datasets_experiment import experiment as text_based_datasets_experiment # Make sure to import your experiment function correctly
from structured_datasets_experiment import experiment as structured_datasets_experiment
from text_based_datasets_experiment import experiment as text_based_datasets_experiment
def load_config(config_path):
with open(config_path, 'r') as file:
@@ -11,25 +11,67 @@ def load_config(config_path):
def parse_arguments():
parser = argparse.ArgumentParser(description="Run experiments for structured or text datasets.")
parser.add_argument("-c", "--config", default="config.json", help="Path to the configuration file, defaults to 'config.json'")
parser.add_argument("-t", "--type", choices=['structured', 'text'], default='structured', help="Type of dataset, defaults to 'structured'")
parser.add_argument("-i", "--iterations", type=int, default=5, help="The total number of times to run the experiment")
parser.add_argument("-t", "--type", choices=['text', 'structured'], help="Type of dataset, defaults to 'structured'")
parser.add_argument("-d", "--dataset", help="Specific dataset name to run, optional")
return parser.parse_args()
parser.add_argument("-n", "--num_groups", type=int, nargs='*', default=None, help="List of group sizes to run experiments with, space-separated")
parser.add_argument("-b", "--boolean_concepts", type=str, choices=['true', 'false'], default=None, help="Whether to create high level concepts as boolean values, must explicitly state 'true' or 'false'")
args = parser.parse_args()
# Convert boolean_concepts argument from string to actual Boolean type, or None if not specified
if args.boolean_concepts is not None:
args.boolean_concepts = args.boolean_concepts.lower() == 'true'
# Check if the iterations argument is a positive integer
if args.iterations < 1:
parser.error("The number of iterations must be a positive integer.")
return args
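
For reference, a hypothetical invocation of this script and the values the parser above would produce after the post-processing (the script name and option values are examples only):

# python run_experiments.py -c config.json -t text -i 3 -n 0 5 10 -b true
#   args.config           == "config.json"
#   args.type             == "text"
#   args.iterations       == 3
#   args.num_groups       == [0, 5, 10]
#   args.boolean_concepts is True   (converted from the string 'true')
#   args.dataset          is None   (not given, so no filtering)
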
def main():
args = parse_arguments()
config = load_config(args.config)
datasets = config[args.type] # Load datasets based on type (structured or text)
# Decide which types to run based on the argument passed
types_to_run = []
if not args.type:
types_to_run = ['text', 'structured']
else:
types_to_run.append(args.type)
for dataset_type in types_to_run:
datasets = config[dataset_type] # Load datasets based on type (structured or text)
# If a specific dataset is specified, filter to only run that one
if args.dataset:
datasets = [d for d in datasets if d['datasetName'] == args.dataset]
for dataset in datasets:
if args.type == "structured":
structured_datasets_experiment(dataset["datasetName"])
elif args.type == "text":
text_based_datasets_experiment(dataset["grouped_keyword_dir"], dataset["datasetName"], dataset["entity_name"])
# Extract additional parameters from configuration or use defaults
bag_of_words_size = dataset.get('bag_of_words_size', 1000) # Default value if not specified
num_groups_list = args.num_groups if args.num_groups is not None else [i * 5 for i in range(6)]
# Pass the command-line argument for creating boolean concepts
create_high_level_concepts_as_boolean = args.boolean_concepts
if dataset_type == "structured":
structured_datasets_experiment(
dataset["datasetName"],
iterations=args.iterations,
bag_of_words_size=bag_of_words_size,
num_groups_list=num_groups_list,
create_high_level_concepts_as_boolean=create_high_level_concepts_as_boolean
)
elif dataset_type == "text":
text_based_datasets_experiment(
dataset["grouped_keyword_dir"],
dataset["datasetName"],
dataset["entity_name"],
args.iterations,
bag_of_words_size=bag_of_words_size,
num_groups_list=num_groups_list,
create_high_level_concepts_as_boolean=create_high_level_concepts_as_boolean
)
if __name__ == "__main__":
main()
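
main() expects the configuration file to map each dataset type to a list of dataset entries; the keys read above are datasetName, grouped_keyword_dir, entity_name and the optional bag_of_words_size. A hypothetical shape of such a config, written as a Python literal (dataset names and paths are invented):

example_config = {
    "structured": [
        {"datasetName": "mutag", "bag_of_words_size": 1000},
    ],
    "text": [
        {"datasetName": "dblp", "grouped_keyword_dir": "groups/dblp",
         "entity_name": "paper", "bag_of_words_size": 1000},
    ],
}
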
from datetime import datetime
import time
import json
import os
import csv
@@ -60,7 +61,7 @@ def load_datasets(dataset_name, bag_of_words_size) -> Base:
def fetch_high_level_concepts(dataset: Base, num_groups, group_keyword_file):
return dataset.fetch_themes(num_groups, group_keyword_file)
def append_to_csv_file(results, filename, dataset_key, num_groups, write_header=False):
def append_to_csv_file(results, filename, dataset_key, num_groups, create_high_level_concepts_as_boolean, write_header=False):
"""
Appends results to a CSV file. Creates the file if it doesn't exist.
If `write_header` is True, it will write the header.
@@ -68,8 +69,8 @@ def append_to_csv_file(results, filename, dataset_key, num_groups, write_header=
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, mode='a', newline='', encoding='utf-8') as csvfile:
fieldnames = ['Dataset', 'Number of Groups', 'Length', 'Label Name',
'Hypothesis', 'Accuracy', 'Recall', 'Precision', 'F1 Score', 'High Level Concepts As Boolean']
fieldnames = ['Dataset', 'Number of Groups', 'Create High Level Concepts As Boolean', 'Label Name',
'Hypothesis', 'Accuracy', 'Recall', 'Precision', 'F1 Score', 'Length', 'Explain Time']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if write_header:
@@ -79,14 +80,15 @@ def append_to_csv_file(results, filename, dataset_key, num_groups, write_header=
writer.writerow({
'Dataset': dataset_key,
'Number of Groups': num_groups,
'Length': data['length'],
'Create High Level Concepts As Boolean': create_high_level_concepts_as_boolean,
'Label Name': data['label_name'],
'Hypothesis': data['hypothesis'],
'Accuracy': data['evaluation'].get('Accuracy', 'N/A'),
'Recall': data['evaluation'].get('Recall', 'N/A'),
'Precision': data['evaluation'].get('Precision', 'N/A'),
'F1 Score': data['evaluation'].get('F1', 'N/A'),
"High Level Concepts As Boolean": data['evaluation'].get('high_level_concepts_as_boolean', 'N/A')
'Length': data['length'],
'Explain Time': data['explain_time']
})
print(f"Results appended to {filename}")
@@ -107,21 +109,19 @@ def explain_and_evaluate(model, dataset, entity_name, owl_graph_path, high_level
all_results = {}
for label, label_name in enumerate(dataset[entity_name].yLabel):
start_time = time.time() # Start timing for each explanation
print(f"\nExplaining {entity_name} label {label}: {label_name}")
hypotheses, model = explainer.explain(
label, 5, debug=False, max_runtime=90,
num_generations=750, use_data_properties=True
)
for hypothesis in hypotheses:
print(renderer.render(hypothesis.concept), hypothesis.quality)
hypotheses, model = explainer.explain(label, 5, max_runtime=90, num_generations=750)
explain_time = time.time() - start_time # End timing for each explanation
best_hypothesis = hypotheses[0].concept
concept_string = renderer.render(best_hypothesis)
print(f"Best hypothesis: {concept_string}, Quality: {hypotheses[0].quality}")
print(f"\nBest hypothesis: {concept_string}, Quality: {hypotheses[0].quality}")
metrics = [Accuracy(), Recall(), Precision(), F1()]
evaluation = {}
evaluation["High Level Concepts As Boolean"] = create_high_level_concepts_as_boolean
for metric in metrics:
evaluated_concept = model.kb.evaluate_concept(
best_hypothesis, metric, model._learning_problem
......@@ -136,43 +136,51 @@ def explain_and_evaluate(model, dataset, entity_name, owl_graph_path, high_level
'hypothesis': concept_string,
'evaluation': evaluation,
"length": hypotheses[0].len,
"high_level_concepts_as_boolean": create_high_level_concepts_as_boolean
'explain_time': explain_time
}
return all_results
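
The metric loop above (partly collapsed in this view) scores the best hypothesis with ontolearn's quality functions. A condensed sketch of that step as a helper, assuming an ontolearn model whose knowledge base exposes evaluate_concept as used in the diff; attribute names follow ontolearn's AbstractScorer/EvaluatedConcept and may differ slightly between versions:

from ontolearn.metrics import Accuracy, F1, Precision, Recall

def score_hypothesis(model, best_hypothesis):
    # Evaluate one concept with each scorer; the metric names match the CSV columns above.
    evaluation = {}
    for metric in (Accuracy(), Recall(), Precision(), F1()):
        evaluated_concept = model.kb.evaluate_concept(best_hypothesis, metric, model._learning_problem)
        evaluation[metric.name] = evaluated_concept.q   # quality value computed by the scorer
    return evaluation
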
def summarize_aggregated_results(aggregated_results, summary_filename):
"""
Summarizes the aggregated results, including for each label:
Summarizes the aggregated results, including for each label and number of groups:
- Best Hypothesis
- Best F1 Score
- Least F1 Score
- Average F1 Score
- Best Accuracy
- Average Accuracy
- Length at Best F1
- Number of Groups used at best F1
And writes the summary to a CSV file.
- Average Length
- Average Explain Time
"""
os.makedirs(os.path.dirname(summary_filename), exist_ok=True)
with open(summary_filename, mode="w", newline="", encoding="utf-8") as csvfile:
fieldnames = [
"Label Name", "Best Hypothesis", "Best F1 Score", "Least F1 Score",
"Average F1 Score", "Length at Best F1", "Number of Groups"
"Label Name", "Number of Groups", "Best Hypothesis", "Best F1 Score",
"Average F1 Score", "Best Accuracy", "Average Accuracy", "Length at Best F1",
"Average Length", "Average Explain Time"
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for label, data in aggregated_results.items():
f1_scores = data["all_scores"]["F1"]
avg_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0
for (label, num_groups), data in aggregated_results.items():
avg_f1 = sum(data["all_f1_scores"]) / len(data["all_f1_scores"]) if data["all_f1_scores"] else 0
avg_accuracy = sum(data["all_accuracies"]) / len(data["all_accuracies"]) if data["all_accuracies"] else 0
avg_length = sum(data["all_lengths"]) / len(data["all_lengths"]) if data["all_lengths"] else 0
avg_time = sum(data["all_times"]) / len(data["all_times"]) if data["all_times"] else 0
writer.writerow({
"Label Name": data["label_name"],
"Number of Groups": num_groups,
"Best Hypothesis": data["best_hypothesis"],
"Best F1 Score": data["best_F1"],
"Least F1 Score": min(f1_scores) if f1_scores else "N/A",
"Average F1 Score": avg_f1,
"Length at Best F1": data.get("length_at_best_f1", "N/A"),
"Number of Groups": data.get("num_groups_at_best_f1", 0)
"Best Accuracy": data["best_accuracy"],
"Average Accuracy": avg_accuracy,
"Length at Best F1": data["length_at_best_f1"],
"Average Length": avg_length,
"Average Explain Time": avg_time
})
print(f"Summary results saved to {summary_filename}")
@@ -222,28 +230,29 @@ def experiment(grouped_keyword_dir, dataset_name, entity_name, bag_of_words_size
append_to_csv_file(results, run_csv_filename, dataset_name, num_groups, create_high_level_concepts_as_boolean, write_header=write_header)
for label, data in results.items():
# Initialize aggregation for this label if not yet present
if label not in aggregated_results:
aggregated_results[label] = {
# Initialize aggregation for this label and number of groups if not yet present
key = (label, num_groups)
if key not in aggregated_results:
aggregated_results[key] = {
"label_name": data["label_name"],
"best_hypothesis": data["hypothesis"],
"best_F1": data["evaluation"]["F1"],
"length_at_best_f1": data["length"],
"num_groups_at_best_f1": num_groups,
"all_scores": {"F1": [], "Accuracy": [], "Recall": [], "Precision": [], "Length": []}
"all_f1_scores": [],
"all_accuracies": [],
"all_lengths": [],
"all_times": []
}
# Always fetch the entry for this (label, num_groups) key, including freshly initialized ones
aggregated_data = aggregated_results[key]
# If the current F1 is better than the stored best, update best values
if data["evaluation"]["F1"] > aggregated_results[label]["best_F1"]:
aggregated_results[label]["best_F1"] = data["evaluation"]["F1"]
aggregated_results[label]["best_hypothesis"] = data["hypothesis"]
aggregated_results[label]["length_at_best_f1"] = data["length"]
aggregated_results[label]["num_groups_at_best_f1"] = num_groups
for metric, value in data["evaluation"].items():
aggregated_results[label]["all_scores"][metric].append(value)
aggregated_results[label]["all_scores"]["Length"].append(data["length"])
if data["evaluation"]["F1"] > aggregated_data["best_F1"]:
aggregated_data["best_F1"] = data["evaluation"]["F1"]
aggregated_data["best_hypothesis"] = data["hypothesis"]
aggregated_data["length_at_best_f1"] = data["length"]
aggregated_data["all_f1_scores"].append(data["evaluation"]["F1"])
aggregated_data["all_accuracies"].append(data["evaluation"]["Accuracy"])
aggregated_data["all_lengths"].append(data["length"])
aggregated_data["all_times"].append(data["explain_time"])
write_header = False
......