import json
import os
import subprocess
import sys

import pandas as pd
import pydriller
from dotenv import dotenv_values

from Database import Database


class RefactorAnalysis:
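    """Mines method-level refactorings from a Git repository.

    Wraps the RefactoringMiner CLI to produce a JSON report, then joins that
    report with pydriller commit data to build a pandas DataFrame. Defaults
    to the bundled data/refactoring-toy-example repository and
    output_ref/output.json.
    """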
    def __init__(self, input_path="", output_path=""):
        if input_path == "":
            self.input_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "refactoring-toy-example")
        else:
            self.input_path = input_path
        if output_path == "":
            self.output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output_ref", "output.json")
        else:
            self.output_path = output_path

    def generate_refactor_details(self):
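        """Run the RefactoringMiner CLI against self.input_path.

        Uses `RefactoringMiner -a <repo> -json <file>`, i.e. analyze all
        commits and write the JSON report to self.output_path. Exits the
        process if the miner returns a non-zero code.
        """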
        ref_miner_bin = os.path.abspath("executable/RefactoringMiner/bin")
        command = ["sh", "RefactoringMiner", "-a", self.input_path, "-json", self.output_path]
        try:
            # RefactoringMiner is launched from its bin directory.
            os.chdir(ref_miner_bin)
            shell_result = subprocess.run(command, capture_output=True, text=True)
            shell_result.check_returncode()
        except subprocess.CalledProcessError as error:
            print(error)
            sys.exit(1)
        except Exception as e:
            print(e)
            return 1

    def parse_json_output(self):
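        """Parse the RefactoringMiner JSON report, keeping only refactoring
        types that mention "Method" (e.g. Extract Method, Rename Method).

        Returns a dict keyed by commit SHA1, mapping to the refactored file
        paths, their (startLine, endLine) spans, and the refactoring type;
        only one type per commit is retained.
        """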
        with open(self.output_path) as f:
            json_output = json.load(f)
        dict_output = {}
        for obj in json_output["commits"]:
            if len(obj["refactorings"]) == 0:
                continue
            changes = []
            se_lines = []
            method_ref_type = None
            for ref in obj["refactorings"]:
                # Filter for method-level refactorings only.
                if "Method" not in ref["type"]:
                    continue
                method_ref_type = ref["type"]  # the last method refactoring wins
                for parent_refs in ref["leftSideLocations"]:
                    changes.append(parent_refs["filePath"])
                    se_lines.append((parent_refs["startLine"], parent_refs["endLine"]))
            # Skip commits whose refactorings were all non-method ones.
            if not changes:
                continue
            dict_output[obj["sha1"]] = {
                "paths": changes,
                "ref_start_end": se_lines,
                "ref_type": method_ref_type,
            }
        return dict_output

    def create_project_dataframe(self):
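        """Build a DataFrame with one row per refactored method.

        For every mined commit, locates each changed method that encloses a
        refactoring span and records its source before the change
        (method_refactored) and after it (meth_rf_neg, presumably the
        negative sample for downstream use).
        """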
        df = pd.DataFrame(columns=["commit", "refactoring_type", "filename", "meth_rf_neg", "method_refactored"])
        parse_output_dict = self.parse_json_output()
        commits_to_analyze = list(parse_output_dict.keys())
        for commit in pydriller.Repository(self.input_path, only_commits=commits_to_analyze).traverse_commits():
            ref_list = parse_output_dict.get(commit.hash)
            ref_path_name = [str(p).split("/")[-1] for p in ref_list["paths"]]
            for cf in commit.modified_files:
                try:
                    index_ref = ref_path_name.index(cf.filename)
                except ValueError:
                    continue
                if len(cf.changed_methods) == 0:
                    continue
                # Iterating over changed_methods keeps this loop small;
                # otherwise we would have to loop over every method in the file.
                for cm in cf.changed_methods:
                    # Keep the method only if it fully encloses the refactored span.
                    if cm.start_line <= ref_list["ref_start_end"][index_ref][0] and cm.end_line >= ref_list["ref_start_end"][index_ref][1]:
                        method_source_code = self.__split_and_extract_methods(cf.source_code_before, cm.start_line, cm.end_line)
                        method_source_code_neg = self.__split_and_extract_methods(cf.source_code, cm.start_line, cm.end_line)
                        df_row = {"commit": commit.hash, "refactoring_type": ref_list["ref_type"], "filename": cf.filename, "meth_rf_neg": method_source_code_neg, "method_refactored": method_source_code}
                        df.loc[len(df)] = df_row
        return df

    def __split_and_extract_methods(self, source_code, start_line, end_line):
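        """Return the lines of source_code from start_line to end_line
        (1-based, inclusive) joined by newlines.
        """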
        source_code_lines = str(source_code).splitlines()
        return "\n".join(source_code_lines[start_line - 1:end_line])


def main():
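    """Download the repositories if needed, mine method-level refactorings
    from each one, and insert the rows into the database in batches.
    """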
    if not os.path.exists("data/repos/"):
        try:
            print("Starting repo download")
            repo_script = subprocess.run(["python", "repo_download.py"], capture_output=True, text=True)
            repo_script.check_returncode()
        except subprocess.CalledProcessError as err:
            print(err)
            sys.exit(1)
        print("Repo Download Completed")
    lst_repos = next(os.walk("data/repos/"))[1]
    print(len(lst_repos))
    cwd = os.path.dirname(os.path.abspath(__file__))
    final_df = pd.DataFrame(columns=["commit", "refactoring_type", "filename", "meth_rf_neg", "method_refactored"])
    database = Database(dotenv_values(".env")["COLLECTION_NAME"])
    count = 1
    batch_size = 5
    for idx, repo in enumerate(lst_repos):
        # generate_refactor_details() chdirs into the RefactoringMiner bin
        # directory, so reset the working directory on every iteration.
        os.chdir(cwd)
        try:
            ref_obj = RefactorAnalysis(os.path.abspath(os.path.join("data/repos", repo)), os.path.abspath(os.path.join("output_ref", repo + ".json")))
            # ref_obj.generate_refactor_details()  # uncomment to regenerate the RefactoringMiner JSON
            df = ref_obj.create_project_dataframe()
        except Exception as e:
            print(e)
            continue
        final_df = pd.concat([final_df, df], ignore_index=True)
        # Flush to the DB every batch_size repos, and on the last repo.
        if count == batch_size or idx == len(lst_repos) - 1:
            print("Inserting into DB", idx)
            database.insert_docs(final_df.to_dict(orient="records"))
            final_df = pd.DataFrame(columns=["commit", "refactoring_type", "filename", "meth_rf_neg", "method_refactored"])
            count = 1
        else:
            count += 1


if __name__ == "__main__":
    main()