# Spaces:
# Configuration error
# Configuration error
import heapq
import math
import re
import sys
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Tuple
class TextProcessor:
    """Heuristic sub-word tokenizer driven by character statistics of a text.

    The instance stores one text (``self.texto``). The methods pick a
    separator symbol for it, derive shared sub-tokens from the resulting
    chunks, and turn a token sequence into a ``(dictionary, code)`` pair of
    comma-joined strings.
    """

    def __init__(self, texto):
        """Store the text that ``entropy``, ``magic_split``, ``get_subTokens``
        and ``tokenize`` operate on."""
        self.texto = texto

    def entropy(self):
        """Return ``(counts, entropy)`` for the characters of the text.

        ``counts`` is a plain dict mapping each character to its number of
        occurrences (in order of first appearance); ``entropy`` is the Shannon
        entropy in bits per character, or ``0`` for an empty text.
        """
        simbolos = dict(Counter(self.texto))
        total_caracteres = len(self.texto)
        entropia = 0
        for count in simbolos.values():
            probabilidad = count / total_caracteres
            entropia -= probabilidad * math.log2(probabilidad)
        return simbolos, entropia

    def common_string(self, cadena1, cadena2):
        """Return the longest substring shared by ``cadena1`` and ``cadena2``.

        When several shared substrings have the maximal length, the one found
        first while scanning start positions of ``cadena1`` (then of
        ``cadena2``) wins. Returns ``''`` when nothing is shared.
        """
        longitud1 = len(cadena1)
        longitud2 = len(cadena2)
        best_start = 0
        best_len = 0
        for i in range(longitud1):
            for j in range(longitud2):
                k = 0
                while (i + k < longitud1 and j + k < longitud2
                       and cadena1[i + k] == cadena2[j + k]):
                    k += 1
                # Strict comparison keeps the earliest of equally long
                # matches, without storing every candidate substring.
                if k > best_len:
                    best_start, best_len = i, k
        if best_len == 0:
            return ''
        return cadena1[best_start:best_start + best_len]

    def magic_split(self):
        """Pick the symbol whose occurrences are the most regularly spaced.

        For every symbol occurring at least twice, the gaps between
        consecutive occurrences are computed; the symbol's "variation" is
        ``max(gap) - min(gap)``. Symbols with variation 0 or 1 are discarded
        and, among the rest, the one with the smallest variation is returned.
        Ties go to the symbol that appears first in the text, so the result is
        the same on every run.

        Raises:
            ValueError: if no symbol has a variation of at least 2.
        """
        # One pass over the text collects every symbol's positions, in order
        # of first appearance.
        positions = {}
        for index, char in enumerate(self.texto):
            positions.setdefault(char, []).append(index)

        best_symbol = None
        best_variation = None
        for symbol, indices in positions.items():
            if len(indices) < 2:
                continue
            gaps = [later - earlier for earlier, later in zip(indices, indices[1:])]
            variation = max(gaps) - min(gaps)
            if variation in (0, 1):
                continue
            if best_variation is None or variation < best_variation:
                best_symbol, best_variation = symbol, variation

        if best_symbol is None:
            raise ValueError(
                "magic_split: no symbol has a spacing variation of at least 2"
            )
        return best_symbol

    def rotate_string(self, string, n):
        """Return ``string`` rotated left by ``n`` positions (modulo its length).

        An empty ``string`` is returned unchanged.
        """
        if not string:
            return string
        indice = n % len(string)
        return string[indice:] + string[:indice]

    def rotate_compare(self, tokiA, tokiB):
        """Return the longest substring shared by a rotated token and the other.

        The operand that compares greater-or-equal under string comparison is
        rotated; the other one stays fixed. Rotations ``0 .. len(tokiA) - 1``
        are tried, and the longest common substring whose length is greater
        than 1 and less than ``len(tokiA)`` is returned (``""`` if none).
        Earlier rotations win ties.
        """
        # NOTE(review): `>=` on strings is lexicographic, not by length, and
        # the rotation count is len(tokiA) in both branches of the original
        # code. Possibly a length comparison was intended - confirm.
        if tokiA >= tokiB:
            rotating, fixed = tokiA, tokiB
        else:
            rotating, fixed = tokiB, tokiA
        limit = len(tokiA)

        best_r = ""
        for shift in range(limit):
            candidate = self.common_string(self.rotate_string(rotating, shift), fixed)
            if 1 < len(candidate) < limit and len(candidate) > len(best_r):
                best_r = candidate
        return best_r

    def get_subTokens(self, spl):
        """Return the distinct ``rotate_compare`` results over all chunk pairs.

        The text is split on ``spl`` and every ordered pair of different
        chunks is compared. The result is deduplicated in first-seen order
        (it may contain ``""``), so it is identical on every run.
        """
        # Repeated chunks would only reproduce results already collected, so
        # compare each distinct chunk value once.
        chunks = list(dict.fromkeys(self.texto.split(spl)))
        found = {}
        for tok in chunks:
            for tok2 in chunks:
                if tok != tok2:
                    found.setdefault(self.rotate_compare(tok, tok2), None)
        return list(found)

    def tokenize(self, spliter_optimo):
        """Map each chunk of the text to a dash-marked split around a sub-token.

        Chunks shorter than 3 characters map to themselves. For longer chunks
        the longest sub-token (from ``get_subTokens``) that occurs in the
        chunk and is strictly shorter than it is chosen; the value is
        ``" " + before + "-" + token + "-" + after``, split at the token's
        first occurrence. Chunks of length >= 3 that contain no sub-token are
        absent from the result.
        """
        tokens = [tok for tok in self.get_subTokens(spliter_optimo) if tok != ""]
        tokenized_sentence = {}
        for txt in self.texto.split(spliter_optimo):
            if len(txt) < 3:
                tokenized_sentence[txt] = txt
                continue
            best_len = 0
            for tok in tokens:
                if not (best_len < len(tok) < len(txt)):
                    continue
                # partition() keeps everything after the first occurrence;
                # indexing split(tok)[1] dropped the remainder whenever the
                # token occurred more than once in the chunk.
                head, found, tail = txt.partition(tok)
                if found:
                    best_len = len(tok)
                    tokenized_sentence[txt] = " " + head + "-" + tok + "-" + tail
        return tokenized_sentence

    def symbol_distances(self, texto, tokens):
        """Split ``texto`` into pieces by marking every token with dashes.

        Non-empty tokens are applied longest first (ties in alphabetical
        order): each occurrence is wrapped as ``-token-``. The marked text is
        then split on ``"-"`` and the non-empty pieces are returned in order.
        """
        # Sort by descending length so longer tokens are marked before shorter
        # ones; the alphabetical tie-break keeps the result independent of the
        # iteration order of `tokens` (callers pass a set).
        ordered = sorted((tok for tok in tokens if tok != ''),
                         key=lambda tok: (-len(tok), tok))
        txt = texto
        for tok in ordered:
            txt = txt.replace(tok, "-" + tok + "-")
        return [elem for elem in txt.split("-") if elem != '']

    def distances(self, tokens):
        """Return a dict mapping each token to the list of indices where it
        occurs in ``tokens`` (keys in order of first appearance)."""
        tokens_unicos = {}
        for i, token in enumerate(tokens):
            tokens_unicos.setdefault(token, []).append(i)
        return tokens_unicos

    def from_distances(self, tokens_distancias):
        """Encode a token -> positions mapping as two comma-joined strings.

        Returns ``(dic_str, enc_str)``: ``dic_str`` lists the tokens in
        mapping order and ``enc_str`` lists, for each position in ascending
        order, the index of the token found there. Every item is preceded by
        a comma, so both strings start with ``","`` when non-empty.
        """
        # NOTE(review): tokens are joined without escaping, so a token that
        # itself contains "," makes dic_str ambiguous.
        code_at_position = {}
        for code, (tok, positions) in enumerate(tokens_distancias.items()):
            for pos in positions:
                code_at_position[pos] = code
        dic_str = "".join("," + tok for tok in tokens_distancias)
        enc_str = "".join(
            "," + str(code_at_position[pos]) for pos in sorted(code_at_position)
        )
        return dic_str, enc_str
