|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import itertools |
|
import numpy as np |
|
import pandas as pd |
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
from copy import deepcopy |
|
from enum import Enum |
|
from typing import List, Set, Tuple |
|
|
|
|
|
class AggregationMethod(Enum):
    """Strategies for combining document scores when several runs are merged."""

    # Scores of the same docid are added together across runs.
    SUM = 'sum'
|
|
|
|
|
class RescoreMethod(Enum):
    """Ways in which the scores of a run can be recomputed."""

    # Reciprocal rank fusion: score = 1 / (rrf_k + rank).
    RRF = 'rrf'
    # Multiply every score by a constant factor.
    SCALE = 'scale'
    # Min-max normalize the scores of each topic.
    NORMALIZE = 'normalize'
|
|
|
|
|
class Qrels:
    """Wrapper class for TREC Qrels.

    Parameters
    ----------
    filepath : str
        File path of a given TREC Qrels. If ``None`` (the default), the
        instance starts with an empty table.
    """

    columns = ['topic', 'q0', 'docid', 'relevance_grade']

    def __init__(self, filepath: str = None):
        self.filepath = filepath
        self.qrels_data = pd.DataFrame(columns=Qrels.columns)

        if filepath is not None:
            self.read_run(self.filepath)

    def read_run(self, filepath: str):
        """Load a whitespace-separated, header-less qrels file into ``qrels_data``.

        Parameters
        ----------
        filepath : str
            Path of the qrels file to read.
        """
        # Raw string: '\s' inside a plain literal is an invalid escape sequence.
        # docid is read as str (as TrecRun.read_run does) so that numeric-looking
        # ids compare equal between qrels and runs.
        self.qrels_data = pd.read_csv(
            filepath, sep=r'\s+', names=Qrels.columns, dtype={'docid': 'str'}
        )

    def get_relevance_grades(self) -> Set[str]:
        """Return a set with all relevance grades."""
        return set(self.qrels_data['relevance_grade'].unique())

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.qrels_data['topic'].unique())

    def get_docids(self, topic, relevance_grades=None) -> List[str]:
        """Return a list of docids for a given topic and a list of relevance grades.

        Parameters
        ----------
        topic : int
            Topic whose judged documents are requested.
        relevance_grades : List[int]
            E.g. [0, 1, 2]. If not provided, then docids of every relevance
            grade are returned.

        Returns
        -------
        List[str]
            Matching docids, in the order they appear in the qrels table.
        """
        if relevance_grades is None:
            relevance_grades = self.get_relevance_grades()

        data = self.qrels_data
        selected = (data['topic'] == topic) & data['relevance_grade'].isin(relevance_grades)

        return data.loc[selected, 'docid'].tolist()
