"""
Utilities that use an NER model to extract character names, times, and
places from text.
"""

import re
from collections import Counter

import numpy as np
import torch

device = "cuda:0" if torch.cuda.is_available() else "cpu"
def ner_tokenizer(text, max_seq_length, checkpoint):
    """
    Tokenizes text, syllable by syllable, for NER.

    Args:
        text: the text to process.
        max_seq_length: the longest sequence BERT can handle per its config
            is 512. To stay under that limit, text longer than 512
            characters should be split into several strings beforehand.
            Since the model uses surrounding context, chunking into the
            longest pieces possible tends to give the best performance.
        checkpoint: dictionary holding the NER model artifacts, including
            the tokenizer.

    Returns:
        ner_tokenizer_dict: a dictionary with the three entries below.
            input_ids: each token's ID in the model vocabulary.
            attention_mask: whether attention is active (1) for each
                token or not (0).
            token_type_ids: for tokens recognized as named entities, the
                ID of the entity type.
    """
    tokenizer = checkpoint['tokenizer']

    pad_token_id = tokenizer.pad_token_id
    cls_token_id = tokenizer.cls_token_id
    sep_token_id = tokenizer.sep_token_id

    pre_syllable = "_"

    input_ids = [pad_token_id] * (max_seq_length - 1)
    attention_mask = [0] * (max_seq_length - 1)
    token_type_ids = [0] * max_seq_length

    # Reserve two positions for the [CLS] and [SEP] special tokens.
    text = text[:max_seq_length - 2]

    for i, syllable in enumerate(text):
        if syllable == '_':
            pre_syllable = syllable
        if pre_syllable != "_":
            # Mark syllables that continue a word as subword tokens.
            syllable = '##' + syllable
        pre_syllable = syllable

        input_ids[i] = tokenizer.convert_tokens_to_ids(syllable)
        attention_mask[i] = 1

    input_ids = [cls_token_id] + input_ids[:-1] + [sep_token_id]
    attention_mask = [1] + attention_mask[:-1] + [1]

    ner_tokenizer_dict = {"input_ids": input_ids,
                          "attention_mask": attention_mask,
                          "token_type_ids": token_type_ids}

    return ner_tokenizer_dict
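
# A minimal usage sketch (illustrative; `checkpoint` is the same dict of
# model artifacts used throughout this module, and the sample text is a
# made-up name):
#
#   encoded = ner_tokenizer('하다솜이_말했다', max_seq_length=12,
#                           checkpoint=checkpoint)
#   len(encoded['input_ids'])  # 12: [CLS] + syllable ids + [SEP], padded
#   encoded['attention_mask']  # 1 for [CLS], real syllables, [SEP]; else 0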


def get_ner_predictions(text, checkpoint):
    """
    Builds the tokenized sentence (tokenized_sent) and the predicted tags
    (pred_tags) for a text.

    Args:
        text: the text that needs NER prediction.
        checkpoint: dictionary holding the saved model artifacts.

    Returns:
        tokenized_sent: the tokenized sentence information used as model
            input.
        pred_tags: the predicted tag for each token.
    """
    model = checkpoint['model']
    tag2id = checkpoint['tag2id']
    model.to(device)

    # Spaces become underscores, matching ner_tokenizer's convention.
    text = text.replace(' ', '_')

    predictions = []

    tokenized_sent = ner_tokenizer(text, len(text) + 2, checkpoint)

    input_ids = torch.tensor(
        tokenized_sent['input_ids']).unsqueeze(0).to(device)
    attention_mask = torch.tensor(
        tokenized_sent['attention_mask']).unsqueeze(0).to(device)
    token_type_ids = torch.tensor(
        tokenized_sent['token_type_ids']).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids)

    logits = outputs['logits'].detach().cpu().numpy()

    predictions.extend([list(p) for p in np.argmax(logits, axis=2)])

    # Invert tag2id to map label ids back to tag strings, rather than
    # relying on the dictionary's insertion order.
    id2tag = {label_id: tag for tag, label_id in tag2id.items()}
    pred_tags = [id2tag[p_i] for p in predictions for p_i in p]

    return tokenized_sent, pred_tags
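
# Illustrative call (the exact tag strings depend on the trained model's
# tag2id; 'B-PER'/'I-PER' below are assumptions based on how tags are
# parsed in ner_inference):
#
#   tokenized_sent, pred_tags = get_ner_predictions('하다솜이 말했다',
#                                                   checkpoint)
#   # pred_tags holds one tag per token position, e.g.
#   # ['O', 'B-PER', 'I-PER', 'I-PER', 'O', ...]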


def ner_inference(tokenized_sent, pred_tags, checkpoint, name_len=5):
    """
    Walks over the NER output and extracts names plus time and place
    information.

    Args:
        tokenized_sent: the tokenized sentence information.
        pred_tags: the predicted tag for each token (the NER result).
        checkpoint: dictionary holding the saved model artifacts.
        name_len: how many of the following syllables to examine when
            trying to complete a partially matched name.

    Returns:
        name_list: the list of extracted names (aliases included);
            duplicates are merged later during post-processing.
        scene: a dictionary of the extracted places ('장소') and
            times ('시간').
    """
    name_list = []
    speaker = ''
    tokenizer = checkpoint['tokenizer']
    scene = {'장소': [], '시간': []}
    target = ''
    c_tag = None

    for i, tag in enumerate(pred_tags):
        token = tokenizer.convert_ids_to_tokens(
            tokenized_sent['input_ids'][i]).replace('#', '')
        if 'PER' in tag:
            # A new B-PER tag closes out any name being accumulated.
            if 'B' in tag and speaker != '':
                name_list.append(speaker)
                speaker = ''
            speaker += token

        elif speaker != '' and tag != pred_tags[i - 1]:
            if speaker in name_list:
                name_list.append(speaker)
            else:
                # Try to complete the partial name by appending up to
                # name_len of the following syllables.
                tmp = speaker
                found_name = False

                for j in range(name_len):
                    if i + j < len(tokenized_sent['input_ids']):
                        token = tokenizer.convert_ids_to_tokens(
                            tokenized_sent['input_ids'][i + j]).replace('#', '')
                        tmp += token

                    if tmp in name_list:
                        name_list.append(tmp)
                        found_name = True
                        break

                if not found_name:
                    name_list.append(speaker)

            speaker = ''

        elif tag != 'O':
            if tag.startswith('B'):
                # A new entity starts: flush the previous one into scene.
                if c_tag in ['TIM', 'DAT']:
                    scene['시간'].append(target)
                elif c_tag == 'LOC':
                    scene['장소'].append(target)
                c_tag = tag[2:]
                target = token
            else:
                target += token.replace('_', ' ')

    # Flush the last pending time/place entity once the loop ends.
    if c_tag in ['TIM', 'DAT']:
        scene['시간'].append(target)
    elif c_tag == 'LOC':
        scene['장소'].append(target)

    return name_list, scene
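
# Sketch of the expected output shape (entity values are illustrative):
#
#   names, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
#   # names -> e.g. ['하다솜', '다솜이']
#   # scene -> e.g. {'장소': ['서울'], '시간': ['어제 밤']}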


def make_name_list(ner_inputs, checkpoint):
    """
    Runs NER over the sentences and builds the name list.
    """
    name_list = []
    times = []
    places = []

    for ner_input in ner_inputs:
        tokenized_sent, pred_tags = get_ner_predictions(ner_input, checkpoint)
        names, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
        name_list.extend(names)
        times.extend(scene['시간'])
        places.extend(scene['장소'])

    return name_list, times, places
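
# End-to-end usage (the input sentences are illustrative):
#
#   name_list, times, places = make_name_list(
#       ['하다솜이 말했다', '어제 서울에서 만났다'], checkpoint)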


def show_name_list(name_list):
    """
    Presents the name list in a user-friendly form.

    Args:
        name_list: the list of extracted names.

    Returns:
        name: a dictionary mapping each name to the number of times it
            appears.
    """
    name = dict(Counter(name_list))

    return name
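
# Example:
#
#   show_name_list(['하다솜', '다솜이', '하다솜'])
#   # -> {'하다솜': 2, '다솜이': 1}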


def compare_strings(str1, str2):
    """
    Post-processes person names extracted by NER.

    If the two strings differ in length, checks whether the shorter one is
    contained in the longer one. If they have the same length, they are
    treated as the same name when they overlap in two or more characters.

    Running this together with combine_similar_names below groups aliases
    such as '하다솜', '다솜이', and '다솜이가' under a single person.

    Args:
        str1, str2: the two strings to compare.

    Returns:
        True if the two strings are judged to be the same name, else False.
    """
    if len(str1) != len(str2):
        shorter, longer = (str1, str2) if len(str1) < len(str2) else (str2, str1)
        if shorter in longer:
            return True
    else:
        same_part = []
        for i in range(len(str1)):
            if str1[i] in str2:
                same_part += str1[i]
            else:
                break
        if len(same_part) >= 2:
            return True

    return False
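
# Examples:
#
#   compare_strings('다솜', '하다솜')    # True: shorter contained in longer
#   compare_strings('다솜이', '다솜아')  # True: two characters overlap
#   compare_strings('철수', '영희')      # False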


def combine_similar_names(names_dict):
    """
    Groups similar names together based on the compare_strings function.

    Two-character strings are likely to be bare given names, so they are
    used as the seed groups.
    """
    names = names_dict.keys()
    similar_groups = [[name] for name in names if len(name) == 2]

    for name in names:
        found = False
        for group in similar_groups:
            for item in group:
                if compare_strings(name, item) and len(name) > 1:
                    found = True
                    # Strip a trailing particle ('은'/'이') from the alias.
                    cleaned_text = re.sub(r'(은|이)$', '', item)
                    if len(name) == len(item):
                        same_part = ''
                        for i in range(len(name)):
                            if name[i] in item:
                                same_part += name[i]
                        if same_part not in group and cleaned_text not in group:
                            group.append(cleaned_text)
                    else:
                        group.append(name)

                    break
            if found:
                break
        if not found:
            similar_groups.append([name])

    # Sum each group's frequencies; .get() guards against derived aliases
    # that were never keys in names_dict.
    updated_names = {
        tuple(name for name in group if len(name) > 1): counts
        for group, counts in (
            (group, sum(names_dict.get(name, 0) for name in group if name != ''))
            for group in similar_groups)
        if len([name for name in group if len(name) > 1]) > 0}

    return updated_names
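
# Example (aliases grouped under one tuple key, frequencies summed):
#
#   combine_similar_names({'하다솜': 3, '다솜': 2})
#   # -> {('다솜', '하다솜'): 5}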


def convert_name2codename(codename2name, text):
    """Replaces names with codenames using regular expressions. Each
    codename's number reflects the names' frequency, in descending order."""
    # Substitute longer aliases first so that a short alias does not
    # clobber a longer one that contains it.
    for n_list in codename2name.values():
        n_list.sort(key=lambda x: (len(x), x), reverse=True)

    for codename, n_list in codename2name.items():
        for subname in n_list:
            # re.escape keeps any regex metacharacters in a name literal.
            text = re.sub(re.escape(subname), codename, text)

    return text
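
# Example (codename strings are arbitrary placeholders):
#
#   convert_name2codename({'인물1': ['하다솜', '다솜'], '인물2': ['철수']},
#                         '하다솜과 철수가 걸었다')
#   # -> '인물1과 인물2가 걸었다'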


def convert_codename2name(codename2name, text):
    """Converts codenames back to names.

    Args:
        codename2name: mapping from each codename to its list of aliases.
        text: an iterable of codename strings.
    """
    outputs = []
    for i in text:
        try:
            # Use the first alias as the group's representative name.
            outputs.append(codename2name[i][0])
        except (KeyError, IndexError):
            outputs.append('알 수 없음')  # "unknown"

    return outputs
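
# Example (unknown codenames fall back to '알 수 없음', i.e. "unknown"):
#
#   convert_codename2name({'인물1': ['하다솜', '다솜']}, ['인물1', '인물9'])
#   # -> ['하다솜', '알 수 없음']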