# koCSN_SAPR/utils/ner_utils.py
"""
NER λͺ¨λΈμ„ μ΄μš©ν•˜μ—¬ μž‘μ—…ν•˜λŠ” μ½”λ“œμž…λ‹ˆλ‹€.
"""
import re
import torch
import numpy as np
from collections import Counter
device = "cuda:0" if torch.cuda.is_available() else "cpu"
def ner_tokenizer(text, max_seq_length, checkpoint):
"""
NER을 μœ„ν•΄ ν…μŠ€νŠΈλ₯Ό ν† ν°ν™”ν•©λ‹ˆλ‹€.
Args:
sent: μ²˜λ¦¬ν•˜κ³ μž ν•˜λŠ” ν…μŠ€νŠΈλ₯Ό μž…λ ₯λ°›μŠ΅λ‹ˆλ‹€.
max_seq_length: BERT의 configμ—μ„œ 처리 κ°€λŠ₯ν•œ μ΅œλŒ€ λ¬Έμžμ—΄ κΈΈμ΄λŠ” 512μž…λ‹ˆλ‹€. μ΅œλŒ€ 길이λ₯Ό λ„˜μ–΄μ„œμ§€ μ•Šλ„λ‘, ν…μŠ€νŠΈ 길이가 512λ₯Ό λ„˜μ–΄κ°ˆ 경우 μ—¬λŸ¬ 개의 λ¬Έμžμ—΄λ‘œ λΆ„λ¦¬ν•©λ‹ˆλ‹€.
λ¬Έλ§₯ 정보λ₯Ό κ³ λ €ν•˜λ―€λ‘œ κ°€λŠ₯ν•œ κΈ΄ 길이둜 chunkingν•˜λŠ” 것이 쒋은 μ„±λŠ₯을 보μž₯ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
checkpoint: NER λͺ¨λΈμ— λŒ€ν•œ 정보λ₯Ό λΆˆλŸ¬λ“€μž…λ‹ˆλ‹€.
Return:
ner_tokenizer_dict: μ•„λž˜ μ„Έ μš”μ†Œλ₯Ό ν¬ν•¨ν•œ λ”•μ…”λ„ˆλ¦¬μž…λ‹ˆλ‹€.
input_ids: 각 ν† ν°μ˜ λͺ¨λΈ λ”•μ…”λ„ˆλ¦¬μ—μ„œμ˜ μ•„μ΄λ””κ°’μž…λ‹ˆλ‹€.
attention_mask: 각 ν† ν°μ˜ μ–΄νƒ μ…˜ 마슀크 ν™œμ„±ν™” μ—¬λΆ€μž…λ‹ˆλ‹€.
token_type_ids: 개체λͺ… 인식 된 ν† ν°μ˜ 경우 κ·Έ νƒ€μž…μ˜ 아이디(숫자 μ‘°ν•©)λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.
"""
    # Load the tokenizer from the saved checkpoint.
tokenizer = checkpoint['tokenizer']
    # Get the IDs of the special tokens for padding ([PAD]), sequence start ([CLS]), and sequence end ([SEP]).
pad_token_id = tokenizer.pad_token_id
cls_token_id = tokenizer.cls_token_id
sep_token_id = tokenizer.sep_token_id
    # Initialize the variable that keeps track of the previous syllable.
pre_syllable = "_"
#ν† ν¬λ‚˜μ΄μ§•λœ κ²°κ³Όλ₯Ό μ €μž₯ν•  λ¦¬μŠ€νŠΈλ“€μ„ μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.
input_ids = [pad_token_id] * (max_seq_length - 1)
attention_mask = [0] * (max_seq_length - 1)
token_type_ids = [0] * max_seq_length
    # Truncate the input text so it fits within the maximum sequence length (leaving room for [CLS] and [SEP]).
text = text[:max_seq_length-2]
#ν…μŠ€νŠΈμ˜ 각 μŒμ ˆμ— λŒ€ν•΄ λ°˜λ³΅λ¬Έμ„ μ‹€ν–‰ν•©λ‹ˆλ‹€.
for i, syllable in enumerate(text):
if syllable == '_':
pre_syllable = syllable
if pre_syllable != "_":
syllable = '##' + syllable
pre_syllable = syllable
        # Convert the syllable token to its ID in the model vocabulary and store it in input_ids.
input_ids[i] = tokenizer.convert_tokens_to_ids(syllable)
        # Enable the attention mask at this token position.
attention_mask[i] = 1
    # Prepend cls_token_id and append sep_token_id to the input sequence.
input_ids = [cls_token_id] + input_ids[:-1] + [sep_token_id]
#μ–΄ν…μ…˜ λ§ˆμŠ€ν¬λ„ μ‹œμž‘κ³Ό 끝 토큰을 κ³ λ €ν•˜μ—¬ μˆ˜μ •ν•©λ‹ˆλ‹€.
attention_mask = [1] + attention_mask[:-1] + [1]
ner_tokenizer_dict = {"input_ids": input_ids,
"attention_mask": attention_mask,
"token_type_ids": token_type_ids}
return ner_tokenizer_dict
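# A minimal usage sketch for ner_tokenizer (illustrative only; `checkpoint` is assumed to be
# the dict saved at training time with 'tokenizer', 'model', and 'tag2id' entries, which is
# what the functions in this module expect). Spaces are expected to already be replaced with
# underscores, as get_ner_predictions does before calling this function:
#
#   feats = ner_tokenizer('ν•œλ‹€μ •μ΄_학ꡐ에_κ°„λ‹€', 512, checkpoint)
#   feats['input_ids'][0] == checkpoint['tokenizer'].cls_token_id   # sequence starts with [CLS]
#   len(feats['input_ids']) == len(feats['attention_mask']) == 512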
def get_ner_predictions(text, checkpoint):
"""
ν† ν°ν™”ν•œ λ¬Έμž₯(tokenized_sent)κ³Ό μ˜ˆμΈ‘ν•œ νƒœκ·Έ(pred_tags) 값을 λ§Œλ“œλŠ” ν•¨μˆ˜μž…λ‹ˆλ‹€.
Args:
text: NER μ˜ˆμΈ‘μ„ ν•„μš”λ‘œ ν•˜λŠ” ν…μŠ€νŠΈλ₯Ό μž…λ ₯ν•©λ‹ˆλ‹€.
checkpoint: μ €μž₯ν•œ λͺ¨λΈμ„ λΆˆλŸ¬λ“€μž…λ‹ˆλ‹€.
Returns:
tokenized_sent: λͺ¨λΈ μž…λ ₯을 μœ„ν•œ ν† ν°ν™”λœ λ¬Έμž₯ μ •λ³΄μž…λ‹ˆλ‹€.
pred_tags: 각 토큰에 λŒ€ν•œ 예츑된 νƒœκ·Έλ“€μ„ ν¬ν•¨ν•©λ‹ˆλ‹€.
"""
    # Load the saved model.
model = checkpoint['model']
    # Get the mapping between tags and their IDs.
tag2id = checkpoint['tag2id']
model.to(device)
    # Replace spaces in the input text with underscores (_).
text = text.replace(' ', '_')
    # Create empty lists to store the predictions and the reference labels.
predictions, true_labels = [], []
    # Tokenize the text with the ner_tokenizer function.
tokenized_sent = ner_tokenizer(text, len(text) + 2, checkpoint)
#ν† ν°ν™”λœ κ²°κ³Όλ₯Ό ν† λŒ€λ‘œ ν…μ„œλ‘œ λ³€ν™˜ν•˜μ—¬ λͺ¨λΈ μž…λ ₯ ν˜•μ‹μ— 맞게 μ€€λΉ„ν•©λ‹ˆλ‹€.
input_ids = torch.tensor(
tokenized_sent['input_ids']).unsqueeze(0).to(device)
attention_mask = torch.tensor(
tokenized_sent['attention_mask']).unsqueeze(0).to(device)
token_type_ids = torch.tensor(
tokenized_sent['token_type_ids']).unsqueeze(0).to(device)
    # Run the following inside torch.no_grad() so no gradients are computed (this is evaluation, not training).
with torch.no_grad():
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids)
#λͺ¨λΈ 좜λ ₯μ—μ„œ λ‘œμ§“ 값을 가져와 Numpyκ°’μœΌλ‘œ λ³€ν™˜ν•˜κ³ , 라벨 ID듀을 CPU μƒμ˜ NumPy λ°°μ—΄λ‘œ κ°€μ Έμ˜΅λ‹ˆλ‹€.
logits = outputs['logits']
logits = logits.detach().cpu().numpy()
label_ids = token_type_ids.cpu().numpy()
    # Take the predicted label for each token and add it to the list.
predictions.extend([list(p) for p in np.argmax(logits, axis=2)])
    # Append the reference label IDs to the list.
true_labels.append(label_ids)
    # Convert the predicted label IDs back to tag strings.
pred_tags = [list(tag2id.keys())[p_i] for p in predictions for p_i in p]
return tokenized_sent, pred_tags
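# Example usage (a sketch; the exact tags depend on the trained model inside `checkpoint`):
#
#   tokenized_sent, pred_tags = get_ner_predictions('ν•œλ‹€μ •μ€ μ•„μΉ¨μ— ν•™κ΅λ‘œ κ°”λ‹€.', checkpoint)
#   len(pred_tags) == len(tokenized_sent['input_ids'])   # one tag per token, [CLS]/[SEP] included
#   # pred_tags might look like ['O', 'B-PER', 'I-PER', 'I-PER', ..., 'O']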
def ner_inference(tokenized_sent, pred_tags, checkpoint, name_len=5) -> list:
"""
NER을 μ‹€ν–‰ν•˜κ³ , 이름과 μ‹œκ°„ 및 곡간 정보λ₯Ό μΆ”μΆœν•©λ‹ˆλ‹€.
Args:
tokenized_sent: ν† ν°ν™”λœ λ¬Έμž₯이 μ €μž₯된 리슀트
pred_tags: 각 토큰에 λŒ€ν•œ 예츑 νƒœκ·Έκ°’ (NER κ²°κ³Ό)
checkpoint: μ €μž₯ν•΄λ‘” λͺ¨λΈμ„ 뢈러옴
name_len: 더 μ •ν™•ν•œ 이름 인식을 μœ„ν•΄ μ•žλ’€λ‘œ λͺ‡ 개의 μŒμ ˆμ„ 더 검토할지 μ§€μ •ν•©λ‹ˆλ‹€.
Returns:
namelist: μΆ”μΆœν•œ 이름(별칭 포함) λ¦¬μŠ€νŠΈμž…λ‹ˆλ‹€. ν›„μ²˜λ¦¬λ₯Ό 톡해
scene: μΆ”μΆœν•œ μž₯μ†Œ μ‹œκ°„ μ‚¬μ „μž…λ‹ˆλ‹€.
"""
name_list = []
speaker = ''
tokenizer = checkpoint['tokenizer']
scene = {'μž₯μ†Œ': [], 'μ‹œκ°„': []}
target = ''
c_tag = None
for i, tag in enumerate(pred_tags):
token = tokenizer.convert_ids_to_tokens(tokenized_sent['input_ids'][i]).replace('#', '')
if 'PER' in tag:
if 'B' in tag and speaker != '':
name_list.append(speaker)
speaker = ''
speaker += token
elif speaker != '' and tag != pred_tags[i-1]:
if speaker in name_list:
name_list.append(speaker)
else:
tmp = speaker
found_name = False
                # print(f'Double-checking {speaker} because it is not yet in the name list.')
for j in range(name_len):
if i + j < len(tokenized_sent['input_ids']):
token = tokenizer.convert_ids_to_tokens(
tokenized_sent['input_ids'][i+j]).replace('#', '')
tmp += token
                        # print(f'Looked ahead {j} syllables after {speaker}; candidate so far: {tmp}')
if tmp in name_list:
name_list.append(tmp)
found_name = True
                            # print(f'{tmp} already exists in the name list, so it was added instead of {speaker}.')
break
if not found_name:
name_list.append(speaker)
                    # print(f'No match found, so {speaker} was added as-is.')
speaker = ''
elif tag != 'O':
if tag.startswith('B'):
if c_tag in ['TIM', 'DAT']:
scene['μ‹œκ°„'].append(target)
elif c_tag =='LOC':
scene['μž₯μ†Œ'].append(target)
c_tag = tag[2:]
target = token
else:
target += token.replace('_', ' ')
    # Flush any name or scene entity that is still being collected when the loop ends,
    # so a trailing person/time/place is not silently dropped.
    if speaker != '':
        name_list.append(speaker)
    if c_tag in ['TIM', 'DAT']:
        scene['μ‹œκ°„'].append(target)
    elif c_tag == 'LOC':
        scene['μž₯μ†Œ'].append(target)
    return name_list, scene
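# Example usage (a sketch; the outputs depend on the model's predictions):
#
#   tokenized_sent, pred_tags = get_ner_predictions(text, checkpoint)
#   names, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
#   # names -> e.g. ['ν•œλ‹€μ •', '닀정이', ...]
#   # scene -> {'μž₯μ†Œ': ['학ꡐ', ...], 'μ‹œκ°„': ['μ•„μΉ¨', ...]}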
def make_name_list(ner_inputs, checkpoint):
"""
λ¬Έμž₯듀을 NER λŒλ €μ„œ Name List λ§Œλ“€κΈ°.
"""
name_list = []
times = []
places = []
for ner_input in ner_inputs:
tokenized_sent, pred_tags = get_ner_predictions(ner_input, checkpoint)
names, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
name_list.extend(names)
times.extend(scene['μ‹œκ°„'])
places.extend(scene['μž₯μ†Œ'])
return name_list, times, places
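# Putting the pieces together (a sketch; `ner_inputs` would typically be the text split into
# chunks short enough for the model, e.g. paragraph by paragraph):
#
#   ner_inputs = ['ν•œλ‹€μ •μ΄ μ•„μΉ¨μ— 학ꡐ μ•žμ—μ„œ 기닀렸닀.', '닀정이가 μ›ƒμœΌλ©° 말했닀.']
#   name_list, times, places = make_name_list(ner_inputs, checkpoint)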
def show_name_list(name_list):
"""
μ‚¬μš©μž μΉœν™”μ μœΌλ‘œ λ„€μž„λ¦¬μŠ€νŠΈλ₯Ό λ³΄μ—¬μ€λ‹ˆλ‹€.
Arg:
name_list: μΆ”μΆœν•œ 이름 리슀트
Return:
name: λ™μΌν•œ 이름이 λͺ‡ 번 λ“±μž₯ν–ˆλŠ”μ§€ 횟수λ₯Ό ν•¨κ»˜ μ œκ³΅ν•©λ‹ˆλ‹€.
"""
name = dict(Counter(name_list))
return name
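# Example (deterministic, since this is just a Counter over the list):
#
#   show_name_list(['ν•œλ‹€μ •', '닀정이', 'ν•œλ‹€μ •'])
#   # -> {'ν•œλ‹€μ •': 2, '닀정이': 1}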
def compare_strings(str1, str2):
"""
ner둜 μΆ”μΆœν•œ 인λͺ…을 ν›„μ²˜λ¦¬ν•˜λŠ” μ½”λ“œμž…λ‹ˆλ‹€.
비ꡐ할 두 λ¬Έμžμ—΄μ˜ 길이가 λ‹€λ₯Ό 경우, 더 짧은 λ¬Έμžμ—΄μ΄ 더 κΈ΄ λ¬Έμžμ—΄μ— ν¬ν•¨λ˜λŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.
비ꡐ할 두 λ¬Έμžμ—΄μ˜ 길이가 같을 경우, κ²ΉμΉ˜λŠ” 뢀뢄이 2κΈ€μž 이상일 경우 같은 μ΄λ¦„μœΌλ‘œ μ§€μ •ν•©λ‹ˆλ‹€.
이 ν•¨μˆ˜μ™€ μ•„λž˜μ˜ combine_similar_namesλ₯Ό ν•¨κ»˜ μ‹€ν–‰ν•˜λ©΄, 'ν•œλ‹€μ •'κ³Ό '닀정이', '닀정이가' 등은 λͺ¨λ‘ ν•˜λ‚˜μ˜ 인물둜 묢을 수 μžˆμŠ΅λ‹ˆλ‹€.
Args: λΉ„κ΅ν•˜λ €λŠ” 두 λ¬Έμžμ—΄
Return: 두 λ¬Έμžμ—΄μ΄ 같은 μ΄λ¦„μœΌλ‘œ νŒλ‹¨λ  경우 True, 아닐 경우 False
"""
if len(str1) != len(str2):
        # Check whether the shorter string is contained in the longer one.
shorter, longer = (str1, str2) if len(str1) < len(str2) else (str2, str1)
if shorter in longer:
return True
else:
same_part = []
for i in range(len(str1)):
if str1[i] in str2:
same_part += str1[i]
continue
else:
break
if len(same_part) >= 2:
return True
return False
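# A few concrete cases of the rules above (note that the equal-length check only scans str1
# from the left, so the argument order can matter):
#
#   compare_strings('닀정이', '닀정이가')   # True: the shorter string is contained in the longer one
#   compare_strings('닀정이', 'ν•œλ‹€μ •')     # True: same length and they share two characters ('λ‹€', '정')
#   compare_strings('ν•œλ‹€μ •', '민쀀이')     # False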
def combine_similar_names(names_dict):
"""
compare_strings ν•¨μˆ˜λ₯Ό λ°”νƒ•μœΌλ‘œ μœ μ‚¬ν•œ 이름을 ν•¨κ»˜ λ¬ΆμŠ΅λ‹ˆλ‹€.
2κΈ€μžλŠ” 이름일 ν™•λ₯ μ΄ λ†’μœΌλ‹ˆ κΈ°μ€€μ μœΌλ‘œ μ§€μ •ν•©λ‹ˆλ‹€.
"""
names = names_dict.keys()
similar_groups = [[name] for name in names if len(name) == 2]
idx = 0
# print(similar_groups, '\n',idx)
for name in names:
found = False
for group in similar_groups:
idx += 1
for item in group:
if compare_strings(name, item) and len(name)>1:
found = True
cleaned_text = re.sub(r'(μ•„|이)$', '', item)
if len(name) == len(item):
same_part = ''
                        # Check whether there is a fully matching part.
for i in range(len(name)):
if name[i] in item:
same_part += name[i]
if same_part not in group and cleaned_text not in group:
group.append(cleaned_text)
                            # print(similar_groups, '\n', idx, 'when the strings have the same length')
else:
group.append(name)
                        # print(similar_groups, '\n', idx, 'when the strings have different lengths')
break
if found:
break
if not found:
similar_groups.append([name])
updated_names = {tuple(name for name in group if len(name) > 1): counts for group, counts in (
(group, sum(names_dict[name] for name in group if name != '')) for group in similar_groups)
if len([name for name in group if len(name) > 1]) > 0}
return updated_names
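# Worked example (deterministic for this toy input): the three surface forms are merged into
# one group keyed by the tuple of aliases, with their counts summed:
#
#   combine_similar_names({'닀정': 3, '닀정이': 2, 'ν•œλ‹€μ •': 1})
#   # -> {('닀정', '닀정이', 'ν•œλ‹€μ •'): 6}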
def convert_name2codename(codename2name, text):
"""REλ₯Ό μ΄μš©ν•˜μ—¬ 이름을 μ½”λ“œλ„€μž„μœΌλ‘œ λ³€κ²½ν•©λ‹ˆλ‹€. μ΄λ•Œ 각 μ½”λ“œλ„€μž„μ˜ λ²ˆν˜ΈλŠ” λΉˆλ„μˆ˜ κΈ°μ€€ λ‚΄λ¦Όμ°¨μˆœν•œ κ²°κ³Όμž…λ‹ˆλ‹€."""
import re
for n_list in codename2name.values():
n_list.sort(key=lambda x:(len(x), x), reverse=True)
for codename, n_list in codename2name.items():
for subname in n_list:
text = re.sub(subname, codename, text)
return text
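# Example (a sketch; the codename format used elsewhere in the project may differ, 'A' and 'B'
# here are just placeholders):
#
#   codename2name = {'A': ['닀정', '닀정이', 'ν•œλ‹€μ •'], 'B': ['민준']}
#   convert_name2codename(codename2name, '닀정이가 λ―Όμ€€μ—κ²Œ 말했닀')
#   # -> 'Aκ°€ Bμ—κ²Œ 말했닀'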
def convert_codename2name(codename2name, text):
"""μ½”λ“œλ„€μž„μ„ μ΄λ¦„μœΌλ‘œ λ³€κ²½ν•΄μ€λ‹ˆλ‹€."""
outputs = []
for i in text:
try:
outputs.append(codename2name[i][0])
        except (KeyError, IndexError):
            outputs.append('μ•Œ 수 μ—†μŒ')  # 'unknown'
return outputs
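# Example (continuing the sketch above; unknown codenames fall back to 'μ•Œ 수 μ—†μŒ'):
#
#   convert_codename2name({'A': ['닀정', '닀정이'], 'B': ['민준']}, ['A', 'B', 'C'])
#   # -> ['닀정', '민준', 'μ•Œ 수 μ—†μŒ']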