"""
Code for working with the NER model: tokenization, prediction, and post-processing of the extracted names.
"""
import re
import torch
import numpy as np
from collections import Counter

device = "cuda:0" if torch.cuda.is_available() else "cpu"

def ner_tokenizer(text, max_seq_length, checkpoint):
    """
    NER을 μœ„ν•΄ ν…μŠ€νŠΈλ₯Ό ν† ν°ν™”ν•©λ‹ˆλ‹€. 
    Args:
        sent: μ²˜λ¦¬ν•˜κ³ μž ν•˜λŠ” ν…μŠ€νŠΈλ₯Ό μž…λ ₯λ°›μŠ΅λ‹ˆλ‹€.
        max_seq_length: BERT의 configμ—μ„œ 처리 κ°€λŠ₯ν•œ μ΅œλŒ€ λ¬Έμžμ—΄ κΈΈμ΄λŠ” 512μž…λ‹ˆλ‹€. μ΅œλŒ€ 길이λ₯Ό λ„˜μ–΄μ„œμ§€ μ•Šλ„λ‘, ν…μŠ€νŠΈ 길이가 512λ₯Ό λ„˜μ–΄κ°ˆ 경우 μ—¬λŸ¬ 개의 λ¬Έμžμ—΄λ‘œ λΆ„λ¦¬ν•©λ‹ˆλ‹€. 
                        λ¬Έλ§₯ 정보λ₯Ό κ³ λ €ν•˜λ―€λ‘œ κ°€λŠ₯ν•œ κΈ΄ 길이둜 chunkingν•˜λŠ” 것이 쒋은 μ„±λŠ₯을 보μž₯ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
        checkpoint: NER λͺ¨λΈμ— λŒ€ν•œ 정보λ₯Ό λΆˆλŸ¬λ“€μž…λ‹ˆλ‹€.
    Return:
        ner_tokenizer_dict: μ•„λž˜ μ„Έ μš”μ†Œλ₯Ό ν¬ν•¨ν•œ λ”•μ…”λ„ˆλ¦¬μž…λ‹ˆλ‹€.
            input_ids: 각 ν† ν°μ˜ λͺ¨λΈ λ”•μ…”λ„ˆλ¦¬μ—μ„œμ˜ μ•„μ΄λ””κ°’μž…λ‹ˆλ‹€.
            attention_mask: 각 ν† ν°μ˜ μ–΄νƒ μ…˜ 마슀크 ν™œμ„±ν™” μ—¬λΆ€μž…λ‹ˆλ‹€.
            token_type_ids: 개체λͺ… 인식 된 ν† ν°μ˜ 경우 κ·Έ νƒ€μž…μ˜ 아이디(숫자 μ‘°ν•©)λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.
    """
    # Load the tokenizer from the saved checkpoint.
    tokenizer = checkpoint['tokenizer']

    # Get the ids of the special tokens for padding, sentence start, and sentence end.
    pad_token_id = tokenizer.pad_token_id
    cls_token_id = tokenizer.cls_token_id
    sep_token_id = tokenizer.sep_token_id

    # Initialize the variable holding the previous syllable.
    pre_syllable = "_" 

    # Initialize the lists that will hold the tokenization results.
    input_ids = [pad_token_id] * (max_seq_length - 1)
    attention_mask = [0] * (max_seq_length - 1)
    token_type_ids = [0] * max_seq_length

    # Truncate the input text so it fits within the maximum sequence length ([CLS] and [SEP] take two slots).
    text = text[:max_seq_length-2]

    # Iterate over each syllable of the text. Syllables that do not start a word (i.e. the
    # previous syllable is not '_') get the WordPiece continuation prefix '##'.
    for i, syllable in enumerate(text):
        if syllable == '_':
            pre_syllable = syllable
        if pre_syllable != "_":
            syllable = '##' + syllable
        pre_syllable = syllable

        # Convert the token to its id in the model vocabulary and store it in input_ids.
        input_ids[i] = tokenizer.convert_tokens_to_ids(syllable)
        # Enable the attention mask at this position.
        attention_mask[i] = 1

    # Prepend cls_token_id and append sep_token_id to the input sequence.
    input_ids = [cls_token_id] + input_ids[:-1] + [sep_token_id]
    # Adjust the attention mask for the start and end tokens as well.
    attention_mask = [1] + attention_mask[:-1] + [1]

    ner_tokenizer_dict = {"input_ids": input_ids,
                          "attention_mask": attention_mask,
                          "token_type_ids": token_type_ids}
    
    return ner_tokenizer_dict
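
# A minimal usage sketch for ner_tokenizer (kept as a comment so it does not run on import).
# The checkpoint layout {'tokenizer', 'model', 'tag2id'} is an assumption inferred from the
# keys this module reads, and "klue/bert-base" is only a hypothetical tokenizer choice:
#
#   from transformers import AutoTokenizer
#   tokenizer = AutoTokenizer.from_pretrained("klue/bert-base")
#   checkpoint = {'tokenizer': tokenizer, 'model': model, 'tag2id': tag2id}
#   text = "홍길동은_어제_서울에_갔다"                  # spaces already replaced with '_'
#   encoded = ner_tokenizer(text, len(text) + 2, checkpoint)
#   # len(encoded['input_ids']) == len(text) + 2      # [CLS] + one id per syllable + [SEP]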

def get_ner_predictions(text, checkpoint):
    """
    ν† ν°ν™”ν•œ λ¬Έμž₯(tokenized_sent)κ³Ό μ˜ˆμΈ‘ν•œ νƒœκ·Έ(pred_tags) 값을 λ§Œλ“œλŠ” ν•¨μˆ˜μž…λ‹ˆλ‹€.
    Args:
        text: NER μ˜ˆμΈ‘μ„ ν•„μš”λ‘œ ν•˜λŠ” ν…μŠ€νŠΈλ₯Ό μž…λ ₯ν•©λ‹ˆλ‹€. 
        checkpoint: μ €μž₯ν•œ λͺ¨λΈμ„ λΆˆλŸ¬λ“€μž…λ‹ˆλ‹€.
    Returns:
        tokenized_sent: λͺ¨λΈ μž…λ ₯을 μœ„ν•œ ν† ν°ν™”λœ λ¬Έμž₯ μ •λ³΄μž…λ‹ˆλ‹€.
        pred_tags: 각 토큰에 λŒ€ν•œ 예츑된 νƒœκ·Έλ“€μ„ ν¬ν•¨ν•©λ‹ˆλ‹€.
    """
    # Load the saved model.
    model = checkpoint['model']
    # Get the tag-to-id mapping.
    tag2id = checkpoint['tag2id']
    model.to(device)
    # Replace spaces in the input text with underscores (_).
    text = text.replace(' ', '_')

    # Create empty lists for the predictions and the (placeholder) labels.
    predictions, true_labels = [], []

    # Tokenize the text with the ner_tokenizer function.
    tokenized_sent = ner_tokenizer(text, len(text) + 2, checkpoint)

    # Convert the tokenized results into tensors shaped as model inputs.
    input_ids = torch.tensor(
        tokenized_sent['input_ids']).unsqueeze(0).to(device)
    attention_mask = torch.tensor(
        tokenized_sent['attention_mask']).unsqueeze(0).to(device)
    token_type_ids = torch.tensor(
        tokenized_sent['token_type_ids']).unsqueeze(0).to(device)

    # Run the model under torch.no_grad() so no gradients are computed (inference only, no training).
    with torch.no_grad():
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids)
        
    # Take the logits from the model output as a NumPy array, and bring the label ids to the CPU as a NumPy array.
    logits = outputs['logits']
    logits = logits.detach().cpu().numpy()
    label_ids = token_type_ids.cpu().numpy()

    # Append the predicted label ids to the list.
    predictions.extend([list(p) for p in np.argmax(logits, axis=2)])
    # Store the label ids as well (token_type_ids serve as placeholders here).
    true_labels.append(label_ids)

    # Convert the predicted label ids back into tag strings.
    pred_tags = [list(tag2id.keys())[p_i] for p in predictions for p_i in p]

    return tokenized_sent, pred_tags
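
# Usage sketch (checkpoint as in the sketch above). pred_tags is aligned one-to-one with
# tokenized_sent['input_ids'], including the [CLS] and [SEP] positions, which is why
# ner_inference can index both lists with the same i:
#
#   tokenized_sent, pred_tags = get_ner_predictions("홍길동은 어제 서울에 갔다", checkpoint)
#   # len(pred_tags) == len(tokenized_sent['input_ids'])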


def ner_inference(tokenized_sent, pred_tags, checkpoint, name_len=5):
    """
    Walks over the NER results and extracts person names plus time and place information.
    Args:
        tokenized_sent: the tokenized sentence.
        pred_tags: the predicted tag for each token (the NER output).
        checkpoint: the saved model checkpoint.
        name_len: how many of the following syllables to inspect when trying to match a name more precisely.
    Returns:
        name_list: list of extracted names (including aliases); similar entries can be merged
                   afterwards with compare_strings / combine_similar_names.
        scene: dictionary of the extracted places and times.
    """
    name_list = []
    speaker = ''
    tokenizer = checkpoint['tokenizer']
    scene = {'장소': [], '시간': []}
    target = ''
    c_tag = None

    for i, tag in enumerate(pred_tags):
        token = tokenizer.convert_ids_to_tokens(tokenized_sent['input_ids'][i]).replace('#', '')
        if 'PER' in tag:
            if 'B' in tag and speaker != '':
                name_list.append(speaker)
                speaker = ''
            speaker += token

        elif speaker != '' and tag != pred_tags[i-1]:
            if speaker in name_list:
                name_list.append(speaker)
            else:
                tmp = speaker
                found_name = False
                # print(f'Not sure about {speaker}; double-checking.')
                for j in range(name_len):
                    if i + j < len(tokenized_sent['input_ids']):
                        token = tokenizer.convert_ids_to_tokens(
                            tokenized_sent['input_ids'][i+j]).replace('#', '')
                        tmp += token
                        # print(f'After checking {j} syllables past {speaker}, the candidate is {tmp}.')
                        if tmp in name_list:
                            name_list.append(tmp)
                            found_name = True
                            # print(f'{tmp} is already in the list, so it was added instead of {speaker}.')
                            break

                if not found_name:
                    name_list.append(speaker)
                    # print(f'No match found, so {speaker} was added.')
                speaker = ''

        elif tag != 'O':
            if tag.startswith('B'):
                if c_tag in ['TIM', 'DAT']:
                    scene['시간'].append(target)
                elif c_tag =='LOC':
                    scene['장소'].append(target)
                c_tag = tag[2:]
                target = token
            else:
                target += token.replace('_', ' ')

    # Flush the last pending place/time entity; inside the loop an entity is only written to
    # scene when the next 'B-' tag begins, so the final one would otherwise be dropped.
    if c_tag in ['TIM', 'DAT']:
        scene['시간'].append(target)
    elif c_tag == 'LOC':
        scene['장소'].append(target)

    return name_list, scene
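
# Output shape sketch: name_list keeps every PER occurrence (duplicates are intentional so
# that frequencies can be counted later), and scene maps '장소' (places, LOC) and '시간'
# (times, TIM/DAT) to the extracted spans. The values below are hypothetical:
#
#   name_list, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
#   # name_list -> ['다정', '다정', '철수']
#   # scene     -> {'장소': ['서울'], '시간': ['어제']}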


def make_name_list(ner_inputs, checkpoint):
    """
    λ¬Έμž₯듀을 NER λŒλ €μ„œ Name List λ§Œλ“€κΈ°.
    """
    name_list = []
    times = []
    places = []

    for ner_input in ner_inputs:
        tokenized_sent, pred_tags = get_ner_predictions(ner_input, checkpoint)
        names, scene = ner_inference(tokenized_sent, pred_tags, checkpoint)
        name_list.extend(names)
        times.extend(scene['시간'])
        places.extend(scene['장소'])

    return name_list, times, places
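
# Usage sketch: ner_inputs is expected to be an iterable of text chunks (e.g. a story split
# into sentences or paragraphs); the three returned lists are the per-chunk results concatenated:
#
#   name_list, times, places = make_name_list(["첫 번째 문단...", "두 번째 문단..."], checkpoint)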


def show_name_list(name_list):
    """
    μ‚¬μš©μž μΉœν™”μ μœΌλ‘œ λ„€μž„λ¦¬μŠ€νŠΈλ₯Ό λ³΄μ—¬μ€λ‹ˆλ‹€.
    Arg:
        name_list: μΆ”μΆœν•œ 이름 리슀트
    Return:
        name: λ™μΌν•œ 이름이 λͺ‡ 번 λ“±μž₯ν–ˆλŠ”μ§€ 횟수λ₯Ό ν•¨κ»˜ μ œκ³΅ν•©λ‹ˆλ‹€.
    """
    name = dict(Counter(name_list))

    return name
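
# For example, show_name_list(['다정', '다정', '철수']) returns {'다정': 2, '철수': 1}.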


def compare_strings(str1, str2):
    """
    ner둜 μΆ”μΆœν•œ 인λͺ…을 ν›„μ²˜λ¦¬ν•˜λŠ” μ½”λ“œμž…λ‹ˆλ‹€.
    비ꡐ할 두 λ¬Έμžμ—΄μ˜ 길이가 λ‹€λ₯Ό 경우, 더 짧은 λ¬Έμžμ—΄μ΄ 더 κΈ΄ λ¬Έμžμ—΄μ— ν¬ν•¨λ˜λŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.
    비ꡐ할 두 λ¬Έμžμ—΄μ˜ 길이가 같을 경우, κ²ΉμΉ˜λŠ” 뢀뢄이 2κΈ€μž 이상일 경우 같은 μ΄λ¦„μœΌλ‘œ μ§€μ •ν•©λ‹ˆλ‹€. 
    이 ν•¨μˆ˜μ™€ μ•„λž˜μ˜ combine_similar_namesλ₯Ό ν•¨κ»˜ μ‹€ν–‰ν•˜λ©΄, 'ν•œλ‹€μ •'κ³Ό '닀정이', '닀정이가' 등은 λͺ¨λ‘ ν•˜λ‚˜μ˜ 인물둜 묢을 수 μžˆμŠ΅λ‹ˆλ‹€.

    Args: λΉ„κ΅ν•˜λ €λŠ” 두 λ¬Έμžμ—΄
    Return: 두 λ¬Έμžμ—΄μ΄ 같은 μ΄λ¦„μœΌλ‘œ νŒλ‹¨λ  경우 True, 아닐 경우 False
    """
    if len(str1) != len(str2):
        # Check whether the shorter string is contained in the longer one.
        shorter, longer = (str1, str2) if len(str1) < len(str2) else (str2, str1)
        if shorter in longer:
            return True
    else:
        same_part = []
        for i in range(len(str1)):
            if str1[i] in str2:
                same_part += str1[i]
                continue
            else:
                break
        if len(same_part) >= 2:
            return True

    return False
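
# A few concrete cases of the rule above (verifiable by hand):
#   compare_strings('다정', '다정이')   -> True   # shorter string contained in the longer one
#   compare_strings('다정', '철수')     -> False  # same length, no shared characters
#   compare_strings('다정이', '다정도') -> True   # same length, first two characters overlap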

def combine_similar_names(names_dict):
    """
    compare_strings ν•¨μˆ˜λ₯Ό λ°”νƒ•μœΌλ‘œ μœ μ‚¬ν•œ 이름을 ν•¨κ»˜ λ¬ΆμŠ΅λ‹ˆλ‹€.
    2κΈ€μžλŠ” 이름일 ν™•λ₯ μ΄ λ†’μœΌλ‹ˆ κΈ°μ€€μ μœΌλ‘œ μ§€μ •ν•©λ‹ˆλ‹€.
    """
    names = names_dict.keys()
    similar_groups = [[name] for name in names if len(name) == 2]
    idx = 0
    # print(similar_groups, '\n',idx)

    for name in names:
        found = False
        for group in similar_groups:
            idx += 1
            for item in group:
                if compare_strings(name, item) and len(name)>1:
                    found = True
                    cleaned_text = re.sub(r'(아|이)$', '', item)  # strip a trailing '아'/'이' suffix
                    if len(name) == len(item):
                        same_part = ''
                        # Collect the characters of name that also appear in item.
                        for i in range(len(name)):
                            if name[i] in item:
                                same_part += name[i]
                        if same_part not in group and cleaned_text not in group:
                            group.append(cleaned_text)
                            # print(similar_groups, '\n', idx, 'strings of equal length')
                    else:    
                        group.append(name)
                        # print(similar_groups, '\n', idx, 'strings of different length')
                        break
            if found:
                break
        if not found:
            similar_groups.append([name])

    updated_names = {tuple(name for name in group if len(name) > 1): counts for group, counts in (
        (group, sum(names_dict[name] for name in group if name != '')) for group in similar_groups)
        if len([name for name in group if len(name) > 1]) > 0}

    return updated_names
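
# Illustrative input/output (hypothetical counts):
#   combine_similar_names({'다정': 4, '다정이': 2, '철수': 1})
#   # -> {('다정', '다정이'): 6, ('철수',): 1}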

def convert_name2codename(codename2name, text):
    """REλ₯Ό μ΄μš©ν•˜μ—¬ 이름을 μ½”λ“œλ„€μž„μœΌλ‘œ λ³€κ²½ν•©λ‹ˆλ‹€. μ΄λ•Œ 각 μ½”λ“œλ„€μž„μ˜ λ²ˆν˜ΈλŠ” λΉˆλ„μˆ˜ κΈ°μ€€ λ‚΄λ¦Όμ°¨μˆœν•œ κ²°κ³Όμž…λ‹ˆλ‹€."""
    import re
    for n_list in codename2name.values():
        n_list.sort(key=lambda x:(len(x), x), reverse=True)

    for codename, n_list in codename2name.items():
        for subname in n_list:
            text = re.sub(subname, codename, text)

    return text
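
# Usage sketch ('인물1' is a hypothetical codename; the mapping itself is built outside this
# function, e.g. from the groups returned by combine_similar_names):
#
#   codename2name = {'인물1': ['다정', '다정이']}
#   convert_name2codename(codename2name, '다정이가 웃었다')   # -> '인물1가 웃었다'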


def convert_codename2name(codename2name, text):
    """μ½”λ“œλ„€μž„μ„ μ΄λ¦„μœΌλ‘œ λ³€κ²½ν•΄μ€λ‹ˆλ‹€."""
    outputs = []
    for i in text:
        try:
            outputs.append(codename2name[i][0])
        except (KeyError, IndexError):
            outputs.append('알 수 없음')

    return outputs
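

if __name__ == "__main__":
    # A small, model-free smoke test of the post-processing chain. The names and counts below
    # are made up purely for illustration; a real name_list comes from make_name_list, which
    # needs a trained checkpoint. The '인물{n}' codename scheme is an assumption as well.
    sample_names = ['다정', '다정이', '다정', '철수']
    counts = show_name_list(sample_names)        # {'다정': 2, '다정이': 1, '철수': 1}
    grouped = combine_similar_names(counts)      # {('다정', '다정이'): 3, ('철수',): 1}
    # Assign codenames in descending order of frequency.
    codename2name = {f'인물{i+1}': list(group)
                     for i, (group, _) in enumerate(
                         sorted(grouped.items(), key=lambda kv: kv[1], reverse=True))}
    print(convert_name2codename(codename2name, '다정이가 철수를 불렀다'))   # 인물1가 인물2를 불렀다
    print(convert_codename2name(codename2name, ['인물1', '인물2']))         # ['다정이', '철수']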