#
# Pyserini: Reproducible IR research with sparse and dense representations
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import itertools
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from enum import Enum
from typing import List, Set, Tuple


class AggregationMethod(Enum):
    SUM = 'sum'


class RescoreMethod(Enum):
    RRF = 'rrf'
    SCALE = 'scale'
    NORMALIZE = 'normalize'
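
# RescoreMethod values correspond to the branches of TrecRun.rescore below:
# RRF maps each score to 1 / (rrf_k + rank), SCALE multiplies scores by a
# constant, and NORMALIZE min-max scales scores within each topic.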


class Qrels:
    """Wrapper class for TREC Qrels.

    Parameters
    ----------
    filepath : str
        File path of a given TREC Qrels file.
    """

    columns = ['topic', 'q0', 'docid', 'relevance_grade']

    def __init__(self, filepath: str = None):
        self.filepath = filepath
        self.qrels_data = pd.DataFrame(columns=Qrels.columns)

        if filepath is not None:
            self.read_run(self.filepath)

    def read_run(self, filepath: str):
        self.qrels_data = pd.read_csv(filepath, sep=r'\s+', names=Qrels.columns)

    def get_relevance_grades(self) -> Set[str]:
        """Return a set with all relevance grades."""
        return set(self.qrels_data['relevance_grade'].unique())

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.qrels_data['topic'].unique())

    def get_docids(self, topic, relevance_grades=None) -> List[str]:
        """Return a list of docids for a given topic and list of relevance grades.

        Parameters
        ----------
        topic : int
            The topic to filter by.
        relevance_grades : List[int]
            E.g., [0, 1, 2]. If not provided, docids of all relevance grades are returned.
        """
        if relevance_grades is None:
            relevance_grades = self.get_relevance_grades()

        filtered_df = self.qrels_data[self.qrels_data['topic'] == topic]
        filtered_df = filtered_df[filtered_df['relevance_grade'].isin(relevance_grades)]
        return filtered_df['docid'].tolist()
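
# Usage sketch (the qrels path and topic below are hypothetical):
#     qrels = Qrels('tools/topics-and-qrels/qrels.robust04.txt')
#     relevant_docids = qrels.get_docids(topic=301, relevance_grades=[1, 2])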


class TrecRun:
    """Wrapper class for a TREC run.

    Parameters
    ----------
    filepath : str
        File path of a given TREC run.
    resort : bool
        If True, re-sort the run by score and recompute ranks after loading.
    """

    columns = ['topic', 'q0', 'docid', 'rank', 'score', 'tag']

    def __init__(self, filepath: str = None, resort: bool = False):
        self.reset_data()
        self.filepath = filepath
        self.resort = resort

        if filepath is not None:
            self.read_run(self.filepath, self.resort)

    def reset_data(self):
        self.run_data = pd.DataFrame(columns=TrecRun.columns)

    def read_run(self, filepath: str, resort: bool = False) -> None:
        self.run_data = pd.read_csv(filepath, sep=r'\s+', names=TrecRun.columns, dtype={'docid': 'str'})
        if resort:
            self.run_data.sort_values(['topic', 'score'], inplace=True, ascending=[True, False])
            # Cast to int: pandas rank() returns floats, which would break the TREC run format on save.
            self.run_data['rank'] = self.run_data.groupby('topic')['score'].rank(ascending=False, method='first').astype(int)

    def topics(self) -> Set[str]:
        """Return a set with all topics."""
        return set(self.run_data['topic'].unique())

    def clone(self):
        """Return a deep copy of the current instance."""
        return deepcopy(self)

    def save_to_txt(self, output_path: str, tag: str = None) -> None:
        if len(self.run_data) == 0:
            raise Exception('Nothing to save. TrecRun is empty')

        if tag is not None:
            self.run_data['tag'] = tag

        self.run_data = self.run_data.sort_values(by=['topic', 'score'], ascending=[True, False])
        self.run_data.to_csv(output_path, sep=' ', header=False, index=False)

    def get_docs_by_topic(self, topic: str, max_docs: int = None):
        docs = self.run_data[self.run_data['topic'] == topic]

        if max_docs is not None:
            docs = docs.head(max_docs)

        return docs
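
    # Usage sketch (hypothetical run file): load a run, inspect the top of one
    # topic, and write it back out under a new tag.
    #     run = TrecRun('runs/run.bm25.txt')
    #     top10 = run.get_docs_by_topic(topic=301, max_docs=10)
    #     run.save_to_txt('runs/run.bm25.retagged.txt', tag='bm25_retagged')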

    def rescore(self, method: RescoreMethod, rrf_k: int = None, scale: float = None):
        # Refer to this guide on how to efficiently manipulate dataframes: https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6
        if method == RescoreMethod.RRF:
            assert rrf_k is not None, 'Parameter "rrf_k" must be a valid integer.'
            self.run_data['score'] = 1 / (rrf_k + self.run_data['rank'].values)
        elif method == RescoreMethod.SCALE:
            assert scale is not None, 'Parameter "scale" must not be None.'
            self.run_data['score'] = self.run_data['score'].values * scale
        elif method == RescoreMethod.NORMALIZE:
            for topic in self.topics():
                scores = self.run_data[self.run_data['topic'] == topic]['score'].copy().values
                low = np.min(scores)
                high = np.max(scores)
                if high - low == 0:
                    # All scores for this topic are identical; assign a constant score.
                    self.run_data.loc[self.run_data['topic'] == topic, 'score'] = 1
                else:
                    scores = (scores - low) / (high - low)
                    scores = [float(score) for score in scores]
                    self.run_data.loc[self.run_data['topic'] == topic, 'score'] = scores
        else:
            raise NotImplementedError()

        return self
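
    # Rescoring sketch: with RescoreMethod.RRF and rrf_k=60, a document at rank r
    # gets score 1 / (60 + r); with RescoreMethod.SCALE and scale=0.5, every score
    # is halved. Rescoring mutates self and returns it, so calls can be chained:
    #     run.rescore(RescoreMethod.RRF, rrf_k=60)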

    def to_numpy(self) -> np.ndarray:
        return self.run_data.to_numpy(copy=True)

    def discard_qrels(self, qrels: Qrels, clone=True):
        """Discard each docid in self if the docid also appears in the given qrels.
        This operation is performed on each topic separately.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from the TrecRun.
        clone : bool
            Return a new TrecRun object if True, else self is modified and returned.
        """
        return self._filter_from_qrels(qrels, False, clone=clone)

    def retain_qrels(self, qrels: Qrels, clone=True):
        """Retain each docid in self only if the docid also appears in the given qrels.
        This operation is performed on each topic separately.
        After this operation, judged@x based on the given qrels should be 1.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to keep in the TrecRun.
        clone : bool
            Return a new TrecRun object if True, else self is modified and returned.
        """
        return self._filter_from_qrels(qrels, True, clone=clone)

    def _filter_from_qrels(self, qrels: Qrels, keep: bool, clone=True):
        """Private helper to remove or keep each docid in self depending on whether it appears in the given Qrels object.
        This operation is performed on each topic separately.

        Parameters
        ----------
        qrels : Qrels
            Qrels with docids to remove from or keep in the TrecRun.
        keep : bool
            Keep matching docids if True, discard them otherwise.
        clone : bool
            Return a new TrecRun object if True, else self is modified and returned.
        """
        df_list = []
        for topic in self.topics():
            if topic not in qrels.topics():
                continue

            qrels_docids = qrels.get_docids(topic)
            topic_df = self.run_data[self.run_data['topic'] == topic]
            if keep is True:
                topic_df = topic_df[topic_df['docid'].isin(qrels_docids)]
            else:
                topic_df = topic_df[~topic_df['docid'].isin(qrels_docids)]

            df_list.append(topic_df)

        run = TrecRun() if clone is True else self
        return TrecRun.from_dataframes(df_list, run)
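
    # Filtering sketch (hypothetical paths): keep only judged documents, so that
    # judged@x on the resulting run is 1 by construction.
    #     run = TrecRun('runs/run.bm25.txt')
    #     qrels = Qrels('tools/topics-and-qrels/qrels.dev.txt')
    #     judged_only = run.retain_qrels(qrels, clone=True)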

    @staticmethod
    def get_all_topics_from_runs(runs) -> Set[str]:
        all_topics = set()
        for run in runs:
            all_topics = all_topics.union(run.topics())

        return all_topics

    @staticmethod
    def merge(runs, aggregation: AggregationMethod, depth: int = None, k: int = None):
        """Return a new TrecRun by aggregating the docids of multiple runs, e.g., by summing their scores.

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects.
        aggregation : AggregationMethod
            The aggregation method to use.
        depth : int
            Maximum number of results from each input run to consider. Set to ``None`` by default, which indicates that
            the complete list of results is considered.
        k : int
            Length of the final results list. Set to ``None`` by default, which indicates that the union of all input
            documents is ranked.
        """
        if len(runs) < 2:
            raise Exception('Merge requires at least 2 runs.')

        rows = []
        if aggregation == AggregationMethod.SUM:
            topics = list(TrecRun.get_all_topics_from_runs(runs))

            def merge_topic(topic):
                doc_scores = dict()
                for run in runs:
                    for docid, score in run.get_docs_by_topic(topic, depth)[['docid', 'score']].values:
                        doc_scores[docid] = doc_scores.get(docid, 0.0) + score

                # Sort by descending score, breaking ties by docid for deterministic output.
                sorted_doc_scores = sorted(doc_scores.items(), key=lambda x: (-x[1], x[0]))
                sorted_doc_scores = sorted_doc_scores if k is None else sorted_doc_scores[:k]
                return [
                    (topic, 'Q0', docid, rank, score, 'merge_sum')
                    for rank, (docid, score) in enumerate(sorted_doc_scores, start=1)
                ]

            # Use roughly one worker thread per ten topics, with at least one worker.
            max_workers = max(len(topics) // 10, 1)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                results = list(executor.map(merge_topic, topics))
            rows = list(itertools.chain.from_iterable(results))
        else:
            raise NotImplementedError()

        return TrecRun.from_list(rows)
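
    # Fusion sketch (hypothetical run files): reciprocal rank fusion can be
    # expressed as rescoring each run with RRF and then merging by summing scores.
    #     runs = [TrecRun('runs/run.bm25.txt'), TrecRun('runs/run.dense.txt')]
    #     runs = [r.rescore(RescoreMethod.RRF, rrf_k=60) for r in runs]
    #     fused = TrecRun.merge(runs, AggregationMethod.SUM, depth=1000, k=1000)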

    @staticmethod
    def from_dataframes(dfs, run=None):
        """Return a TrecRun by populating its dataframe with the provided list of dataframes.

        Parameters
        ----------
        dfs : List[pd.DataFrame]
            A list of dataframes conforming to TrecRun.columns.
        run : TrecRun
            Set to ``None`` by default. If None, a new instance of TrecRun is created.
            Otherwise, the given TrecRun is modified.
        """
        res = TrecRun() if run is None else run
        res.reset_data()
        res.run_data = pd.concat(dfs)
        return res

    @staticmethod
    def from_list(rows, run=None):
        """Return a TrecRun by populating its dataframe with the provided list of tuples.

        For performance reasons, df.to_numpy() is faster than df.iterrows(). When manipulating dataframes,
        we first dump to np.ndarray, construct a list of tuples with the new values, and then use this
        function to convert the list of tuples back into a TrecRun object.

        Parameters
        ----------
        rows : List[tuple]
            List of tuples in the following format: (topic, 'Q0', docid, rank, score, tag)
        run : TrecRun
            Set to ``None`` by default. If None, a new instance of TrecRun is created.
            Otherwise, the given TrecRun is modified.
        """
        res = TrecRun() if run is None else run

        df = pd.DataFrame(rows)
        df.columns = TrecRun.columns
        res.run_data = df.copy()
        return res

    @staticmethod
    def from_search_results(docid_score_pair: List[Tuple[str, float]], topic=1):
        rows = []
        for rank, (docid, score) in enumerate(docid_score_pair, start=1):
            rows.append((topic, 'Q0', docid, rank, score, 'searcher'))

        return TrecRun.from_list(rows)
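
    # Conversion sketch (hypothetical hits): turn (docid, score) searcher output into a run.
    #     hits = [('doc1', 12.3), ('doc7', 11.8), ('doc4', 9.6)]
    #     run = TrecRun.from_search_results(hits, topic=301)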

    @staticmethod
    def concat(runs):
        """Return a new TrecRun by concatenating a list of TrecRuns.

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects.
        """
        combined = TrecRun()
        combined.run_data = pd.concat([run.run_data for run in runs])
        return combined