def gindex(obj, key):
    """Return the position of ``key`` among ``obj.keys()``, or ``None`` when
    the key is not present."""
    ordered_keys = tuple(obj.keys())
    if key not in ordered_keys:
        return None  # Key not found in the dictionary
    return ordered_keys.index(key)
# Example usage: tokenize a sample sentence and print diagnostics for each
# stage. The module-level names defined here (notably dic_str and enc_str) are
# read again by the Huffman example further down the file.
texto_ejemplo = "cuando te digo vete , te aburres , corres o andas ? cuando me dices vete , me aburro, corro y ando"
processor = TextProcessor(texto_ejemplo)
spliter_optimo = processor.magic_split()
tokenized_sentence = processor.tokenize(spliter_optimo)
# Every tokenized chunk is prefixed with "-" so that splitting on "-" yields
# the individual pieces (plus empty strings, which symbol_distances skips).
token_txt = "".join("-" + piece for piece in tokenized_sentence.values())
tokens = set(token_txt.split("-"))
symb = processor.symbol_distances(texto_ejemplo, tokens)
print("Tokens")
print(tokens)
print("Number of symbols in tokens:")
print(len(tokens))
print("Number of symbols in chars:")
print(len(set(texto_ejemplo)))
print("Length of text", len(texto_ejemplo))
print("Texto original:", texto_ejemplo)
# The label below was mojibake ("贸ptimo") in the source; restored to "óptimo".
print("Spliter óptimo:", spliter_optimo)
print("Frase tokenizada:", tokenized_sentence)
print("Length tokenized", len(tokenized_sentence))
print("Token Sentences", symb)
print("Lenght Token Sentence", len(symb))
print("Length Symbols Token Dictionary", len(set(symb)))
distances = processor.distances(symb)
print("Token Distances", distances)
print("Token Distance Length", len(distances))
print(gindex(distances, "cu"))
dic_str, enc_str = processor.from_distances(distances)
print(dic_str, enc_str)
class HuffmanNode:
    """Node of a Huffman tree: a symbol (or ``None``), a frequency and two
    child links that start out as ``None``."""

    def __init__(self, char: str, freq: int):
        """Create a childless node carrying ``char`` and ``freq``."""
        self.char, self.freq = char, freq
        self.left = self.right = None

    def __lt__(self, other):
        """Order nodes by frequency so they can be stored in a ``heapq`` heap."""
        is_less_frequent = self.freq < other.freq
        return is_less_frequent
