# NOTE(review): the original file began with non-code artifact lines
# ("Spaces:", "Runtime error", "Runtime error") — apparently residue from a
# paste/export. Commented out so the module parses as valid Python.
import torch; torch.manual_seed(0) | |
import torch.utils | |
from torch.utils.data import DataLoader | |
import torch.distributions | |
import torch.nn as nn | |
import matplotlib.pyplot as plt; plt.rcParams['figure.dpi'] = 200 | |
from src.cocktails.representation_learning.dataset import MyDataset, get_representation_from_ingredient, get_max_n_ingredients | |
import json | |
import pandas as pd | |
import numpy as np | |
import os | |
from src.cocktails.representation_learning.simple_model import SimpleNet | |
from src.cocktails.config import COCKTAILS_CSV_DATA, FULL_COCKTAIL_REP_PATH, EXPERIMENT_PATH | |
from src.cocktails.utilities.cocktail_utilities import get_bunch_of_rep_keys | |
from src.cocktails.utilities.ingredients_utilities import ingredient_profiles | |
from resource import getrusage | |
from resource import RUSAGE_SELF | |
import gc | |
gc.collect(2)  # full collection pass (up to generation 2) at import time
# NOTE(review): `device` is computed but the models below are never moved to
# it (no .to(device) calls in this file) — confirm CPU training is intended.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def get_params():
    """Assemble the experiment configuration and create its save directory.

    Reads the cocktails CSV to size the ingredient/output spaces, builds the
    hyperparameter dict, creates a unique run directory (side effect of
    compute_expe_name_and_save_path), dumps a JSON copy of the serializable
    params, then re-attaches the non-serializable entries.

    Returns:
        dict of experiment parameters, completed with raw data, cocktail
        representations and category encodings.
    """
    data = pd.read_csv(COCKTAILS_CSV_DATA)
    max_ingredients, ingredient_set, liquor_set, liqueur_set = get_max_n_ingredients(data)
    num_ingredients = len(ingredient_set)
    rep_keys = get_bunch_of_rep_keys()['custom']
    # keep the second word of each rep key; 'volume' is handled as its own head
    ing_keys = [k.split(' ')[1] for k in rep_keys]
    ing_keys.remove('volume')
    nb_ing_categories = len(set(ingredient_profiles['type']))
    # one-hot row per ingredient category, keyed by sorted category name
    category_encodings = dict(zip(sorted(set(ingredient_profiles['type'])), np.eye(nb_ing_categories)))
    params = dict(trial_id='test',
                  save_path=EXPERIMENT_PATH + "/simple_net/",
                  nb_epochs=100,
                  print_every=50,
                  plot_every=50,
                  batch_size=128,
                  lr=0.001,
                  dropout=0.15,
                  output_keyword='glasses',  # selects which auxiliary head is trained
                  ing_keys=ing_keys,
                  nb_ingredients=len(ingredient_set),
                  hidden_dims=[16],
                  activation='sigmoid',
                  # all weights are 0 here: only the head named by output_keyword is used
                  auxiliaries_dict=dict(categories=dict(weight=0, type='classif', final_activ=None, dim_output=len(set(data['subcategory']))),
                                        glasses=dict(weight=0, type='classif', final_activ=None, dim_output=len(set(data['glass']))),
                                        prep_type=dict(weight=0, type='classif', final_activ=None, dim_output=len(set(data['category']))),
                                        cocktail_reps=dict(weight=0, type='regression', final_activ=None, dim_output=13),
                                        volume=dict(weight=0, type='regression', final_activ='relu', dim_output=1),
                                        taste_reps=dict(weight=0, type='regression', final_activ='relu', dim_output=2),
                                        ingredients_presence=dict(weight=0, type='multiclassif', final_activ=None, dim_output=num_ingredients),
                                        ingredients_quantities=dict(weight=0, type='regression', final_activ=None, dim_output=num_ingredients)),
                  category_encodings=category_encodings
                  )
    params['output_dim'] = params['auxiliaries_dict'][params['output_keyword']]['dim_output']
    # probe the representation of one ingredient (water) to learn its dimension
    water_rep, indexes_to_normalize = get_representation_from_ingredient(ingredients=['water'], quantities=[1],
                                                                         max_q_per_ing=dict(zip(ingredient_set, [1] * num_ingredients)), index=0,
                                                                         params=params)
    dim_rep_ingredient = water_rep.size
    params['indexes_ing_to_normalize'] = indexes_to_normalize
    params['deepset_latent_dim'] = dim_rep_ingredient * max_ingredients
    params['dim_rep_ingredient'] = dim_rep_ingredient
    params['input_dim'] = params['nb_ingredients']
    params = compute_expe_name_and_save_path(params)
    del params['category_encodings']  # numpy arrays are not JSON-serializable; dropped before dump
    with open(params['save_path'] + 'params.json', 'w') as f:
        json.dump(params, f)
    # restore the non-serializable entries after dumping
    params = complete_params(params)
    return params
