"""Utility helpers: JSONL/JSONL.GZ I/O, verbose printing, and run resumption."""

import gzip
import json
import os
from typing import List

import jsonlines
import openai

# Configure the OpenAI client from the environment at import time.
openai.api_key = os.getenv("OPENAI_API_KEY")


def make_printv(verbose: bool):
    """Return a print-like function that only emits output when *verbose* is true.

    The returned function forces ``flush=True`` so progress is visible
    immediately in piped/redirected logs.
    """
    def print_v(*args, **kwargs):
        if verbose:
            kwargs["flush"] = True
            print(*args, **kwargs)
    return print_v


def read_jsonl(path: str) -> List[dict]:
    """Read a ``.jsonl`` file and return its records as a list of dicts.

    Raises:
        FileNotFoundError: if *path* does not exist.
        ValueError: if *path* does not end with ``.jsonl``.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"File `{path}` does not exist.")
    elif not path.endswith(".jsonl"):
        raise ValueError(f"File `{path}` is not a jsonl file.")
    # jsonlines.Reader is iterable; materialize directly instead of
    # appending one item at a time.
    with jsonlines.open(path) as reader:
        return list(reader)


def write_jsonl(path: str, data: List[dict], append: bool = False):
    """Write *data* to *path* as JSON Lines.

    If *append* is true, records are appended to an existing file;
    otherwise the file is overwritten.
    """
    with jsonlines.open(path, mode='a' if append else 'w') as writer:
        for item in data:
            writer.write(item)


def read_jsonl_gz(path: str) -> List[dict]:
    """Read a gzip-compressed ``.jsonl.gz`` file into a list of dicts.

    Raises:
        ValueError: if *path* does not end with ``.jsonl.gz``.
    """
    if not path.endswith(".jsonl.gz"):
        raise ValueError(f"File `{path}` is not a jsonl.gz file.")
    with gzip.open(path, "rt") as f:
        return [json.loads(line) for line in f]


# generator that returns the item and the index in the dataset.
# if the results_path exists, it will skip all items that have been processed
# before.
def enumerate_resume(dataset, results_path):
    """Yield ``(index, item)`` pairs from *dataset*, resuming from prior results.

    If *results_path* exists, the first N items are skipped, where N is the
    number of records already written to that results file.
    """
    count = 0
    if os.path.exists(results_path):
        # Number of previously processed items == number of result records.
        with jsonlines.open(results_path) as reader:
            count = sum(1 for _ in reader)
    for i, item in enumerate(dataset):
        if i < count:
            continue  # already processed in a previous run
        yield i, item


def resume_success_count(dataset) -> int:
    """Return how many items in *dataset* have a truthy ``is_solved`` field."""
    return sum(1 for item in dataset if item.get("is_solved"))