def build_huffman_tree(text: str) -> Optional["HuffmanNode"]:
    """Build a Huffman tree from the character frequencies of ``text``.

    Args:
        text: the string whose characters become the leaves of the tree.

    Returns:
        The root node, or ``None`` when ``text`` is empty (previously an
        ``IndexError`` was raised on the empty queue). A text with a single
        distinct character yields a root that is itself a leaf.
    """
    frequency = Counter(text)
    if not frequency:
        # Nothing to encode: there is no node that could serve as the root.
        return None

    priority_queue = [HuffmanNode(char, freq) for char, freq in frequency.items()]
    heapq.heapify(priority_queue)
    # Repeatedly merge the two least frequent nodes until one root remains.
    while len(priority_queue) > 1:
        left = heapq.heappop(priority_queue)
        right = heapq.heappop(priority_queue)
        merged_node = HuffmanNode(None, left.freq + right.freq)
        merged_node.left = left
        merged_node.right = right
        heapq.heappush(priority_queue, merged_node)
    return priority_queue[0]
def encode_huffman_tree(node: Optional["HuffmanNode"], prefix: str = "") -> Dict[str, str]:
    """Return the ``{char: bit-string}`` code table for the tree under ``node``.

    Args:
        node: root of the (sub)tree; ``None`` yields an empty table.
        prefix: bits accumulated on the path from the root to ``node``
            ("0" for a left edge, "1" for a right edge).

    Returns:
        A dict mapping every leaf character to its code. A leaf reached with
        an empty prefix (a tree consisting of a single leaf) gets the code
        ``"0"`` instead of the empty string, so single-symbol texts still
        produce one bit per character and remain decodable.
    """
    if node is None:
        return {}
    if node.char is not None:
        return {node.char: prefix or "0"}
    encoding = {}
    encoding.update(encode_huffman_tree(node.left, prefix + "0"))
    encoding.update(encode_huffman_tree(node.right, prefix + "1"))
    return encoding