def complete_params(params):
    """Re-attach the non-JSON-serializable entries to the params dict.

    Adds the raw cocktail dataframe, the precomputed cocktail representation
    matrix, and the per-category one-hot encodings, then returns the dict.
    """
    categories = sorted(set(ingredient_profiles['type']))
    params['cocktail_reps'] = np.loadtxt(FULL_COCKTAIL_REP_PATH)
    params['raw_data'] = pd.read_csv(COCKTAILS_CSV_DATA)
    params['category_encodings'] = dict(zip(categories, np.eye(len(categories))))
    return params
def compute_confusion_matrix_and_accuracy(predictions, ground_truth):
    """Compute a row-normalized confusion matrix and the overall accuracy.

    Args:
        predictions: (batch, n_options) tensor of class scores/logits.
        ground_truth: (batch,) tensor of integer class labels (any numeric dtype).

    Returns:
        (confusion_matrix, acc): confusion_matrix is (n_options, n_options),
        rows = true class, cols = predicted class, each non-empty row summing
        to 1; acc is the fraction of correct predictions.
    """
    bs, n_options = predictions.shape
    # .cpu() so the conversion also works for CUDA tensors
    predicted = predictions.argmax(dim=1).detach().cpu().numpy()
    true = ground_truth.int().detach().cpu().numpy()
    confusion_matrix = np.zeros([n_options, n_options])
    # accumulate raw counts; np.add.at handles repeated (true, pred) index pairs
    np.add.at(confusion_matrix, (true, predicted), 1)
    acc = confusion_matrix.diagonal().sum() / bs
    # normalize each non-empty row so it sums to 1
    for i in range(n_options):
        row_sum = confusion_matrix[i].sum()
        if row_sum != 0:
            confusion_matrix[i] /= row_sum
    # sanity check: matrix-derived accuracy must match the direct computation.
    # Bug fix: the original compared the *signed* difference, which passed
    # vacuously whenever acc < acc2.
    acc2 = np.mean(predicted == true)
    assert abs(acc - acc2) < 1e-5
    return confusion_matrix, acc
def run_epoch(opt, train, model, data, loss_function, params):
    """Run one pass over `data`; update `model` when `train` is True.

    Args:
        opt: optimizer stepped after each batch when training.
        train: bool, toggles train/eval mode and gradient computation.
        model: network mapping ingredient quantities to predictions.
        data: iterable of batches; each batch d is a tuple where d[2] holds
              the ingredient quantities and d[4] a dict of auxiliary targets.
        loss_function: criterion applied to (predictions, target.long()).
        params: experiment config; params['output_keyword'] selects the target.

    Returns:
        (model, mean loss, mean accuracy, mean confusion matrix over batches).
    """
    if train:
        model.train()
        opt.zero_grad()
    else:
        model.eval()
    # per-batch logs
    losses, accuracies, cf_matrices = [], [], []
    # disable autograd during evaluation to save memory/compute (original
    # evaluated with gradients enabled); unused batch fields (d[0], d[1],
    # d[3], d[-1]) are no longer unpacked.
    with torch.set_grad_enabled(train):
        for d in data:
            ingredient_quantities = d[2].float()
            auxiliaries = d[4]
            for k in auxiliaries.keys():
                if auxiliaries[k].dtype == torch.float64:
                    auxiliaries[k] = auxiliaries[k].float()
            predictions = model(ingredient_quantities)
            target = auxiliaries[params['output_keyword']]
            loss = loss_function(predictions, target.long()).float()
            cf_matrix, accuracy = compute_confusion_matrix_and_accuracy(predictions, target)
            if train:
                loss.backward()
                opt.step()
                opt.zero_grad()
            losses.append(float(loss))
            cf_matrices.append(cf_matrix)
            accuracies.append(accuracy)
    return model, np.mean(losses), np.mean(accuracies), np.mean(cf_matrices, axis=0)
