Source code for besskge.dataset

# Copyright (c) 2023 Graphcore Ltd. All rights reserved.

"""
Utilities for building and storing knowledge graph datasets
as collections of (h,r,t) triples.
"""

import dataclasses
import pickle
import tarfile
import zipfile
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import ogb.linkproppred
import pandas as pd
import requests
from numpy.typing import NDArray


@dataclasses.dataclass
class KGDataset:
    """
    Represents a complete knowledge graph dataset of (head, relation, tail) triples.
    """

    #: Number of entities (nodes) in the knowledge graph
    n_entity: int

    #: Number of relation types (edge labels) in the knowledge graph
    n_relation_type: int

    #: List of (h_ID, r_ID, t_ID) triples, for each part of the dataset;
    #: {part: int32[n_triple, {h,r,t}]}
    triples: Dict[str, NDArray[np.int32]]

    #: IDs of the triples in KGDataset.triples wrt the ordering in the
    #: original array/dataframe from where the triples originate.
    original_triple_ids: Dict[str, NDArray[np.int32]]

    #: Entity labels by ID; str[n_entity]
    entity_dict: Optional[List[str]] = None

    #: Relation type labels by ID; str[n_relation_type]
    relation_dict: Optional[List[str]] = None

    #: If entities have types, IDs are assumed to be clustered by type;
    #: {entity_type: int}
    type_offsets: Optional[Dict[str, int]] = None

    #: IDs of (possibly triple-specific) negative heads;
    #: {part: int32[n_triple or 1, n_neg_heads]}
    neg_heads: Optional[Dict[str, NDArray[np.int32]]] = None

    #: IDs of (possibly triple-specific) negative tails;
    #: {part: int32[n_triple or 1, n_neg_tails]}
    neg_tails: Optional[Dict[str, NDArray[np.int32]]] = None

    @property
    def ht_types(self) -> Optional[Dict[str, NDArray[np.int32]]]:
        """
        If entities have types, type IDs of triples' heads/tails;
        {part: int32[n_triple, {h_type, t_type}]}
        """
        if self.type_offsets:
            type_offsets = np.fromiter(self.type_offsets.values(), dtype=np.int32)
            types = {}
            for part, triple in self.triples.items():
                types[part] = (
                    np.digitize(
                        triple[:, [0, 2]],
                        type_offsets,
                    )
                    - 1
                )
            return types
        else:
            return None
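    # Illustrative note (not part of the library): with
    # type_offsets = {"drug": 0, "protein": 3}, entity IDs 0-2 are drugs and
    # IDs >= 3 are proteins; `ht_types` digitizes head/tail IDs against the
    # offsets [0, 3], so an ID of 4 is mapped to type index 1 ("protein").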
    @classmethod
    def from_triples(
        cls,
        data: NDArray[np.int32],
        split: Tuple[float, float, float] = (0.7, 0.15, 0.15),
        seed: int = 1234,
        entity_dict: Optional[List[str]] = None,
        relation_dict: Optional[List[str]] = None,
        type_offsets: Optional[Dict[str, int]] = None,
    ) -> "KGDataset":
        """
        Build a dataset from an array of triples, where IDs for entities
        and relations have already been assigned. Note that, if entities have
        types, entities of the same type need to have contiguous IDs.
        Triples are randomly split into train/validation/test sets.
        The attribute `KGDataset.original_triple_ids` stores the IDs of the
        triples in each split wrt the original ordering in `data`.

        If a pre-defined train/validation/test split is wanted, the KGDataset
        class should be instantiated manually.

        :param data:
            Numpy array of triples [head_id, relation_id, tail_id]. Shape
            (num_triples, 3).
        :param split:
            Tuple to set the train/validation/test split.
        :param seed:
            Random seed for the train/validation/test split.
        :param entity_dict:
            Optional entity labels by ID.
        :param relation_dict:
            Optional relation labels by ID.
        :param type_offsets:
            Optional offsets of entity types.

        :return:
            Instance of the KGDataset class.
        """
        num_triples = data.shape[0]
        num_train = int(num_triples * split[0])
        num_valid = int(num_triples * split[1])

        rng = np.random.default_rng(seed=seed)
        id_shuffle = rng.permutation(np.arange(num_triples))
        triple_ids = dict()
        triple_ids["train"], triple_ids["valid"], triple_ids["test"] = np.split(
            id_shuffle, (num_train, num_train + num_valid), axis=0
        )
        triples = dict()
        triples["train"] = data[triple_ids["train"]]
        triples["valid"] = data[triple_ids["valid"]]
        triples["test"] = data[triple_ids["test"]]

        ds = cls(
            n_entity=data[:, [0, 2]].max() + 1,
            n_relation_type=data[:, 1].max() + 1,
            entity_dict=entity_dict,
            relation_dict=relation_dict,
            type_offsets=type_offsets,
            triples=triples,
            original_triple_ids=triple_ids,
        )

        return ds
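    # Example (illustrative sketch, not part of the library): building a small
    # dataset from an already-ID-mapped triple array; values are invented.
    #
    #     data = np.array(
    #         [[0, 0, 1], [1, 1, 2], [2, 0, 0], [0, 1, 2]], dtype=np.int32
    #     )
    #     ds = KGDataset.from_triples(data, split=(0.5, 0.25, 0.25), seed=0)
    #     # ds.n_entity == 3, ds.n_relation_type == 2 and
    #     # ds.triples["train"].shape == (2, 3); the remaining triples are
    #     # shared between the "valid" and "test" parts.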
    @classmethod
    def from_dataframe(
        cls,
        df: Union[pd.DataFrame, Dict[str, pd.DataFrame]],
        head_column: Union[int, str],
        relation_column: Union[int, str],
        tail_column: Union[int, str],
        entity_types: Optional[Union[pd.Series, Dict[str, str]]] = None,  # type: ignore
        split: Tuple[float, float, float] = (0.7, 0.15, 0.15),
        seed: int = 1234,
    ) -> "KGDataset":
        """
        Build a KGDataset from a pandas DataFrame of labeled (h,r,t) triples.
        IDs for entities and relations are automatically assigned based on
        labels, in such a way that entities of the same type have contiguous IDs.

        :param df:
            Pandas DataFrame of all triples in the knowledge graph dataset,
            or dictionary of DataFrames of triples for each part of the
            dataset split.
        :param head_column:
            Name of the DataFrame column storing head entities.
        :param relation_column:
            Name of the DataFrame column storing relations.
        :param tail_column:
            Name of the DataFrame column storing tail entities.
        :param entity_types:
            If entities have types, dictionary or pandas Series mapping
            entity label -> entity type (as strings).
        :param split:
            Tuple to set the train/validation/test split. Only used if no
            pre-defined dataset split is specified, i.e. if `df` is not a
            dictionary.
        :param seed:
            Random seed for the train/validation/test split. Only used if no
            pre-defined dataset split is specified, i.e. if `df` is not a
            dictionary.

        :return:
            Instance of the KGDataset class.
        """
        df_dict = {"all": df} if isinstance(df, pd.DataFrame) else df
        unique_ent = pd.concat(
            [
                pd.concat([dfp[head_column], dfp[tail_column]])
                for dfp in df_dict.values()
            ]
        ).unique()
        ent2id = pd.Series(np.arange(len(unique_ent)), index=unique_ent, name="ent_id")
        unique_rel = pd.concat(
            [dfp[relation_column] for dfp in df_dict.values()]
        ).unique()
        rel2id = pd.Series(np.arange(len(unique_rel)), index=unique_rel, name="rel_id")

        if entity_types is not None:
            ent2type = pd.Series(entity_types, name="ent_type")
            ent2id_type = pd.merge(
                ent2id, ent2type, how="left", left_index=True, right_index=True
            ).sort_values("ent_type")
            ent2id.index = ent2id_type.index
            type_off = (
                ent2id_type.groupby("ent_type")["ent_type"].count().cumsum().shift(1)
            )
            type_off.iloc[0] = 0
            type_offsets = type_off.astype("int64").to_dict()
        else:
            type_offsets = None
        entity_dict = ent2id.index.tolist()
        relation_dict = rel2id.index.tolist()

        triples = {}
        for part, dfp in df_dict.items():
            heads = dfp[head_column].map(ent2id).values.astype(np.int32)
            tails = dfp[tail_column].map(ent2id).values.astype(np.int32)
            rels = dfp[relation_column].map(rel2id).values.astype(np.int32)
            triples[part] = np.stack([heads, rels, tails], axis=1)

        if isinstance(df, pd.DataFrame):
            return KGDataset.from_triples(
                triples["all"], split, seed, entity_dict, relation_dict, type_offsets
            )
        else:
            return cls(
                n_entity=len(entity_dict),
                n_relation_type=len(relation_dict),
                entity_dict=entity_dict,
                relation_dict=relation_dict,
                type_offsets=type_offsets,
                triples=triples,
                original_triple_ids={
                    k: np.arange(v.shape[0]) for k, v in triples.items()
                },
            )
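    # Example (illustrative sketch, not part of the library): building a
    # dataset from a labeled DataFrame with typed entities; column names,
    # labels and types are invented.
    #
    #     df = pd.DataFrame(
    #         {
    #             "h": ["aspirin", "p53", "aspirin"],
    #             "r": ["targets", "interacts", "treats"],
    #             "t": ["p53", "brca1", "headache"],
    #         }
    #     )
    #     ent_types = {
    #         "aspirin": "drug",
    #         "headache": "disease",
    #         "p53": "protein",
    #         "brca1": "protein",
    #     }
    #     ds = KGDataset.from_dataframe(
    #         df,
    #         head_column="h",
    #         relation_column="r",
    #         tail_column="t",
    #         entity_types=ent_types,
    #     )
    #     # Entities of the same type receive contiguous IDs; the first ID of
    #     # each type is recorded in ds.type_offsets.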
    @classmethod
    def build_ogbl_biokg(cls, root: Path) -> "KGDataset":
        """
        Build the ogbl-biokg dataset :cite:p:`OGB`

        .. seealso:: https://ogb.stanford.edu/docs/linkprop/#ogbl-biokg

        :param root:
            Local path to the dataset. If the dataset is not present in this
            location, then it is downloaded and stored here.

        :return: The ogbl-biokg KGDataset.
        """
        dataset = ogb.linkproppred.LinkPropPredDataset(name="ogbl-biokg", root=root)
        split_edge = dataset.get_edge_split()
        n_relation_type = len(dataset[0]["edge_reltype"].keys())
        type_counts = dataset[0]["num_nodes_dict"]
        type_offsets = np.concatenate(
            ([0], np.cumsum(np.fromiter(type_counts.values(), dtype=int)))
        )
        n_entity = type_offsets[-1]
        type_offsets = dict(zip(type_counts.keys(), type_offsets))
        triples = {}
        neg_heads = {}
        neg_tails = {}
        for part, hrt in split_edge.items():
            h_label, h_type_idx = np.unique(hrt["head_type"], return_inverse=True)
            t_label, t_type_idx = np.unique(hrt["tail_type"], return_inverse=True)
            h_type_offsets = np.array([type_offsets[lab] for lab in h_label])
            t_type_offsets = np.array([type_offsets[lab] for lab in t_label])
            hrt["head"] += h_type_offsets[h_type_idx]
            hrt["tail"] += t_type_offsets[t_type_idx]
            triples[part] = np.stack(
                [hrt["head"], hrt["relation"], hrt["tail"]], axis=-1
            )
            if part != "train":
                neg_heads[part] = hrt["head_neg"] + h_type_offsets[h_type_idx][:, None]
                neg_tails[part] = hrt["tail_neg"] + t_type_offsets[t_type_idx][:, None]

        ent_dict: List[str] = []
        for k in type_offsets.keys():
            ent_dict.extend(
                pd.read_csv(
                    root.joinpath(f"ogbl_biokg/mapping/{k}_entidx2name.csv.gz")
                )
                .sort_values("ent idx")["ent name"]
                .values.tolist()
            )
        rel_dict = (
            pd.read_csv(root.joinpath("ogbl_biokg/mapping/relidx2relname.csv.gz"))
            .sort_values("rel idx")["rel name"]
            .values.tolist()
        )

        return cls(
            n_entity=n_entity,
            n_relation_type=n_relation_type,
            entity_dict=ent_dict,
            relation_dict=rel_dict,
            type_offsets=type_offsets,
            triples=triples,
            original_triple_ids={k: np.arange(v.shape[0]) for k, v in triples.items()},
            neg_heads=neg_heads,
            neg_tails=neg_tails,
        )
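    # Illustrative note (not part of the library): ogbl-biokg stores per-type
    # entity indices, so they are shifted by cumulative type offsets to obtain
    # global IDs. For example, with num_nodes_dict = {"disease": 10, "drug": 5}
    # the offsets become {"disease": 0, "drug": 10}, and a drug with per-type
    # index 3 is assigned global ID 13.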
    @classmethod
    def build_ogbl_wikikg2(cls, root: Path) -> "KGDataset":
        """
        Build the ogbl-wikikg2 dataset :cite:p:`OGB`

        .. seealso:: https://ogb.stanford.edu/docs/linkprop/#ogbl-wikikg2

        :param root:
            Local path to the dataset. If the dataset is not present in this
            location, then it is downloaded and stored here.

        :return: The ogbl-wikikg2 KGDataset.
        """
        dataset = ogb.linkproppred.LinkPropPredDataset(name="ogbl-wikikg2", root=root)
        split_data = dataset.get_edge_split()
        triples = {}
        neg_heads = {}
        neg_tails = {}
        for part, hrt in split_data.items():
            triples[part] = np.stack(
                [hrt["head"], hrt["relation"], hrt["tail"]], axis=-1
            )
            if part != "train":
                neg_heads[part] = hrt["head_neg"]
                neg_tails[part] = hrt["tail_neg"]

        ent_dict = (
            pd.read_csv(root.joinpath("ogbl_wikikg2/mapping/nodeidx2entityid.csv.gz"))
            .sort_values("node idx")["entity id"]
            .values.tolist()
        )
        rel_dict = (
            pd.read_csv(root.joinpath("ogbl_wikikg2/mapping/reltype2relid.csv.gz"))
            .sort_values("reltype")["rel id"]
            .values.tolist()
        )

        return cls(
            n_entity=dataset.graph["num_nodes"],
            n_relation_type=split_data["train"]["relation"].max() + 1,
            entity_dict=ent_dict,
            relation_dict=rel_dict,
            type_offsets=None,
            triples=triples,
            original_triple_ids={k: np.arange(v.shape[0]) for k, v in triples.items()},
            neg_heads=neg_heads,
            neg_tails=neg_tails,
        )
    @classmethod
    def build_yago310(cls, root: Path) -> "KGDataset":
        """
        Build the YAGO3-10 dataset. This is the subgraph of the YAGO3
        knowledge graph :cite:p:`YAGO3` containing only entities which have
        at least 10 relations associated with them. First used in :cite:p:`ConvE`.

        .. seealso:: https://yago-knowledge.org/downloads/yago-3

        :param root:
            Local path to the dataset. If the dataset is not present in this
            location, then it is downloaded and stored here.

        :return: The YAGO3-10 KGDataset.
        """
        if not (
            root.joinpath("train.txt").is_file()
            and root.joinpath("valid.txt").is_file()
            and root.joinpath("test.txt").is_file()
        ):
            print("Downloading dataset...")
            res = requests.get(
                url="https://github.com/TimDettmers/ConvE/raw/master/YAGO3-10.tar.gz"
            )
            with tarfile.open(fileobj=BytesIO(res.content)) as tarf:
                tarf.extractall(path=root)

        train_triples = pd.read_csv(
            root.joinpath("train.txt"), delimiter="\t", dtype=str, header=None
        )
        valid_triples = pd.read_csv(
            root.joinpath("valid.txt"), delimiter="\t", dtype=str, header=None
        )
        test_triples = pd.read_csv(
            root.joinpath("test.txt"), delimiter="\t", dtype=str, header=None
        )

        return cls.from_dataframe(
            {"train": train_triples, "valid": valid_triples, "test": test_triples},
            head_column=0,
            relation_column=1,
            tail_column=2,
        )
    def save(self, out_file: Path) -> None:
        """
        Save dataset to .pkl.

        :param out_file:
            Path to output file.
        """
        with open(out_file, "wb") as f:
            pickle.dump(self, f)
        print(f"KGDataset saved to {out_file}")
    @classmethod
    def load(cls, path: Path) -> "KGDataset":
        """
        Load a :class:`KGDataset` object saved with :func:`KGDataset.save`.

        :param path:
            Path to saved :class:`KGDataset` object.

        :return:
            The saved :class:`KGDataset` object.
        """
        kg_dataset: KGDataset
        with open(path, "rb") as f:
            kg_dataset = pickle.load(f)
            if not isinstance(kg_dataset, KGDataset):
                raise ValueError(f"File at path {path} is not a KGDataset")
        print(f"Loaded KGDataset at {path}")
        return kg_dataset
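# Minimal usage sketch (illustrative, not part of the library): build a toy
# dataset from random triples, save it to disk and load it back. The file name
# and sizes below are placeholders.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    toy_triples = np.stack(
        [
            rng.integers(0, 100, size=1000),  # head entity IDs
            rng.integers(0, 5, size=1000),  # relation type IDs
            rng.integers(0, 100, size=1000),  # tail entity IDs
        ],
        axis=1,
    ).astype(np.int32)
    toy_ds = KGDataset.from_triples(toy_triples)
    toy_ds.save(Path("toy_kg_dataset.pkl"))
    reloaded = KGDataset.load(Path("toy_kg_dataset.pkl"))
    print(reloaded.n_entity, reloaded.n_relation_type)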