# -*- coding: utf-8 -*-
"""
Copyright TorchKGE developers
@author: Armand Boschin <aboschin@enst.fr>
"""
from torch import empty, zeros, cat
from tqdm.autonotebook import tqdm
from .data_structures import SmallKG
from .exceptions import NotYetEvaluatedError
from .sampling import PositionalNegativeSampler
from .utils import DataLoader, get_rank, filter_scores
class RelationPredictionEvaluator(object):
"""Evaluate performance of given embedding using relation prediction method.
References
----------
* Armand Boschin, Thomas Bonald.
Enriching Wikidata with Semantified Wikipedia Hyperlinks
In proceedings of the Wikidata workshop, ISWC2021, 2021.
http://ceur-ws.org/Vol-2982/paper-6.pdf
Parameters
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
knowledge_graph: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the evaluation will be done.
    directed: bool, optional (default=True)
        Indicates whether the orientation from head to tail is known when
        predicting missing relations. If False, both directions (h, _, t)
        and (t, _, h) are scored in order to find the best scoring triples.
Attributes
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
kg: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the evaluation will be done.
rank_true_rels: torch.Tensor, shape: (n_facts), dtype: `torch.int`
For each fact, this is the rank of the true relation when all relations
are ranked. They are ranked in decreasing order of scoring function
:math:`f_r(h,t)`.
    filt_rank_true_rels: torch.Tensor, shape: (n_facts), dtype: `torch.int`
        This is the same as `rank_true_rels` but in the filtered case.
        See the referenced paper by Bordes et al. for more information.
    evaluated: bool
        Indicates if the method RelationPredictionEvaluator.evaluate has
        already been called.
    directed: bool, optional (default=True)
        Indicates whether the orientation from head to tail is known when
        predicting missing relations. If False, both directions (h, _, t)
        and (t, _, h) are scored in order to find the best scoring triples.
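
    Examples
    --------
    A minimal usage sketch; ``model`` is assumed to be an already trained
    torchkge embedding model and ``kg_test`` a test KnowledgeGraph, both
    defined elsewhere:

    >>> from torchkge.evaluation import RelationPredictionEvaluator
    >>> evaluator = RelationPredictionEvaluator(model, kg_test)
    >>> evaluator.evaluate(b_size=256)
    >>> evaluator.print_results(k=[1, 3, 10])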
"""
def __init__(self, model, knowledge_graph, directed=True):
self.model = model
self.kg = knowledge_graph
self.directed = directed
self.rank_true_rels = empty(size=(knowledge_graph.n_facts,)).long()
self.filt_rank_true_rels = empty(size=(knowledge_graph.n_facts,)).long()
self.evaluated = False
def evaluate(self, b_size, verbose=True):
"""
Parameters
----------
        b_size: int
            Size of the batches to use during evaluation.
verbose: bool
Indicates whether a progress bar should be displayed during
evaluation.
"""
use_cuda = next(self.model.parameters()).is_cuda
if use_cuda:
dataloader = DataLoader(self.kg, batch_size=b_size, use_cuda='batch')
self.rank_true_rels = self.rank_true_rels.cuda()
self.filt_rank_true_rels = self.filt_rank_true_rels.cuda()
else:
dataloader = DataLoader(self.kg, batch_size=b_size)
for i, batch in tqdm(enumerate(dataloader), total=len(dataloader),
unit='batch', disable=(not verbose),
desc='Relation prediction evaluation'):
            h_idx, t_idx, r_idx = batch[0], batch[1], batch[2]
            # Score every candidate relation for each (head, tail) pair.
            h_emb, t_emb, r_emb, candidates = self.model.inference_prepare_candidates(h_idx, t_idx, r_idx, entities=False)
            scores = self.model.inference_scoring_function(h_emb, t_emb, candidates)
            # Filter out other relations known to hold between head and tail.
            filt_scores = filter_scores(scores, self.kg.dict_of_rels, h_idx, t_idx, r_idx)
            if not self.directed:
                # Orientation is unknown: also score the reversed pair
                # (t, _, h) and rank over both directions at once.
                scores_bis = self.model.inference_scoring_function(t_emb, h_emb, candidates)
                filt_scores_bis = filter_scores(scores_bis, self.kg.dict_of_rels, h_idx, t_idx, r_idx)
                scores = cat((scores, scores_bis), dim=1)
                filt_scores = cat((filt_scores, filt_scores_bis), dim=1)
            self.rank_true_rels[i * b_size: (i + 1) * b_size] = get_rank(scores, r_idx).detach()
            self.filt_rank_true_rels[i * b_size: (i + 1) * b_size] = get_rank(filt_scores, r_idx).detach()
self.evaluated = True
if use_cuda:
self.rank_true_rels = self.rank_true_rels.cpu()
self.filt_rank_true_rels = self.filt_rank_true_rels.cpu()
def mean_rank(self):
"""
Returns
-------
        mean_rank: float
            Mean rank of the true relation when all relations are ranked
            for any fact of the dataset.
        filt_mean_rank: float
            Filtered mean rank of the true relation when all relations
            are ranked for any fact of the dataset.
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'RelationPredictionEvaluator.evaluate first.')
        mean_rank = self.rank_true_rels.float().mean().item()
        filt_mean_rank = self.filt_rank_true_rels.float().mean().item()
        return mean_rank, filt_mean_rank
def hit_at_k(self, k=10):
"""
Parameters
----------
        k: int
            Hit@k is the proportion of facts for which the true relation
            ranks among the top k candidate relations.
        Returns
        -------
        hitatk: float
            Hit@k for relation prediction.
        filt_hitatk: float
            Filtered hit@k for relation prediction.
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'RelationPredictionEvaluator.evaluate first.')
        return ((self.rank_true_rels <= k).float().mean().item(),
                (self.filt_rank_true_rels <= k).float().mean().item())
def mrr(self):
"""
Returns
-------
        mrr: float
            Mean reciprocal rank of the true relation.
        filt_mrr: float
            Filtered mean reciprocal rank of the true relation.
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'RelationPredictionEvaluator.evaluate first.')
mrr = (self.rank_true_rels.float()**(-1)).mean()
filt_mrr = (self.filt_rank_true_rels.float()**(-1)).mean()
return mrr.item(), filt_mrr.item()
def print_results(self, k=None, n_digits=3):
"""
Parameters
----------
k: int or list
k (or list of k) such that hit@k will be printed.
n_digits: int
Number of digits to be printed for hit@k and MRR.
"""
        if k is None:
            k = 10
        if isinstance(k, int):
            k = [k]
        for i in k:
            hitatk, filt_hitatk = self.hit_at_k(k=i)
            print('Hit@{} : {} \t\t Filt. Hit@{} : {}'.format(
                i, round(hitatk, n_digits), i, round(filt_hitatk, n_digits)))
        mean_rank, filt_mean_rank = self.mean_rank()
        print('Mean Rank : {} \t Filt. Mean Rank : {}'.format(
            int(mean_rank), int(filt_mean_rank)))
        mrr, filt_mrr = self.mrr()
        print('MRR : {} \t\t Filt. MRR : {}'.format(
            round(mrr, n_digits), round(filt_mrr, n_digits)))
class LinkPredictionEvaluator(object):
"""Evaluate performance of given embedding using link prediction method.
References
----------
* Antoine Bordes, Nicolas Usunier, Alberto Garcia-Duran, Jason Weston,
and Oksana Yakhnenko.
Translating Embeddings for Modeling Multi-relational Data.
In Advances in Neural Information Processing Systems 26, pages 2787–2795,
2013.
https://papers.nips.cc/paper/5071-translating-embeddings-for-modeling-multi-relational-data
Parameters
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
knowledge_graph: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the evaluation will be done.
Attributes
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
kg: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the evaluation will be done.
rank_true_heads: torch.Tensor, shape: (n_facts), dtype: `torch.int`
For each fact, this is the rank of the true head when all entities
are ranked as possible replacement of the head entity. They are
ranked in decreasing order of scoring function :math:`f_r(h,t)`.
rank_true_tails: torch.Tensor, shape: (n_facts), dtype: `torch.int`
For each fact, this is the rank of the true tail when all entities
are ranked as possible replacement of the tail entity. They are
ranked in decreasing order of scoring function :math:`f_r(h,t)`.
    filt_rank_true_heads: torch.Tensor, shape: (n_facts), dtype: `torch.int`
        This is the same as `rank_true_heads` but in the filtered case.
        See the referenced paper by Bordes et al. for more information.
    filt_rank_true_tails: torch.Tensor, shape: (n_facts), dtype: `torch.int`
        This is the same as `rank_true_tails` but in the filtered case.
        See the referenced paper by Bordes et al. for more information.
evaluated: bool
Indicates if the method LinkPredictionEvaluator.evaluate has already
been called.
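
    Examples
    --------
    A minimal usage sketch; ``model`` is assumed to be an already trained
    torchkge embedding model and ``kg_test`` a test KnowledgeGraph, both
    defined elsewhere:

    >>> from torchkge.evaluation import LinkPredictionEvaluator
    >>> evaluator = LinkPredictionEvaluator(model, kg_test)
    >>> evaluator.evaluate(b_size=256)
    >>> evaluator.print_results(k=[1, 3, 10])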
"""
def __init__(self, model, knowledge_graph):
self.model = model
self.kg = knowledge_graph
self.rank_true_heads = empty(size=(knowledge_graph.n_facts,)).long()
self.rank_true_tails = empty(size=(knowledge_graph.n_facts,)).long()
self.filt_rank_true_heads = empty(size=(knowledge_graph.n_facts,)).long()
self.filt_rank_true_tails = empty(size=(knowledge_graph.n_facts,)).long()
self.evaluated = False
    def evaluate(self, b_size, verbose=True):
"""
Parameters
----------
        b_size: int
            Size of the batches to use during evaluation.
verbose: bool
Indicates whether a progress bar should be displayed during
evaluation.
"""
use_cuda = next(self.model.parameters()).is_cuda
if use_cuda:
dataloader = DataLoader(self.kg, batch_size=b_size, use_cuda='batch')
self.rank_true_heads = self.rank_true_heads.cuda()
self.rank_true_tails = self.rank_true_tails.cuda()
self.filt_rank_true_heads = self.filt_rank_true_heads.cuda()
self.filt_rank_true_tails = self.filt_rank_true_tails.cuda()
else:
dataloader = DataLoader(self.kg, batch_size=b_size)
for i, batch in tqdm(enumerate(dataloader), total=len(dataloader),
unit='batch', disable=(not verbose),
desc='Link prediction evaluation'):
            h_idx, t_idx, r_idx = batch[0], batch[1], batch[2]
            h_emb, t_emb, r_emb, candidates = self.model.inference_prepare_candidates(h_idx, t_idx, r_idx, entities=True)
            # Rank all entities as candidate tails of (h, r, _).
            scores = self.model.inference_scoring_function(h_emb, candidates, r_emb)
            filt_scores = filter_scores(scores, self.kg.dict_of_tails, h_idx, r_idx, t_idx)
            self.rank_true_tails[i * b_size: (i + 1) * b_size] = get_rank(scores, t_idx).detach()
            self.filt_rank_true_tails[i * b_size: (i + 1) * b_size] = get_rank(filt_scores, t_idx).detach()
            # Rank all entities as candidate heads of (_, r, t).
            scores = self.model.inference_scoring_function(candidates, t_emb, r_emb)
            filt_scores = filter_scores(scores, self.kg.dict_of_heads, t_idx, r_idx, h_idx)
            self.rank_true_heads[i * b_size: (i + 1) * b_size] = get_rank(scores, h_idx).detach()
            self.filt_rank_true_heads[i * b_size: (i + 1) * b_size] = get_rank(filt_scores, h_idx).detach()
self.evaluated = True
if use_cuda:
self.rank_true_heads = self.rank_true_heads.cpu()
self.rank_true_tails = self.rank_true_tails.cpu()
self.filt_rank_true_heads = self.filt_rank_true_heads.cpu()
self.filt_rank_true_tails = self.filt_rank_true_tails.cpu()
    def mean_rank(self):
"""
Returns
-------
mean_rank: float
Mean rank of the true entity when replacing alternatively head
and tail in any fact of the dataset.
filt_mean_rank: float
Filtered mean rank of the true entity when replacing
alternatively head and tail in any fact of the dataset.
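
        Writing :math:`r_i^h` and :math:`r_i^t` for the ranks of the true
        head and tail of the :math:`i`-th of the :math:`n` facts, the
        unfiltered value is

        .. math::
            MR = \\frac{1}{2n} \\sum_{i=1}^{n} \\left(r_i^h + r_i^t\\right)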
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'LinkPredictionEvaluator.evaluate first.')
        mean_rank = (self.rank_true_heads.float().mean() +
                     self.rank_true_tails.float().mean()).item() / 2
        filt_mean_rank = (self.filt_rank_true_heads.float().mean() +
                          self.filt_rank_true_tails.float().mean()).item() / 2
        return mean_rank, filt_mean_rank
    def hit_at_k_heads(self, k=10):
        """Hit@k (raw and filtered) for head prediction only."""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'LinkPredictionEvaluator.evaluate first.')
head_hit = (self.rank_true_heads <= k).float().mean()
filt_head_hit = (self.filt_rank_true_heads <= k).float().mean()
return head_hit.item(), filt_head_hit.item()
    def hit_at_k_tails(self, k=10):
        """Hit@k (raw and filtered) for tail prediction only."""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'LinkPredictionEvaluator.evaluate first.')
tail_hit = (self.rank_true_tails <= k).float().mean()
filt_tail_hit = (self.filt_rank_true_tails <= k).float().mean()
return tail_hit.item(), filt_tail_hit.item()
    def hit_at_k(self, k=10):
"""
Parameters
----------
        k: int
            Hit@k is the proportion of facts for which the true head
            (resp. tail) ranks among the top k candidate entities.
Returns
-------
avg_hitatk: float
Average of hit@k for head and tail replacement.
filt_avg_hitatk: float
Filtered average of hit@k for head and tail replacement.
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'LinkPredictionEvaluator.evaluate first.')
head_hit, filt_head_hit = self.hit_at_k_heads(k=k)
tail_hit, filt_tail_hit = self.hit_at_k_tails(k=k)
return (head_hit + tail_hit) / 2, (filt_head_hit + filt_tail_hit) / 2
    def mrr(self):
"""
Returns
-------
        avg_mrr: float
            Average of the mean reciprocal rank for head and tail
            replacement.
        filt_avg_mrr: float
            Filtered average of the mean reciprocal rank for head and
            tail replacement.
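
        With the notations of `mean_rank`, the unfiltered value is

        .. math::
            MRR = \\frac{1}{2n} \\sum_{i=1}^{n}
            \\left(\\frac{1}{r_i^h} + \\frac{1}{r_i^t}\\right)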
"""
        if not self.evaluated:
            raise NotYetEvaluatedError('Evaluator not evaluated. Call '
                                       'LinkPredictionEvaluator.evaluate first.')
head_mrr = (self.rank_true_heads.float()**(-1)).mean()
tail_mrr = (self.rank_true_tails.float()**(-1)).mean()
filt_head_mrr = (self.filt_rank_true_heads.float()**(-1)).mean()
filt_tail_mrr = (self.filt_rank_true_tails.float()**(-1)).mean()
return ((head_mrr + tail_mrr).item() / 2,
(filt_head_mrr + filt_tail_mrr).item() / 2)
    def print_results(self, k=None, n_digits=3):
"""
Parameters
----------
k: int or list
k (or list of k) such that hit@k will be printed.
n_digits: int
Number of digits to be printed for hit@k and MRR.
"""
        if k is None:
            k = 10
        if isinstance(k, int):
            k = [k]
        for i in k:
            hitatk, filt_hitatk = self.hit_at_k(k=i)
            print('Hit@{} : {} \t\t Filt. Hit@{} : {}'.format(
                i, round(hitatk, n_digits), i, round(filt_hitatk, n_digits)))
        mean_rank, filt_mean_rank = self.mean_rank()
        print('Mean Rank : {} \t Filt. Mean Rank : {}'.format(
            int(mean_rank), int(filt_mean_rank)))
        mrr, filt_mrr = self.mrr()
        print('MRR : {} \t\t Filt. MRR : {}'.format(
            round(mrr, n_digits), round(filt_mrr, n_digits)))
class TripletClassificationEvaluator(object):
"""Evaluate performance of given embedding using triplet classification
method.
References
----------
* Richard Socher, Danqi Chen, Christopher D Manning, and Andrew Ng.
Reasoning With Neural Tensor Networks for Knowledge Base Completion.
In Advances in Neural Information Processing Systems 26, pages 926–934.
2013.
https://nlp.stanford.edu/pubs/SocherChenManningNg_NIPS2013.pdf
Parameters
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
kg_val: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the validation thresholds will be computed.
kg_test: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the testing evaluation will be done.
Attributes
----------
model: torchkge.models.interfaces.Model
Embedding model inheriting from the right interface.
kg_val: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the validation thresholds will be computed.
kg_test: torchkge.data_structures.KnowledgeGraph
Knowledge graph on which the evaluation will be done.
    evaluated: bool
        Indicates whether the `evaluate` function has been called.
    thresholds: torch.Tensor, shape: (n_rel)
        Per-relation score thresholds above which a triplet is considered
        true. Defined by calling the `evaluate` method.
    sampler: torchkge.sampling.PositionalNegativeSampler
        Negative sampler used to corrupt validation and test triplets.
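
    Examples
    --------
    A minimal usage sketch; ``model``, ``kg_val`` and ``kg_test`` are
    assumed to be a trained torchkge model and validation/test
    KnowledgeGraphs defined elsewhere:

    >>> from torchkge.evaluation import TripletClassificationEvaluator
    >>> evaluator = TripletClassificationEvaluator(model, kg_val, kg_test)
    >>> evaluator.evaluate(b_size=128)
    >>> print(evaluator.accuracy(b_size=128))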
"""
def __init__(self, model, kg_val, kg_test):
self.model = model
self.kg_val = kg_val
self.kg_test = kg_test
self.is_cuda = next(self.model.parameters()).is_cuda
self.evaluated = False
self.thresholds = None
self.sampler = PositionalNegativeSampler(self.kg_val,
kg_test=self.kg_test)
    def get_scores(self, heads, tails, relations, batch_size):
"""With head, tail and relation indexes, compute the value of the
scoring function of the model.
Parameters
----------
        heads: torch.Tensor, dtype: torch.long, shape: n_facts
            List of head indices.
        tails: torch.Tensor, dtype: torch.long, shape: n_facts
            List of tail indices.
        relations: torch.Tensor, dtype: torch.long, shape: n_facts
            List of relation indices.
        batch_size: int
            Size of the batches used to compute the scores.
Returns
-------
        scores: torch.Tensor, dtype: torch.float, shape: n_facts
            Score of each triplet.
"""
scores = []
small_kg = SmallKG(heads, tails, relations)
if self.is_cuda:
dataloader = DataLoader(small_kg, batch_size=batch_size,
use_cuda='batch')
else:
dataloader = DataLoader(small_kg, batch_size=batch_size)
        for batch in dataloader:
            h_idx, t_idx, r_idx = batch[0], batch[1], batch[2]
            scores.append(self.model.scoring_function(h_idx, t_idx, r_idx))
return cat(scores, dim=0)
    def evaluate(self, b_size):
"""Find relation thresholds using the validation set. As described in
the paper by Socher et al., for a relation, the threshold is a value t
such that if the score of a triplet is larger than t, the fact is true.
If a relation is not present in any fact of the validation set, then
the largest value score of all negative samples is used as threshold.
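
        Formally, writing :math:`f_r` for the scoring function and
        :math:`\\mathcal{N}_r` for the set of negative triplets of relation
        :math:`r` sampled from the validation set, the threshold used below
        is

        .. math::
            \\tau_r = \\max_{(h', t') \\in \\mathcal{N}_r} f_r(h', t')
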
Parameters
----------
b_size: int
Batch size.
"""
        r_idx = self.kg_val.relations
        # Sample one negative triplet per validation fact.
        neg_heads, neg_tails = self.sampler.corrupt_kg(b_size, self.is_cuda,
                                                       which='main')
        neg_scores = self.get_scores(neg_heads, neg_tails, r_idx, b_size)
        self.thresholds = zeros(self.kg_val.n_rel)
        for i in range(self.kg_val.n_rel):
            mask = (r_idx == i).bool()
            if mask.sum() > 0:
                # Largest score of a negative sample of this relation.
                self.thresholds[i] = neg_scores[mask].max()
            else:
                # Relation unseen in validation: fall back to the global
                # largest negative score.
                self.thresholds[i] = neg_scores.max()
        self.evaluated = True
        self.thresholds.detach_()
    def accuracy(self, b_size):
"""
Parameters
----------
b_size: int
Batch size.
Returns
-------
        acc: float
            Share of all triplets (true and negatively sampled ones) that
            were correctly classified using the thresholds learned from
            the validation set.
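
        With :math:`\\tau_r` the threshold of relation :math:`r` (see
        `evaluate`), :math:`P` the test triplets and :math:`N` their negative
        counterparts, the returned value is

        .. math::
            acc = \\frac{|\\{p \\in P : f_r(p) > \\tau_r\\}|
            + |\\{q \\in N : f_r(q) < \\tau_r\\}|}{2 \\cdot |P|}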
"""
if not self.evaluated:
self.evaluate(b_size)
r_idx = self.kg_test.relations
neg_heads, neg_tails = self.sampler.corrupt_kg(b_size,
self.is_cuda,
which='test')
scores = self.get_scores(self.kg_test.head_idx,
self.kg_test.tail_idx,
r_idx,
b_size)
neg_scores = self.get_scores(neg_heads, neg_tails, r_idx, b_size)
        if self.is_cuda:
            self.thresholds = self.thresholds.cuda()
        # Positives are correct above their relation threshold, negatives
        # are correct below it.
        scores = (scores > self.thresholds[r_idx])
        neg_scores = (neg_scores < self.thresholds[r_idx])
        return (scores.sum().item() +
                neg_scores.sum().item()) / (2 * self.kg_test.n_facts)