def prepare_data_and_loss(params):
    """Build the train/test data loaders and the loss matching the output head.

    For 'classif' heads the CrossEntropyLoss is weighted by the class weights
    exposed by the training dataset; 'multiclassif' uses BCEWithLogitsLoss and
    'regression' uses MSELoss. Raises ValueError for unknown head types or an
    unknown classification output keyword.
    """
    train_data = MyDataset(split='train', params=params)
    test_data = MyDataset(split='test', params=params)
    train_data_loader = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True)
    test_data_loader = DataLoader(test_data, batch_size=params['batch_size'], shuffle=True)
    output_keyword = params['output_keyword']
    output_type = params['auxiliaries_dict'][output_keyword]['type']
    if output_type == 'classif':
        if output_keyword == 'glasses':
            classif_weights = train_data.glasses_weights
        elif output_keyword == 'prep_type':
            classif_weights = train_data.prep_types_weights
        elif output_keyword == 'categories':
            classif_weights = train_data.categories_weights
        else:
            raise ValueError
        # classif_weights = (np.array(classif_weights) * 2 + np.ones(len(classif_weights))) / 3
        loss_function = nn.CrossEntropyLoss(torch.FloatTensor(classif_weights))
        # loss_function = nn.CrossEntropyLoss()
    elif output_type == 'multiclassif':
        loss_function = nn.BCEWithLogitsLoss()
    elif output_type == 'regression':
        loss_function = nn.MSELoss()
    else:
        raise ValueError
    return loss_function, train_data_loader, test_data_loader
def print_losses(train, loss, accuracy):
    """Print a two-line loss/accuracy summary tagged 'Train' or 'Eval'."""
    header = '\tTrain logs:' if train else '\tEval logs:'
    print(header)
    print(f'\t\t Loss: {loss:.2f}, Acc: {accuracy:.2f}')
def run_experiment(params, verbose=True):
    """Train a SimpleNet for params['nb_epochs'] epochs, checkpointing the best.

    Evaluates once before training (logged as epoch #0), then alternates one
    train epoch with one eval epoch. Whenever the eval loss improves, the
    model state is saved to save_path/checkpoint_best.save; curves and
    confusion matrices are plotted every params['plot_every'] epochs.

    Returns:
        the trained model (its *final* state, not necessarily the best one).
    """
    loss_function, train_data_loader, test_data_loader = prepare_data_and_loss(params)
    # NOTE(review): the model is never moved to `device`; training appears to
    # run on CPU — confirm this is intended.
    model = SimpleNet(params['input_dim'], params['hidden_dims'], params['output_dim'], params['activation'], params['dropout'])
    opt = torch.optim.AdamW(model.parameters(), lr=params['lr'])
    all_train_losses = []
    all_eval_losses = []
    all_eval_cf_matrices = []
    all_train_accuracies = []
    all_eval_accuracies = []
    all_train_cf_matrices = []
    best_loss = np.inf
    # initial evaluation before any training; gives eval lists one extra
    # leading entry compared to the train lists (plot_results accounts for it)
    model, eval_loss, eval_accuracy, eval_cf_matrix = run_epoch(opt=opt, train=False, model=model, data=test_data_loader, loss_function=loss_function, params=params)
    all_eval_losses.append(eval_loss)
    all_eval_accuracies.append(eval_accuracy)
    if verbose: print(f'\n--------\nEpoch #0')
    if verbose: print_losses(train=False, accuracy=eval_accuracy, loss=eval_loss)
    for epoch in range(params['nb_epochs']):
        if verbose and (epoch + 1) % params['print_every'] == 0: print(f'\n--------\nEpoch #{epoch+1}')
        model, train_loss, train_accuracy, train_cf_matrix = run_epoch(opt=opt, train=True, model=model, data=train_data_loader, loss_function=loss_function, params=params)
        if verbose and (epoch + 1) % params['print_every'] == 0: print_losses(train=True, accuracy=train_accuracy, loss=train_loss)
        model, eval_loss, eval_accuracy, eval_cf_matrix = run_epoch(opt=opt, train=False, model=model, data=test_data_loader, loss_function=loss_function, params=params)
        if verbose and (epoch + 1) % params['print_every'] == 0: print_losses(train=False, accuracy=eval_accuracy, loss=eval_loss)
        # checkpoint on eval-loss improvement
        if eval_loss < best_loss:
            best_loss = eval_loss
            if verbose: print(f'Saving new best model with loss {best_loss:.2f}')
            torch.save(model.state_dict(), params['save_path'] + f'checkpoint_best.save')
        # log this epoch's metrics
        all_train_losses.append(train_loss)
        all_train_accuracies.append(train_accuracy)
        all_eval_losses.append(eval_loss)
        all_eval_accuracies.append(eval_accuracy)
        all_eval_cf_matrices.append(eval_cf_matrix)
        all_train_cf_matrices.append(train_cf_matrix)
        if (epoch + 1) % params['plot_every'] == 0:
            plot_results(all_train_losses, all_train_accuracies, all_train_cf_matrices,
                         all_eval_losses, all_eval_accuracies, all_eval_cf_matrices, params['plot_path'])
    return model
