from typing import List
from transformers import pipeline
from pyvis.network import Network
from functools import lru_cache
import spacy
DEFAULT_LABEL_COLORS = {
"ORG": "#7aecec",
"PRODUCT": "#bfeeb7",
"GPE": "#feca74",
"LOC": "#ff9561",
"PERSON": "#aa9cfc",
"NORP": "#c887fb",
"FACILITY": "#9cc9cc",
"EVENT": "#ffeb80",
"LAW": "#ff8197",
"LANGUAGE": "#ff8197",
"WORK_OF_ART": "#f0d0ff",
"DATE": "#bfe1d9",
"TIME": "#bfe1d9",
"MONEY": "#e4e7d2",
"QUANTITY": "#e4e7d2",
"ORDINAL": "#e4e7d2",
"CARDINAL": "#e4e7d2",
"PERCENT": "#e4e7d2",
}
def generate_knowledge_graph(texts: List[str], filename: str):
nlp = spacy.load("en_core_web_sm")
doc = nlp("\n".join(texts).lower())
NERs = [ent.text for ent in doc.ents]
NER_types = [ent.label_ for ent in doc.ents]
triplets = []
for triplet in texts:
triplets.extend(generate_partial_graph(triplet))
heads = [t["head"].lower() for t in triplets]
tails = [t["tail"].lower() for t in triplets]
nodes = list(set(heads + tails))
net = Network(directed=True, width="700px", height="700px")
for n in nodes:
if n in NERs:
NER_type = NER_types[NERs.index(n)]
if NER_type in NER_types:
if NER_type in DEFAULT_LABEL_COLORS.keys():
color = DEFAULT_LABEL_COLORS[NER_type]
else:
color = "#666666"
net.add_node(n, title=NER_type, shape="circle", color=color)
else:
net.add_node(n, shape="circle")
else:
net.add_node(n, shape="circle")
unique_triplets = set()
def stringify_trip(x): return x["tail"] + x["head"] + x["type"].lower()
for triplet in triplets:
if stringify_trip(triplet) not in unique_triplets:
net.add_edge(triplet["head"].lower(), triplet["tail"].lower(),
title=triplet["type"], label=triplet["type"])
unique_triplets.add(stringify_trip(triplet))
net.repulsion(
node_distance=200,
central_gravity=0.2,
spring_length=200,
spring_strength=0.05,
damping=0.09
)
net.set_edge_smooth('dynamic')
net.show(filename)
return nodes
@lru_cache(maxsize=16)
def generate_partial_graph(text: str):
triplet_extractor = pipeline(
'text2text-generation',
model='Babelscape/rebel-large',
tokenizer='Babelscape/rebel-large'
)
triples = triplet_extractor(
text,
return_tensors=True,
return_text=False)
if len(triples) == 0:
return []
a = [triples[0]["generated_token_ids"]]
extracted_text = triplet_extractor.tokenizer.batch_decode(a)
extracted_triplets = extract_triplets(extracted_text[0])
return extracted_triplets
def extract_triplets(text):
"""
Function to parse the generated text and extract the triplets
"""
triplets = []
relation, subject, relation, object_ = '', '', '', ''
text = text.strip()
current = 'x'
for token in text.replace("", "").replace("", "").replace("", "").split():
if token == "":
current = 't'
if relation != '':
triplets.append(
{'head': subject.strip(), 'type': relation.strip(), 'tail': object_.strip()})
relation = ''
subject = ''
elif token == "":
current = 's'
if relation != '':
triplets.append(
{'head': subject.strip(), 'type': relation.strip(), 'tail': object_.strip()})
object_ = ''
elif token == "":
current = 'o'
relation = ''
else:
if current == 't':
subject += ' ' + token
elif current == 's':
object_ += ' ' + token
elif current == 'o':
relation += ' ' + token
if subject != '' and relation != '' and object_ != '':
triplets.append(
{'head': subject.strip(), 'type': relation.strip(), 'tail': object_.strip()})
return triplets
if __name__ == "__main__":
generate_knowledge_graph(
["The dog is happy", "The cat is sad"], "test.html")