Source code for kg_topology_toolbox.topology_toolbox

# -*- coding: utf-8 -*-
# Copyright (c) 2023 Graphcore Ltd. All rights reserved.

"""
Topology toolbox main functionalities
"""

from functools import cache

import numpy as np
import pandas as pd
from scipy.sparse import coo_array

from kg_topology_toolbox.utils import (
    aggregate_by_relation,
    check_kg_df_structure,
    composition_count,
    jaccard_similarity,
    node_degrees_and_rels,
)


class KGTopologyToolbox:
    """
    Toolbox class to compute Knowledge Graph topology statistics.
    """

    def __init__(
        self,
        kg_df: pd.DataFrame,
        head_column: str = "h",
        relation_column: str = "r",
        tail_column: str = "t",
    ):
        """
        Instantiate the Topology Toolbox for a Knowledge Graph defined by the
        list of its edges (h,r,t).

        :param kg_df: A Knowledge Graph represented as a pd.DataFrame.
            Must contain at least three columns, which specify the IDs of head
            entity, relation type and tail entity for each edge.
        :param head_column: The name of the column with the IDs of head
            entities. Default: "h".
        :param relation_column: The name of the column with the IDs of
            relation types. Default: "r".
        :param tail_column: The name of the column with the IDs of tail
            entities. Default: "t".
        """
        check_kg_df_structure(kg_df, head_column, relation_column, tail_column)
        self.df = kg_df[[head_column, relation_column, tail_column]].rename(
            columns={head_column: "h", relation_column: "r", tail_column: "t"}
        )
        self.n_entity = self.df[["h", "t"]].max().max() + 1
        self.n_rel = self.df.r.max() + 1

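    # Illustrative usage sketch (the toy graph below is an assumption for
    # demonstration purposes, not part of the library):
    #
    #   >>> import pandas as pd
    #   >>> from kg_topology_toolbox.topology_toolbox import KGTopologyToolbox
    #   >>> df = pd.DataFrame(
    #   ...     {"h": [0, 1, 1, 2, 0], "r": [0, 0, 1, 1, 2], "t": [1, 0, 2, 0, 0]}
    #   ... )
    #   >>> kgtt = KGTopologyToolbox(df)
    #
    # Note that n_entity and n_rel are inferred from the maximum IDs, so
    # entity and relation IDs are assumed to be zero-based integers.
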
    def loop_count(self) -> pd.DataFrame:
        """
        For each entity in the KG, compute the number of loops around the
        entity (i.e., the number of edges having the entity as both head
        and tail).

        :return:
            Loop count DataFrame, indexed on the IDs of the graph entities.
        """
        n_loops = (
            self.df[self.df.h == self.df.t].groupby("h").agg(n_loops=("r", "count"))
        )
        return (
            pd.DataFrame(n_loops, index=np.arange(self.n_entity)).fillna(0).astype(int)
        )

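    # Illustrative sketch, on the toy graph from the constructor example
    # above: the only loop is (0, 2, 0), so entity 0 gets n_loops = 1.
    #
    #   >>> kgtt.loop_count()
    #      n_loops
    #   0        1
    #   1        0
    #   2        0
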
    @cache
    def node_head_degree(self, return_relation_list: bool = False) -> pd.DataFrame:
        """
        For each entity in the KG, compute the number of edges having it as
        head (head-degree, or out-degree of the head node). The relation types
        going out of the head node are also identified.

        :param return_relation_list: If True, return the list of unique
            relations going out of the head node. WARNING: expensive for
            large graphs. Default: False.

        :return:
            The result DataFrame, indexed on the IDs `e` of the graph
            entities, with columns:

            - **h_degree** (int): Number of triples with head entity `e`.
            - **h_unique_rel** (int): Number of distinct relation types
              among edges with head entity `e`.
            - **h_rel_list** (Optional[list]): List of unique relation types
              among edges with head entity `e`.
              Only returned if `return_relation_list = True`.
        """
        node_df = node_degrees_and_rels(
            self.df, "h", self.n_entity, return_relation_list
        )
        return node_df.rename(columns={n: "h_" + n for n in node_df.columns})

    @cache
    def node_tail_degree(self, return_relation_list: bool = False) -> pd.DataFrame:
        """
        For each entity in the KG, compute the number of edges having it as
        tail (tail-degree, or in-degree of the tail node). The relation types
        going into the tail node are also identified.

        :param return_relation_list: If True, return the list of unique
            relation types going into the tail node. WARNING: expensive for
            large graphs. Default: False.

        :return:
            The result DataFrame, indexed on the IDs `e` of the graph
            entities, with columns:

            - **t_degree** (int): Number of triples with tail entity `e`.
            - **t_unique_rel** (int): Number of distinct relation types
              among edges with tail entity `e`.
            - **t_rel_list** (Optional[list]): List of unique relation types
              among edges with tail entity `e`.
              Only returned if `return_relation_list = True`.
        """
        node_df = node_degrees_and_rels(
            self.df, "t", self.n_entity, return_relation_list
        )
        return node_df.rename(columns={n: "t_" + n for n in node_df.columns})

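    # Illustrative sketch, on the toy graph from the constructor example:
    # entity 0 is the tail of the three edges (1, 0, 0), (2, 1, 0) and
    # (0, 2, 0), each with a distinct relation type, so it gets
    # t_degree = 3 and t_unique_rel = 3.
    #
    #   >>> kgtt.node_tail_degree().loc[0, ["t_degree", "t_unique_rel"]]
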
    def node_degree_summary(self, return_relation_list: bool = False) -> pd.DataFrame:
        """
        For each entity in the KG, compute the number of edges having it as a
        head (head-degree, or out-degree), as a tail (tail-degree, or
        in-degree) or one of the two (total-degree).
        The in-going and out-going relation types are also identified.

        The output dataframe is indexed on the IDs of the graph entities.

        :param return_relation_list: If True, return the list of unique
            relations going in/out of an entity. WARNING: expensive for
            large graphs.

        :return:
            The results dataframe, indexed on the IDs `e` of the graph
            entities, with columns:

            - **h_degree** (int): Number of triples with head entity `e`.
            - **t_degree** (int): Number of triples with tail entity `e`.
            - **tot_degree** (int): Number of triples with head entity `e`
              or tail entity `e`.
            - **h_unique_rel** (int): Number of distinct relation types
              among edges with head entity `e`.
            - **h_rel_list** (Optional[list]): List of unique relation types
              among edges with head entity `e`.
              Only returned if `return_relation_list = True`.
            - **t_unique_rel** (int): Number of distinct relation types
              among edges with tail entity `e`.
            - **t_rel_list** (Optional[list]): List of unique relation types
              among edges with tail entity `e`.
              Only returned if `return_relation_list = True`.
            - **n_loops** (int): Number of loops around entity `e`.
        """
        nodes_df = pd.merge(
            self.node_head_degree(return_relation_list),
            self.node_tail_degree(return_relation_list),
            left_index=True,
            right_index=True,
        )
        nodes_df = pd.merge(
            nodes_df,
            self.loop_count(),
            left_index=True,
            right_index=True,
        )
        nodes_df["tot_degree"] = (
            nodes_df["h_degree"] + nodes_df["t_degree"] - nodes_df["n_loops"]
        )
        return nodes_df[
            ["h_degree", "t_degree", "tot_degree", "h_unique_rel"]
            + (["h_rel_list"] if return_relation_list else [])
            + ["t_unique_rel"]
            + (["t_rel_list"] if return_relation_list else [])
            + ["n_loops"]
        ]

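    # Illustrative sketch, on the toy graph: entity 0 has h_degree = 2,
    # t_degree = 3 and n_loops = 1, so tot_degree = 2 + 3 - 1 = 4 (the loop
    # (0, 2, 0) is counted only once in the total).
    #
    #   >>> kgtt.node_degree_summary().loc[0, "tot_degree"]
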
    @cache
    def edge_head_degree(self) -> pd.DataFrame:
        """
        For each edge in the KG, compute the number of edges (in total or
        of the same relation type) with the same head node.

        :return:
            The result DataFrame, with the same indexing and ordering of
            triples as the original KG DataFrame, with columns
            (in addition to `h`, `r`, `t`):

            - **h_unique_rel** (int): Number of distinct relation types
              among edges with head entity `h`.
            - **h_degree** (int): Number of triples with head entity `h`.
            - **h_degree_same_rel** (int): Number of triples with head
              entity `h` and relation type `r`.
        """
        edge_by_hr_count = self.df.groupby(["h", "r"], as_index=False).agg(
            h_degree_same_rel=("t", "count")
        )
        df_res = self.df.merge(
            self.node_head_degree(), left_on=["h"], right_index=True, how="left"
        )
        return df_res.merge(edge_by_hr_count, on=["h", "r"], how="left")

    @cache
    def edge_tail_degree(self) -> pd.DataFrame:
        """
        For each edge in the KG, compute the number of edges (in total or
        of the same relation type) with the same tail node.

        :return:
            The result DataFrame, with the same indexing and ordering of
            triples as the original KG DataFrame, with columns
            (in addition to `h`, `r`, `t`):

            - **t_unique_rel** (int): Number of distinct relation types
              among edges with tail entity `t`.
            - **t_degree** (int): Number of triples with tail entity `t`.
            - **t_degree_same_rel** (int): Number of triples with tail
              entity `t` and relation type `r`.
        """
        edge_by_rt_count = self.df.groupby(["r", "t"], as_index=False).agg(
            t_degree_same_rel=("h", "count")
        )
        df_res = self.df.merge(
            self.node_tail_degree(), left_on=["t"], right_index=True, how="left"
        )
        return df_res.merge(edge_by_rt_count, on=["r", "t"], how="left")

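    # Illustrative sketch, on the toy graph: the first edge (0, 0, 1) shares
    # its head with (0, 2, 0), so h_degree = 2, but it is the only edge with
    # head 0 and relation 0, so h_degree_same_rel = 1.
    #
    #   >>> kgtt.edge_head_degree().loc[0, ["h_degree", "h_degree_same_rel"]]
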
    def edge_cardinality(self) -> pd.DataFrame:
        """
        Classify the cardinality of each edge in the KG: one-to-one
        (out-degree = in-degree = 1), one-to-many (out-degree > 1,
        in-degree = 1), many-to-one (out-degree = 1, in-degree > 1) or
        many-to-many (out-degree > 1, in-degree > 1).

        :return:
            The result DataFrame, with the same indexing and ordering of
            triples as the original KG DataFrame, with columns
            (in addition to `h`, `r`, `t`):

            - **triple_cardinality** (str): cardinality type of the edge.
            - **triple_cardinality_same_rel** (str): cardinality type of the
              edge in the subgraph of edges with relation type `r`.
        """
        head_degree = self.edge_head_degree()
        tail_degree = self.edge_tail_degree()
        df_res = pd.DataFrame(
            {"h": head_degree.h, "r": head_degree.r, "t": head_degree.t}
        )
        for suffix in ["", "_same_rel"]:
            # check if the values in the pair (h_degree, t_degree) are =1 or >1
            # to determine the edge cardinality
            edge_type = 2 * (head_degree["h_degree" + suffix] == 1) + (
                tail_degree["t_degree" + suffix] == 1
            )
            df_res["triple_cardinality" + suffix] = pd.cut(
                edge_type,
                bins=[0, 1, 2, 3, 4],
                right=False,
                labels=["M:M", "1:M", "M:1", "1:1"],
            ).astype(str)
        return df_res

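    # Illustrative sketch, on the toy graph: the edge (0, 0, 1) has
    # h_degree = 2 > 1 and t_degree = 1 (entity 1 is the tail of one edge
    # only), so the triple is classified as one-to-many.
    #
    #   >>> kgtt.edge_cardinality().loc[0, "triple_cardinality"]
    #   '1:M'
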
    def edge_degree_cardinality_summary(
        self, aggregate_by_r: bool = False
    ) -> pd.DataFrame:
        """
        For each edge in the KG, compute the number of edges with the same
        head (head-degree, or out-degree), the same tail (tail-degree, or
        in-degree) or one of the two (total-degree).
        Based on entity degrees, each triple is classified as either
        one-to-one (out-degree = in-degree = 1), one-to-many (out-degree > 1,
        in-degree = 1), many-to-one (out-degree = 1, in-degree > 1) or
        many-to-many (out-degree > 1, in-degree > 1).

        The output dataframe maintains the same indexing and ordering of
        triples as the original Knowledge Graph dataframe.

        :param aggregate_by_r: If True, return metrics aggregated by relation
            type (the output DataFrame will be indexed over relation IDs).

        :return:
            The results dataframe. Contains the following columns
            (in addition to `h`, `r`, `t`):

            - **h_unique_rel** (int): Number of distinct relation types
              among edges with head entity h.
            - **h_degree** (int): Number of triples with head entity h.
            - **h_degree_same_rel** (int): Number of triples with head
              entity h and relation type r.
            - **t_unique_rel** (int): Number of distinct relation types
              among edges with tail entity t.
            - **t_degree** (int): Number of triples with tail entity t.
            - **t_degree_same_rel** (int): Number of triples with tail
              entity t and relation type r.
            - **tot_degree** (int): Number of triples with head entity h or
              tail entity t.
            - **tot_degree_same_rel** (int): Number of triples with head
              entity h or tail entity t, and relation type r.
            - **triple_cardinality** (str): cardinality type of the edge.
            - **triple_cardinality_same_rel** (str): cardinality type of the
              edge in the subgraph of edges with relation type r.
        """
        df_res = pd.concat(
            [
                self.edge_head_degree(),
                self.edge_tail_degree().drop(columns=["h", "r", "t"]),
            ],
            axis=1,
        )
        # compute number of parallel edges to avoid double-counting them
        # in total degree
        num_parallel = df_res.merge(
            self.df.groupby(["h", "t"], as_index=False).agg(n_parallel=("r", "count")),
            on=["h", "t"],
            how="left",
        )
        df_res["tot_degree"] = (
            df_res.h_degree + df_res.t_degree - num_parallel.n_parallel
        )
        # when restricting to the relation type, there is only one edge
        # (the edge itself) that is double-counted
        df_res["tot_degree_same_rel"] = (
            df_res.h_degree_same_rel + df_res.t_degree_same_rel - 1
        )
        edge_cardinality = self.edge_cardinality()
        df_res["triple_cardinality"] = edge_cardinality["triple_cardinality"]
        df_res["triple_cardinality_same_rel"] = edge_cardinality[
            "triple_cardinality_same_rel"
        ]
        return aggregate_by_relation(df_res) if aggregate_by_r else df_res

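    # Illustrative sketch, on the toy graph: for the edge (0, 0, 1) there is
    # a single edge between head 0 and tail 1 (n_parallel = 1), so
    # tot_degree = h_degree + t_degree - 1 = 2 + 1 - 1 = 2. Passing
    # aggregate_by_r=True instead returns one row of aggregated metrics per
    # relation type:
    #
    #   >>> edcs = kgtt.edge_degree_cardinality_summary()
    #   >>> edcs_by_rel = kgtt.edge_degree_cardinality_summary(aggregate_by_r=True)
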
    def edge_pattern_summary(
        self,
        return_metapath_list: bool = False,
        composition_chunk_size: int = 2**8,
        composition_workers: int = 32,
        aggregate_by_r: bool = False,
    ) -> pd.DataFrame:
        """
        Analyse structural properties of each edge in the KG: symmetry,
        presence of inverse/inference (i.e., parallel) edges and of triangles
        supported on the edge.

        The output dataframe maintains the same indexing and ordering of
        triples as the original Knowledge Graph dataframe.

        :param return_metapath_list: If True, return the list of unique
            metapaths for all triangles supported over one edge.
            WARNING: very expensive for large graphs.
        :param composition_chunk_size: Size of the column chunks of the
            sparse adjacency matrix used to compute the triangle count.
        :param composition_workers: Number of workers to compute the
            triangle count.
        :param aggregate_by_r: If True, return metrics aggregated by relation
            type (the output DataFrame will be indexed over relation IDs).

        :return:
            The results dataframe. Contains the following columns
            (in addition to `h`, `r`, `t`):

            - **is_loop** (bool): True if the triple is a loop (``h == t``).
            - **is_symmetric** (bool): True if the triple (t, r, h) is also
              contained in the graph (assuming t and h are different).
            - **has_inverse** (bool): True if the graph contains one or more
              triples (t, r', h) with ``r' != r``.
            - **n_inverse_relations** (int): The number of inverse
              relations r'.
            - **inverse_edge_types** (list): All relations r' (including r
              if the edge is symmetric) such that (t, r', h) is in the graph.
            - **has_inference** (bool): True if the graph contains one or
              more triples (h, r', t) with ``r' != r``.
            - **n_inference_relations** (int): The number of inference
              relations r'.
            - **inference_edge_types** (list): All relations r' (including r)
              such that (h, r', t) is in the graph.
            - **has_composition** (bool): True if the graph contains one or
              more triangles supported on the edge: (h, r1, x) + (x, r2, t).
            - **n_triangles** (int): The number of triangles.
            - **has_undirected_composition** (bool): True if the graph
              contains one or more undirected triangles supported on the edge.
            - **n_undirected_triangles** (int): The number of undirected
              triangles (considering all edges as bidirectional).
            - **metapath_list** (list): The list of unique metapaths "r1-r2"
              for the directed triangles.
              Only returned if `return_metapath_list = True`.
        """
        # symmetry-asymmetry: edges with h/t switched
        df_inv = self.df.reindex(columns=["t", "r", "h"]).rename(
            columns={"t": "h", "r": "r", "h": "t"}
        )
        df_res = pd.DataFrame(
            {"h": self.df.h, "r": self.df.r, "t": self.df.t, "is_symmetric": False}
        )
        df_res.loc[
            self.df.reset_index().merge(df_inv)["index"],
            "is_symmetric",
        ] = True
        # loops are treated separately
        df_res["is_loop"] = df_res.h == df_res.t
        df_res.loc[df_res.h == df_res.t, "is_symmetric"] = False
        # inverse
        unique_inv_r_by_ht = df_inv.groupby(["h", "t"], as_index=False).agg(
            inverse_edge_types=("r", list),
        )
        df_res = df_res.merge(
            unique_inv_r_by_ht, left_on=["h", "t"], right_on=["h", "t"], how="left"
        )
        df_res["inverse_edge_types"] = df_res["inverse_edge_types"].apply(
            lambda agg: agg if isinstance(agg, list) else []
        )
        # if the edge (h,r,t) is symmetric or a loop, we do not consider the
        # relation r as a proper inverse
        df_res["n_inverse_relations"] = (
            df_res.inverse_edge_types.str.len() - df_res.is_symmetric - df_res.is_loop
        )
        df_res["n_inverse_relations"] = (
            df_res["n_inverse_relations"].fillna(0).astype(int)
        )
        df_res["has_inverse"] = df_res["n_inverse_relations"] > 0
        # inference
        edges_between_ht = unique_inv_r_by_ht.reindex(
            columns=["t", "h", "inverse_edge_types"]
        ).rename(
            columns={"t": "h", "h": "t", "inverse_edge_types": "inference_edge_types"}
        )
        df_res = df_res.merge(
            edges_between_ht, left_on=["h", "t"], right_on=["h", "t"], how="left"
        )
        # inference_edge_types always contains the edge itself, which we need to drop
        df_res["n_inference_relations"] = df_res.inference_edge_types.str.len() - 1
        df_res["has_inference"] = df_res["n_inference_relations"] > 0
        # composition & metapaths
        # discard loops as edges of a triangle
        df_wo_loops = self.df[self.df.h != self.df.t]
        if return_metapath_list:
            # 2-hop paths
            df_bridges = df_wo_loops.merge(
                df_wo_loops, left_on="t", right_on="h", how="inner"
            )
            df_triangles = df_wo_loops.merge(
                df_bridges, left_on=["h", "t"], right_on=["h_x", "t_y"], how="inner"
            )
            df_triangles["metapath"] = (
                df_triangles["r_x"].astype(str) + "-" + df_triangles["r_y"].astype(str)
            )
            grouped_triangles = df_triangles.groupby(
                ["h", "r", "t"], as_index=False
            ).agg(
                n_triangles=("metapath", "count"), metapath_list=("metapath", "unique")
            )
            df_res = df_res.merge(
                grouped_triangles,
                left_on=["h", "r", "t"],
                right_on=["h", "r", "t"],
                how="left",
            )
            df_res["metapath_list"] = df_res["metapath_list"].apply(
                lambda agg: agg.tolist() if isinstance(agg, np.ndarray) else []
            )
            df_res["n_triangles"] = df_res["n_triangles"].fillna(0).astype(int)
        else:
            counts = composition_count(
                df_wo_loops,
                chunk_size=composition_chunk_size,
                workers=composition_workers,
                directed=True,
            )
            df_res = df_res.merge(
                counts,
                on=["h", "t"],
                how="left",
            )
            df_res["n_triangles"] = df_res["n_triangles"].fillna(0).astype(int)
        df_res["has_composition"] = df_res["n_triangles"] > 0
        counts = composition_count(
            df_wo_loops,
            chunk_size=composition_chunk_size,
            workers=composition_workers,
            directed=False,
        )
        df_res = df_res.merge(
            counts.rename(columns={"n_triangles": "n_undirected_triangles"}),
            on=["h", "t"],
            how="left",
        )
        df_res["n_undirected_triangles"] = (
            df_res["n_undirected_triangles"].fillna(0).astype(int)
        )
        df_res["has_undirected_composition"] = df_res["n_undirected_triangles"] > 0
        df_res = df_res[
            [
                "h",
                "r",
                "t",
                "is_loop",
                "is_symmetric",
                "has_inverse",
                "n_inverse_relations",
                "inverse_edge_types",
                "has_inference",
                "n_inference_relations",
                "inference_edge_types",
                "has_composition",
                "has_undirected_composition",
                "n_triangles",
                "n_undirected_triangles",
            ]
            + (["metapath_list"] if return_metapath_list else [])
        ]
        return aggregate_by_relation(df_res) if aggregate_by_r else df_res

    def jaccard_similarity_relation_sets(self) -> pd.DataFrame:
        """
        Compute the similarity between relations, defined as the Jaccard
        similarity between the sets of entities (heads and tails), for all
        pairs of relations in the graph.

        :return:
            The results dataframe. Contains the following columns:

            - **r1** (int): Index of the first relation.
            - **r2** (int): Index of the second relation.
            - **num_triples_both** (int): Number of triples with relation
              r1/r2.
            - **frac_triples_both** (float): Fraction of triples with
              relation r1/r2.
            - **num_entities_both** (int): Number of unique entities (h or t)
              for triples with relation r1/r2.
            - **num_h_r1** (int): Number of unique head entities for
              relation r1.
            - **num_h_r2** (int): Number of unique head entities for
              relation r2.
            - **num_t_r1** (int): Number of unique tail entities for
              relation r1.
            - **num_t_r2** (int): Number of unique tail entities for
              relation r2.
            - **jaccard_head_head** (float): Jaccard similarity between the
              head set of r1 and the head set of r2.
            - **jaccard_tail_tail** (float): Jaccard similarity between the
              tail set of r1 and the tail set of r2.
            - **jaccard_head_tail** (float): Jaccard similarity between the
              head set of r1 and the tail set of r2.
            - **jaccard_tail_head** (float): Jaccard similarity between the
              tail set of r1 and the head set of r2.
            - **jaccard_both** (float): Jaccard similarity between the full
              entity set of r1 and r2.
        """
        ent_unique = self.df.groupby("r", as_index=False).agg(
            num_triples=("r", "count"), head=("h", "unique"), tail=("t", "unique")
        )
        ent_unique["both"] = ent_unique.apply(
            lambda x: np.unique(np.concatenate([x["head"], x["tail"]])), axis=1
        )
        ent_unique["num_h"] = ent_unique["head"].str.len()
        ent_unique["num_t"] = ent_unique["tail"].str.len()
        r_num = ent_unique[["r", "num_h", "num_t", "num_triples"]]
        # combinations of relations
        df_res = pd.merge(
            r_num.rename(columns={"r": "r1"}),
            r_num.rename(columns={"r": "r2"}),
            suffixes=["_r1", "_r2"],
            how="cross",
        )
        df_res = df_res[df_res.r1 < df_res.r2]
        df_res["num_triples_both"] = df_res["num_triples_r1"] + df_res["num_triples_r2"]
        df_res["frac_triples_both"] = df_res["num_triples_both"] / self.df.shape[0]
        df_res["num_entities_both"] = df_res.apply(
            lambda x: len(
                np.unique(
                    np.concatenate(
                        [
                            ent_unique.loc[x["r1"], "both"],
                            ent_unique.loc[x["r2"], "both"],
                        ]
                    )
                )
            ),
            axis=1,
        )
        df_res = df_res[
            [
                "r1",
                "r2",
                "num_triples_both",
                "frac_triples_both",
                "num_entities_both",
                "num_h_r1",
                "num_h_r2",
                "num_t_r1",
                "num_t_r2",
            ]
        ]
        for r1_ent in ["head", "tail"]:
            for r2_ent in ["head", "tail"]:
                df_res[f"jaccard_{r1_ent}_{r2_ent}"] = [
                    jaccard_similarity(a, b)
                    for a, b in zip(
                        ent_unique.loc[df_res.r1, r1_ent],
                        ent_unique.loc[df_res.r2, r2_ent],
                    )
                ]
        df_res["jaccard_both"] = [
            jaccard_similarity(a, b)
            for a, b in zip(
                ent_unique.loc[df_res.r1, "both"], ent_unique.loc[df_res.r2, "both"]
            )
        ]
        return df_res

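    # Illustrative sketch, on the toy graph: relation 0 has head set {0, 1}
    # and relation 1 has head set {1, 2}, so for the pair (r1=0, r2=1)
    # jaccard_head_head = |{1}| / |{0, 1, 2}| = 1/3.
    #
    #   >>> jac = kgtt.jaccard_similarity_relation_sets()
    #   >>> jac.loc[(jac.r1 == 0) & (jac.r2 == 1), "jaccard_head_head"]
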
    def relational_affinity_ingram(self, min_max_norm: bool = False) -> pd.DataFrame:
        """
        Compute the similarity between relations based on the approach
        proposed in InGram: Inductive Knowledge Graph Embedding via Relation
        Graphs, https://arxiv.org/abs/2305.19987.

        Only the pairs of relations with ``affinity > 0`` are shown in the
        returned dataframe.

        :param min_max_norm: If True, apply min-max normalization to the
            edge weights. Default: False.

        :return:
            The results dataframe. Contains the following columns:

            - **h_relation** (int): Index of the head relation.
            - **t_relation** (int): Index of the tail relation.
            - **edge_weight** (float): Weight for the affinity between the
              head and the tail relation.
        """
        hr_freqs = self.df.groupby(["h", "r"], as_index=False).count()
        # normalize by global h frequency
        hr_freqs["t"] = hr_freqs["t"] / hr_freqs.groupby("h")["t"].transform("sum")
        rt_freqs = self.df.groupby(["t", "r"], as_index=False).count()
        # normalize by global t frequency
        rt_freqs["h"] = rt_freqs["h"] / rt_freqs.groupby("t")["h"].transform("sum")
        E_h = coo_array(
            (hr_freqs.t, (hr_freqs.h, hr_freqs.r)),
            shape=[self.n_entity, self.n_rel],
        )
        E_t = coo_array(
            (rt_freqs.h, (rt_freqs.t, rt_freqs.r)),
            shape=[self.n_entity, self.n_rel],
        )
        A = (E_h.T @ E_h).toarray() + (E_t.T @ E_t).toarray()
        A[np.diag_indices_from(A)] = 0

        if min_max_norm:
            A = (A - np.min(A)) / (np.max(A) - np.min(A))

        h_rels, t_rels = np.nonzero(A)
        return pd.DataFrame(
            {
                "h_relation": h_rels,
                "t_relation": t_rels,
                "edge_weight": A[h_rels, t_rels],
            }
        )

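    # Illustrative sketch: relation pairs that are incident on many common
    # entities (as heads or tails) receive higher affinity; pairs with zero
    # affinity are omitted from the output.
    #
    #   >>> aff = kgtt.relational_affinity_ingram()
    #   >>> aff.columns.tolist()
    #   ['h_relation', 't_relation', 'edge_weight']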