def plot_results(all_train_losses, all_train_accuracies, all_train_cf_matrices,
                 all_eval_losses, all_eval_accuracies, all_eval_cf_matrices, plot_path):
    """Save loss/accuracy curves and the latest train/eval confusion matrices.

    Eval metric lists carry one extra leading entry (the pre-training
    evaluation), so train curves are plotted against steps[1:]. Files are
    written into plot_path. The original four copy-pasted plotting stanzas are
    factored into two helpers.
    """
    steps = np.arange(len(all_eval_accuracies))
    _plot_curves(steps, all_train_losses, all_eval_losses, 'Losses', [0, 4], plot_path + 'losses.png')
    _plot_curves(steps, all_train_accuracies, all_eval_accuracies, 'Accuracies', [0, 1], plot_path + 'accs.png')
    _plot_cf_matrix(all_train_cf_matrices[-1], 'Train confusion matrix', plot_path + 'train_confusion_matrix.png')
    _plot_cf_matrix(all_eval_cf_matrices[-1], 'Eval confusion matrix', plot_path + 'eval_confusion_matrix.png')
    plt.close('all')

def _plot_curves(steps, train_values, eval_values, title, ylim, filename):
    # One figure with the train curve (offset past the initial eval-only step)
    # and the eval curve; saved to `filename` and closed.
    plt.figure()
    plt.title(title)
    plt.plot(steps[1:], train_values, label='train')
    plt.plot(steps, eval_values, label='eval')
    plt.legend()
    plt.ylim(ylim)
    plt.savefig(filename, dpi=200)
    plt.close(plt.gcf())

def _plot_cf_matrix(cf_matrix, title, filename):
    # Heatmap of a row-normalized confusion matrix (rows=true, cols=predicted).
    plt.figure()
    plt.title(title)
    plt.ylabel('True')
    plt.xlabel('Predicted')
    plt.imshow(cf_matrix, vmin=0, vmax=1)
    plt.colorbar()
    plt.savefig(filename, dpi=200)
    plt.close(plt.gcf())
def get_model(model_path):
    """Load a trained SimpleNet and its params from a results directory.

    Args:
        model_path: run directory (with trailing slash) containing params.json
                    and checkpoint_best.save.

    Returns:
        (model, params): model in eval mode, params with save_path rebased to
        model_path.
    """
    with open(model_path + 'params.json', 'r') as f:
        params = json.load(f)
    params['save_path'] = model_path
    model_chkpt = model_path + "checkpoint_best.save"
    model = SimpleNet(params['input_dim'], params['hidden_dims'], params['output_dim'], params['activation'], params['dropout'])
    # bug fix: without map_location, checkpoints saved on a GPU machine fail
    # to load on CPU-only machines
    model.load_state_dict(torch.load(model_chkpt, map_location=device))
    model.eval()
    return model, params
def compute_expe_name_and_save_path(params):
    """Derive a unique run directory name from the hyperparameters and create it.

    The directory name encodes trial id, lr, batch size, hidden dims,
    activation and the auxiliary-head weights, plus a numeric suffix chosen so
    the path does not already exist. Side effects: creates the directory and
    its plots/ subfolder, and stores both under params['save_path'] and
    params['plot_path'].
    """
    aux_weights = ', '.join(str(params['auxiliaries_dict'][aux]['weight'])
                            for aux in params['auxiliaries_dict'].keys())
    weights_str = '[' + aux_weights + ']'
    base = (params['save_path'] + params["trial_id"]
            + f'_lr{params["lr"]}'
            + f'_bs{params["batch_size"]}'
            + f'_hd{params["hidden_dims"]}'
            + f'_activ{params["activation"]}'
            + f'_w{weights_str}')
    # pick the first free numeric suffix
    counter = 0
    while os.path.exists(base + f"_{counter}"):
        counter += 1
    save_path = base + f"_{counter}" + '/'
    params["save_path"] = save_path
    os.makedirs(save_path)
    os.makedirs(save_path + 'plots/')
    params['plot_path'] = save_path + 'plots/'
    print(f'logging to {save_path}')
    return params
if __name__ == '__main__':
    # script entry point: build the config (creates the run directory) and train
    params = get_params()
    run_experiment(params)