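# Provenance (Hugging Face file-viewer residue, kept as a comment so the module parses):
# uploaded by ArthurChen189, commit 62977bb ("upload pyserini"), raw file size 12.3 kB.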
#
# Pyserini: Reproducible IR research with sparse and dense representations
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import itertools
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from enum import Enum
from typing import List, Set, Tuple
class AggregationMethod(Enum):
    """Ways of combining the scores a document receives from several runs."""

    # Add up the scores a docid receives across all input runs.
    SUM = 'sum'
class RescoreMethod(Enum):
    """Strategies for rewriting the scores of a run."""

    # Reciprocal rank fusion: the score is derived from the rank and a constant.
    RRF = 'rrf'
    # Multiply every score by a constant factor.
    SCALE = 'scale'
    # Min-max normalize the scores of each topic.
    NORMALIZE = 'normalize'
class Qrels:
    """Wrapper class for TREC Qrels.

    Parameters
    ----------
    filepath : str
        File path of a given TREC Qrels. If ``None``, the instance starts out
        with an empty table that has the expected columns.
    """

    columns = ['topic', 'q0', 'docid', 'relevance_grade']

    def __init__(self, filepath: str = None):
        self.filepath = filepath
        self.qrels_data = pd.DataFrame(columns=Qrels.columns)
        if filepath is not None:
            self.read_run(self.filepath)

    def read_run(self, filepath: str):
        """Load a whitespace-separated qrels file into ``self.qrels_data``.

        Parameters
        ----------
        filepath : str
            Path (or anything ``pandas.read_csv`` accepts) of the qrels file.
        """
        # The separator is a regex, so it is written as a raw string: '\s' is
        # not a valid escape sequence in an ordinary string literal.
        # docids are forced to str so that numeric-looking ids compare equal to
        # the docids of a TrecRun, which are read as str as well.
        self.qrels_data = pd.read_csv(
            filepath, sep=r'\s+', names=Qrels.columns, dtype={'docid': 'str'})

    def get_relevance_grades(self) -> Set[str]:
        """Return a set with all relevance grades."""
        return set(self.qrels_data['relevance_grade'].unique())

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.qrels_data['topic'].unique())

    def get_docids(self, topic, relevance_grades=None) -> List[str]:
        """Return a list of docids for a given topic and a list of relevance grades.

        Parameters
        ----------
        topic : int
            Topic whose judged docids are returned.
        relevance_grades : List[int]
            E.g. [0, 1, 2]. If not provided, docids of every relevance grade
            are returned.

        Returns
        -------
        List[str]
            The docids judged for ``topic`` with one of the requested grades,
            in file order. Empty if the topic has no matching judgments.
        """
        selected = self.qrels_data[self.qrels_data['topic'] == topic]
        # No grade filter requested means "all grades", so no filtering is needed.
        if relevance_grades is not None:
            selected = selected[selected['relevance_grade'].isin(relevance_grades)]
        return selected['docid'].tolist()
class TrecRun:
    """Wrapper class for a TREC run.

    The run is held in ``self.run_data``, a DataFrame with the columns listed
    in ``TrecRun.columns``.

    Parameters
    ----------
    filepath : str
        File path of a given TREC Run. If ``None``, the run starts out empty.
    resort : bool
        If True, rows are sorted by topic and descending score when the file
        is read, and ranks are recomputed from the scores.
    """

    columns = ['topic', 'q0', 'docid', 'rank', 'score', 'tag']

    def __init__(self, filepath: str = None, resort: bool = False):
        self.reset_data()
        self.filepath = filepath
        self.resort = resort
        if filepath is not None:
            self.read_run(self.filepath, self.resort)

    def reset_data(self):
        """Replace the run data with an empty DataFrame that has the run columns."""
        self.run_data = pd.DataFrame(columns=TrecRun.columns)

    def read_run(self, filepath: str, resort: bool = False) -> None:
        """Load a whitespace-separated run file into ``self.run_data``.

        Parameters
        ----------
        filepath : str
            Path (or anything ``pandas.read_csv`` accepts) of the run file.
        resort : bool
            If True, sort by topic and descending score and recompute ``rank``
            per topic from the scores (ties keep their order of appearance).
        """
        # The separator is a regex, hence the raw string ('\s' is not a valid
        # escape sequence in an ordinary string literal).
        self.run_data = pd.read_csv(
            filepath, sep=r'\s+', names=TrecRun.columns, dtype={'docid': 'str'})
        if resort:
            self.run_data.sort_values(['topic', 'score'], inplace=True, ascending=[True, False])
            self.run_data['rank'] = (
                self.run_data.groupby('topic')['score'].rank(ascending=False, method='first'))

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.run_data['topic'].unique())

    def clone(self) -> 'TrecRun':
        """Return a deep copy of the current instance."""
        return deepcopy(self)

    def save_to_txt(self, output_path: str, tag: str = None) -> None:
        """Write the run to ``output_path`` as space-separated text.

        The run data is sorted in place by topic and descending score before
        writing. If ``tag`` is given, it overwrites the ``tag`` column of this
        run.

        Raises
        ------
        Exception
            If the run contains no rows.
        """
        if self.run_data.empty:
            raise Exception('Nothing to save. TrecRun is empty')
        if tag is not None:
            self.run_data['tag'] = tag
        self.run_data = self.run_data.sort_values(by=['topic', 'score'], ascending=[True, False])
        self.run_data.to_csv(output_path, sep=' ', header=False, index=False)

    def get_docs_by_topic(self, topic: str, max_docs: int = None):
        """Return the rows of ``topic``, limited to the first ``max_docs`` rows if given."""
        docs = self.run_data[self.run_data['topic'] == topic]
        return docs if max_docs is None else docs.head(max_docs)

    def rescore(self, method: 'RescoreMethod', rrf_k: int = None, scale: float = None) -> 'TrecRun':
        """Rewrite the ``score`` column in place and return ``self``.

        Parameters
        ----------
        method : RescoreMethod
            ``RRF`` sets score to ``1 / (rrf_k + rank)``; ``SCALE`` multiplies
            every score by ``scale``; ``NORMALIZE`` min-max normalizes the
            scores of each topic separately.
        rrf_k : int
            Constant of the RRF formula. Required for ``RRF``.
        scale : float
            Multiplier. Required for ``SCALE``.

        Raises
        ------
        NotImplementedError
            If ``method`` is none of the methods above.
        """
        # Refer to this guide on how to efficiently manipulate dataframes: https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6
        if method == RescoreMethod.RRF:
            assert rrf_k is not None, 'Parameter "rrf_k" must be a valid integer.'
            self.run_data['score'] = 1 / (rrf_k + self.run_data['rank'].values)
        elif method == RescoreMethod.SCALE:
            assert scale is not None, 'Parameter "scale" must not be none.'
            self.run_data['score'] = self.run_data['score'].values * scale
        elif method == RescoreMethod.NORMALIZE:
            for topic in self.topics():
                # Build the row mask once per topic and reuse it for read and write.
                in_topic = self.run_data['topic'] == topic
                scores = self.run_data.loc[in_topic, 'score'].values
                low = np.min(scores)
                high = np.max(scores)
                if high - low == 0:
                    # All scores of the topic are equal: avoid dividing by zero.
                    self.run_data.loc[in_topic, 'score'] = 1
                else:
                    normalized = (scores - low) / (high - low)
                    self.run_data.loc[in_topic, 'score'] = [float(score) for score in normalized]
        else:
            raise NotImplementedError()
        return self

    def to_numpy(self) -> np.ndarray:
        """Return a copy of the run data as a numpy array."""
        return self.run_data.to_numpy(copy=True)

    def discard_qrels(self, qrels: 'Qrels', clone=True) -> 'TrecRun':
        """Discard each docid in self if docid is also in the given qrels.

        This operation is performed on each topic separately.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        return self._filter_from_qrels(qrels, False, clone=clone)

    def retain_qrels(self, qrels: 'Qrels', clone=True) -> 'TrecRun':
        """Retain each docid in self if docid is also in the given qrels.

        This operation is performed on each topic separately.
        After this operation, judged@x based on the given qrels should be 1.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to keep in TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        return self._filter_from_qrels(qrels, True, clone=clone)

    def _filter_from_qrels(self, qrels: 'Qrels', keep: bool, clone=True) -> 'TrecRun':
        """Private helper function to remove/keep each docid in self if docid is also in the given Qrels object.

        This operation is performed on each topic separately. Topics of the
        run that do not occur in the qrels are dropped from the result.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from or keep in TrecRun.
        keep : Bool
            Keep the docids found in the qrels if True, remove them otherwise.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """
        # The qrels topics do not change during the loop, so compute them once.
        judged_topics = qrels.topics()
        kept_frames = []
        for topic in self.topics():
            if topic not in judged_topics:
                continue
            topic_df = self.run_data[self.run_data['topic'] == topic]
            is_judged = topic_df['docid'].isin(qrels.get_docids(topic))
            kept_frames.append(topic_df[is_judged] if keep else topic_df[~is_judged])
        target = TrecRun() if clone else self
        return TrecRun.from_dataframes(kept_frames, target)

    @staticmethod
    def get_all_topics_from_runs(runs) -> Set[str]:
        """Return the union of the topics of all given runs."""
        all_topics = set()
        for run in runs:
            all_topics |= run.topics()
        return all_topics

    @staticmethod
    def merge(runs, aggregation: 'AggregationMethod', depth: int = None, k: int = None) -> 'TrecRun':
        """Return a TrecRun by aggregating docid in various ways such as summing scores

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects.
        aggregation : AggregationMethod
            The aggregation method to use.
        depth : int
            Maximum number of results from each input run to consider. Set to ``None`` by default, which indicates that
            the complete list of results is considered.
        k : int
            Length of final results list.  Set to ``None`` by default, which indicates that the union of all input documents
            are ranked.

        Raises
        ------
        Exception
            If fewer than two runs are given.
        NotImplementedError
            If ``aggregation`` is not a supported method.
        """
        if len(runs) < 2:
            raise Exception('Merge requires at least 2 runs.')
        if aggregation != AggregationMethod.SUM:
            raise NotImplementedError()

        topics = list(TrecRun.get_all_topics_from_runs(runs))

        def merge_topic(topic):
            # Sum the scores each docid receives for this topic over all runs.
            doc_scores = {}
            for run in runs:
                docs = run.get_docs_by_topic(topic, depth)
                for docid, score in zip(docs['docid'], docs['score']):
                    doc_scores[docid] = doc_scores.get(docid, 0.0) + score
            # Highest score first; ties are broken by docid.
            ranked = sorted(doc_scores.items(), key=lambda item: (-item[1], item[0]))
            if k is not None:
                ranked = ranked[:k]
            return [
                (topic, 'Q0', docid, rank, score, 'merge_sum')
                for rank, (docid, score) in enumerate(ranked, start=1)
            ]

        # One worker per ten topics, but always at least one.
        max_workers = max(len(topics) // 10, 1)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(merge_topic, topics))
        return TrecRun.from_list(list(itertools.chain.from_iterable(results)))

    @staticmethod
    def from_dataframes(dfs, run=None) -> 'TrecRun':
        """Return a TrecRun by populating dataframe with the provided list of dataframes.

        Parameters
        ----------
        dfs: List[Dataframe]
            A list of Dataframes conforming to TrecRun.columns
        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """
        res = TrecRun() if run is None else run
        res.reset_data()
        frames = list(dfs)
        # pd.concat raises on an empty list; in that case the reset (empty) data stays.
        if frames:
            res.run_data = pd.concat(frames)
        return res

    @staticmethod
    def from_list(rows, run=None) -> 'TrecRun':
        """Return a TrecRun by populating dataframe with the provided list of tuples.

        For performance reasons, df.to_numpy() is faster than df.iterrows().
        When manipulating dataframes, we first dump to np.ndarray and construct a list of tuples with new values.
        Then use this function to convert the list of tuples to a TrecRun object.

        Parameters
        ----------
        rows: List[tuples]
            List of tuples in the following format: (topic, 'Q0', docid, rank, score, tag)
        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """
        res = TrecRun() if run is None else run
        # Passing the columns to the constructor also works for an empty list of
        # rows, where assigning them afterwards fails with a length mismatch.
        res.run_data = pd.DataFrame(rows, columns=TrecRun.columns)
        return res

    @staticmethod
    def from_search_results(docid_score_pair: List[Tuple[str, float]], topic=1) -> 'TrecRun':
        """Return a TrecRun built from (docid, score) pairs; ranks follow the input order, starting at 1."""
        rows = [
            (topic, 'Q0', docid, rank, score, 'searcher')
            for rank, (docid, score) in enumerate(docid_score_pair, start=1)
        ]
        return TrecRun.from_list(rows)

    @staticmethod
    def concat(runs) -> 'TrecRun':
        """Return a new TrecRun by concatenating a list of TrecRuns

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects. An empty list yields an empty run.
        """
        combined = TrecRun()
        frames = [each.run_data for each in runs]
        # pd.concat raises on an empty list; keep the empty run in that case.
        if frames:
            combined.run_data = pd.concat(frames)
        return combined