def huffman_encode(text: str) -> Tuple[Dict[str, str], bytes]:
    """Huffman-encode ``text`` and pack the resulting bits into bytes.

    Args:
        text: the string to encode.

    Returns:
        ``(encoding_map, encoded_bytes)``: the ``{char: bit-string}`` table
        and the encoded bit stream, zero-padded on the right to a whole
        number of bytes. An empty ``text`` yields ``({}, b"")``.

    NOTE(review): the amount of padding is not stored, so a decoder cannot
    tell padding bits from data unless it is told how many symbols to read.
    """
    if not text:
        # Without symbols there is no tree to build.
        return {}, b""

    root = build_huffman_tree(text)
    encoding_map = encode_huffman_tree(root)
    if len(encoding_map) == 1:
        # A single distinct symbol sits at the root and would get an empty
        # code; give it one bit so the text does not encode to nothing.
        encoding_map = {char: code or "0" for char, code in encoding_map.items()}

    encoded_text = ''.join(encoding_map[char] for char in text)
    # Make the bit-string length a multiple of 8 for the conversion to bytes.
    encoded_text += '0' * (-len(encoded_text) % 8)
    # Convert the binary string to bytes, eight bits at a time.
    encoded_bytes = bytes(
        int(encoded_text[i:i + 8], 2) for i in range(0, len(encoded_text), 8)
    )
    return encoding_map, encoded_bytes
def huffman_decode(encoding_map: Dict[str, str], encoded_bytes: bytes,
                   num_symbols: Optional[int] = None) -> str:
    """Decode bytes produced with ``encoding_map`` back into text.

    Args:
        encoding_map: ``{char: bit-string}`` table used for encoding.
        encoded_bytes: the packed bit stream.
        num_symbols: optional number of symbols to decode. The stream is
            zero-padded to a byte boundary and the padding is not recorded,
            so without this limit padding bits that happen to form a valid
            code are decoded as extra trailing characters. ``None`` (the
            default) decodes every bit, as before.

    Returns:
        The decoded string. Trailing bits that do not complete a code are
        ignored.
    """
    if num_symbols is not None and num_symbols <= 0:
        return ""

    # Convert the bytes to one binary string, eight bits per byte.
    encoded_text = ''.join(format(byte, '08b') for byte in encoded_bytes)
    decoding_map = {code: char for char, code in encoding_map.items()}

    decoded = []
    current_code = ""
    for bit in encoded_text:
        current_code += bit
        if current_code in decoding_map:
            decoded.append(decoding_map[current_code])
            current_code = ""
            if num_symbols is not None and len(decoded) == num_symbols:
                break  # everything after this point is padding
    return "".join(decoded)
def guardar_binarios_en_archivo(binarios: List[bytes], nombre_archivo: str):
    """Write every item of ``binarios`` to ``nombre_archivo`` in binary mode,
    each followed by a newline byte, then print a confirmation message."""
    separador = b'\n'  # Separator written after each binary item
    with open(nombre_archivo, 'wb') as destino:
        destino.writelines(
            parte for binario in binarios for parte in (binario, separador)
        )
    print(f"Datos binarios guardados en el archivo '{nombre_archivo}'")
# Example usage: Huffman-encode the two strings produced by the tokenizer
# example (token dictionary and position code) and store both in one file.
cadena1, cadena2 = dic_str, enc_str
# Encode cadena1 first, then cadena2.
(encoding_map1, encoded_bytes1), (encoding_map2, encoded_bytes2) = (
    huffman_encode(cadena1),
    huffman_encode(cadena2),
)
# Save both binary payloads into a single file.
guardar_binarios_en_archivo([encoded_bytes1, encoded_bytes2], "text.txt.nos")