|
|
|
|
|
class TrecRun:
    """Wrapper class for a TREC run.

    Parameters
    ----------
    filepath : str
        File path of a given TREC Run. If ``None`` (the default), the run
        starts empty.
    resort : bool
        If True, rows are re-sorted by descending score within each topic and
        ranks are recomputed when the file is read.
    """

    columns = ['topic', 'q0', 'docid', 'rank', 'score', 'tag']

    def __init__(self, filepath: str = None, resort: bool = False):
        self.reset_data()
        self.filepath = filepath
        self.resort = resort

        if filepath is not None:
            self.read_run(self.filepath, self.resort)

    def reset_data(self):
        """Replace ``run_data`` with an empty frame that has the TREC run columns."""
        self.run_data = pd.DataFrame(columns=TrecRun.columns)

    def read_run(self, filepath: str, resort: bool = False) -> None:
        """Load a whitespace-separated, header-less run file into ``run_data``.

        Parameters
        ----------
        filepath : str
            Path of the run file to read.
        resort : bool
            If True, sort rows by topic and descending score, then renumber the
            ranks of each topic starting from 1.
        """
        # Raw string: '\s' inside a plain literal is an invalid escape sequence.
        self.run_data = pd.read_csv(
            filepath, sep=r'\s+', names=TrecRun.columns, dtype={'docid': 'str'}
        )

        if resort:
            self.run_data = self.run_data.sort_values(
                ['topic', 'score'], ascending=[True, False]
            )
            # Position within the sorted topic group is the new rank. cumcount
            # yields integers, whereas Series.rank() would store floats (1.0, 2.0, ...).
            self.run_data['rank'] = self.run_data.groupby('topic').cumcount() + 1

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.run_data['topic'].unique())

    def clone(self):
        """Return a deep copy of the current instance."""
        return deepcopy(self)

    def save_to_txt(self, output_path: str, tag: str = None) -> None:
        """Write the run to ``output_path`` as space-separated text without a header.

        Rows are sorted in place by topic and descending score before writing.

        Parameters
        ----------
        output_path : str
            Destination file path.
        tag : str
            If given, overwrites the ``tag`` column of this run before saving.

        Raises
        ------
        Exception
            If the run contains no rows.
        """
        if len(self.run_data) == 0:
            raise Exception('Nothing to save. TrecRun is empty')

        if tag is not None:
            self.run_data['tag'] = tag

        self.run_data = self.run_data.sort_values(by=['topic', 'score'], ascending=[True, False])
        self.run_data.to_csv(output_path, sep=' ', header=False, index=False)

    def get_docs_by_topic(self, topic: str, max_docs: int = None):
        """Return the rows of ``topic``, limited to the first ``max_docs`` rows if given."""
        docs = self.run_data[self.run_data['topic'] == topic]
        return docs if max_docs is None else docs.head(max_docs)

    def rescore(self, method: 'RescoreMethod', rrf_k: int = None, scale: float = None):
        """Recompute the ``score`` column in place and return ``self``.

        Parameters
        ----------
        method : RescoreMethod
            ``RRF`` sets score to ``1 / (rrf_k + rank)``, ``SCALE`` multiplies
            scores by ``scale``, ``NORMALIZE`` min-max normalizes the scores of
            each topic (a topic whose scores are all equal gets score 1).
        rrf_k : int
            Required when ``method`` is ``RRF``.
        scale : float
            Required when ``method`` is ``SCALE``.

        Raises
        ------
        NotImplementedError
            If ``method`` is not one of the supported methods.
        """
        if method == RescoreMethod.RRF:
            assert rrf_k is not None, 'Parameter "rrf_k" must be a valid integer.'
            self.run_data['score'] = 1 / (rrf_k + self.run_data['rank'].values)
        elif method == RescoreMethod.SCALE:
            assert scale is not None, 'Parameter "scale" must not be none.'
            self.run_data['score'] = self.run_data['score'].values * scale
        elif method == RescoreMethod.NORMALIZE:
            # Normalized values are fractional; make the column float up front so
            # they are not written into an integer column.
            self.run_data['score'] = self.run_data['score'].astype(float)

            for topic in self.topics():
                in_topic = self.run_data['topic'] == topic
                scores = self.run_data.loc[in_topic, 'score'].to_numpy(copy=True)
                low = np.min(scores)
                span = np.max(scores) - low

                if span == 0:
                    self.run_data.loc[in_topic, 'score'] = 1
                else:
                    self.run_data.loc[in_topic, 'score'] = (scores - low) / span
        else:
            raise NotImplementedError()

        return self

    def to_numpy(self) -> np.ndarray:
        """Return a copy of the run rows as a numpy array."""
        return self.run_data.to_numpy(copy=True)

    def discard_qrels(self, qrels: 'Qrels', clone=True):
        """Discard each docid in self if docid is also in the given qrels.
        This operation is performed on each topic separately.
        Topics of this run that do not occur in the qrels are dropped.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        return self._filter_from_qrels(qrels, False, clone=clone)

    def retain_qrels(self, qrels: 'Qrels', clone=True):
        """Retain each docid in self if docid is also in the given qrels.
        This operation is performed on each topic separately.
        After this operation, judged@x based on the given qrels should be 1.
        Topics of this run that do not occur in the qrels are dropped.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to keep in TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        return self._filter_from_qrels(qrels, True, clone=clone)

    def _filter_from_qrels(self, qrels: 'Qrels', keep: bool, clone=True):
        """Private helper function to remove/keep each docid in self if docid is also in the given Qrels object.
        This operation is performed on each topic separately.
        Topics of this run that do not occur in the qrels are dropped.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from or keep in TrecRun.
        keep : Bool
            If True, keep only docids found in the qrels; otherwise remove them.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        # Computed once: asking the qrels for its topics on every iteration
        # rescans the whole qrels table each time.
        qrels_topics = qrels.topics()

        kept_frames = []
        for topic in self.topics():
            if topic not in qrels_topics:
                continue

            topic_df = self.run_data[self.run_data['topic'] == topic]
            in_qrels = topic_df['docid'].isin(qrels.get_docids(topic))
            kept_frames.append(topic_df[in_qrels] if keep else topic_df[~in_qrels])

        target = TrecRun() if clone else self
        return TrecRun.from_dataframes(kept_frames, target)

    @staticmethod
    def get_all_topics_from_runs(runs) -> Set[str]:
        """Return the union of the topics of all given runs."""
        all_topics = set()
        for run in runs:
            all_topics |= run.topics()

        return all_topics

    @staticmethod
    def merge(runs, aggregation: 'AggregationMethod', depth: int = None, k: int = None):
        """Return a TrecRun by aggregating docid in various ways such as summing scores

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects.
        aggregation : AggregationMethod
            The aggregation method to use.
        depth : int
            Maximum number of results from each input run to consider. Set to ``None`` by default, which indicates that
            the complete list of results is considered.
        k : int
            Length of final results list. Set to ``None`` by default, which indicates that the union of all input documents
            are ranked.

        Raises
        ------
        Exception
            If fewer than two runs are given.
        NotImplementedError
            If ``aggregation`` is not ``AggregationMethod.SUM``.
        """
        if len(runs) < 2:
            raise Exception('Merge requires at least 2 runs.')

        if aggregation != AggregationMethod.SUM:
            raise NotImplementedError()

        topics = list(TrecRun.get_all_topics_from_runs(runs))

        def merge_topic(topic):
            # Sum the scores of every docid over all runs for this topic.
            doc_scores = {}
            for run in runs:
                docs = run.get_docs_by_topic(topic, depth)
                for docid, score in zip(docs['docid'], docs['score']):
                    doc_scores[docid] = doc_scores.get(docid, 0.0) + score

            # Highest score first; ties are broken by docid for a stable order.
            ranked = sorted(doc_scores.items(), key=lambda item: (-item[1], item[0]))
            if k is not None:
                ranked = ranked[:k]

            return [
                (topic, 'Q0', docid, rank, score, 'merge_sum')
                for rank, (docid, score) in enumerate(ranked, start=1)
            ]

        # One worker per ten topics, and always at least one.
        max_workers = max(len(topics) // 10, 1)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(merge_topic, topics))

        return TrecRun.from_list(list(itertools.chain.from_iterable(results)))

    @staticmethod
    def _concat_frames(frames):
        """Concatenate dataframes, returning an empty run table when there is nothing to join."""
        non_empty = [frame for frame in frames if len(frame) > 0]
        if not non_empty:
            # pd.concat raises ValueError when given no objects.
            return pd.DataFrame(columns=TrecRun.columns)

        return pd.concat(non_empty)

    @staticmethod
    def from_dataframes(dfs, run=None):
        """Return a TrecRun by populating dataframe with the provided list of dataframes.

        Parameters
        ----------
        dfs: List[Dataframe]
            A list of Dataframes conforming to TrecRun.columns. An empty list
            yields an empty run.

        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """
        res = TrecRun() if run is None else run
        res.run_data = TrecRun._concat_frames(dfs)

        return res

    @staticmethod
    def from_list(rows, run=None):
        """Return a TrecRun by populating dataframe with the provided list of tuples.
        For performance reasons, df.to_numpy() is faster than df.iterrows().
        When manipulating dataframes, we first dump to np.ndarray and construct a list of tuples with new values.
        Then use this function to convert the list of tuples to a TrecRun object.

        Parameters
        ----------
        rows: List[tuples]
            List of tuples in the following format: (topic, 'Q0', docid, rank, score, tag).
            An empty list yields an empty run.

        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """
        res = TrecRun() if run is None else run
        # Naming the columns in the constructor also works for an empty list,
        # where assigning ``df.columns`` afterwards raises a length mismatch.
        res.run_data = pd.DataFrame(rows, columns=TrecRun.columns)

        return res

    @staticmethod
    def from_search_results(docid_score_pair: List[Tuple[str, float]], topic=1):
        """Return a TrecRun built from (docid, score) pairs for a single topic.

        Parameters
        ----------
        docid_score_pair : List[Tuple[str, float]]
            (docid, score) pairs; their order determines the rank, starting at 1.
        topic
            Topic id assigned to every row. Defaults to 1.
        """
        rows = [
            (topic, 'Q0', docid, rank, score, 'searcher')
            for rank, (docid, score) in enumerate(docid_score_pair, start=1)
        ]

        return TrecRun.from_list(rows)

    @staticmethod
    def concat(runs):
        """Return a new TrecRun by concatenating a list of TrecRuns

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects. An empty list yields an empty run.
        """
        combined = TrecRun()
        combined.run_data = TrecRun._concat_frames([item.run_data for item in runs])
        return combined
|
|