File size: 12,283 Bytes
62977bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
#
# Pyserini: Reproducible IR research with sparse and dense representations
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools
import numpy as np
import pandas as pd

from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from enum import Enum
from typing import List, Set, Tuple


class AggregationMethod(Enum):
    """Strategies for combining the scores a document receives from several runs.

    Consumed by ``TrecRun.merge``.
    """

    # Add up the per-run scores of each document.
    SUM = 'sum'


class RescoreMethod(Enum):
    """Ways in which ``TrecRun.rescore`` can rewrite the ``score`` column of a run."""

    RRF = 'rrf'              # reciprocal rank fusion: 1 / (rrf_k + rank)
    SCALE = 'scale'          # multiply every score by a constant factor
    NORMALIZE = 'normalize'  # per-topic min-max normalization


class Qrels:
    """Wrapper class for TREC Qrels.

    The judgments are held in ``self.qrels_data``, a ``pandas.DataFrame`` whose columns are
    ``Qrels.columns`` (topic, q0, docid, relevance_grade).

    Parameters
    ----------
    filepath : str
        File path of a given TREC Qrels. If ``None``, the object starts out empty.
    """

    columns = ['topic', 'q0', 'docid', 'relevance_grade']

    def __init__(self, filepath: str = None):
        self.filepath = filepath
        self.qrels_data = pd.DataFrame(columns=Qrels.columns)

        if filepath is not None:
            self.read_run(self.filepath)

    def read_run(self, filepath: str):
        """Load a whitespace-separated qrels file into ``qrels_data``.

        ``docid`` is always read as a string, the same way ``TrecRun.read_run`` reads run files.
        Otherwise purely numeric docids would be parsed as integers and never compare equal to the
        string docids of a run.
        """
        # Raw string: '\s' in a plain literal is an invalid escape sequence (SyntaxWarning on 3.12+).
        self.qrels_data = pd.read_csv(filepath, sep=r'\s+', names=Qrels.columns, dtype={'docid': 'str'})

    def get_relevance_grades(self) -> Set[str]:
        """Return a set with all distinct relevance grades (values keep the dtype pandas inferred)."""

        return set(self.qrels_data['relevance_grade'].unique())

    def topics(self) -> Set[str]:
        """Return a set with all distinct topics (values keep the dtype pandas inferred)."""

        return set(self.qrels_data['topic'].unique())

    def get_docids(self, topic, relevance_grades=None) -> List[str]:
        """Return the docids judged for ``topic`` whose grade is in ``relevance_grades``.

        Parameters
        ----------
        topic
            Topic identifier. It is compared with ``==`` against the ``topic`` column, so it must have
            the same type as that column (e.g. ``int`` for numeric topics).
        relevance_grades : Iterable[int]
            E.g. ``[0, 1, 2]``. If not provided, docids of every relevance grade are returned.

        Returns
        -------
        List[str]
            Docids in file order.
        """

        if relevance_grades is None:
            relevance_grades = self.get_relevance_grades()

        filtered_df = self.qrels_data[self.qrels_data['topic'] == topic]
        filtered_df = filtered_df[filtered_df['relevance_grade'].isin(relevance_grades)]

        return filtered_df['docid'].tolist()


class TrecRun:
    """Wrapper class for a TREC run.

    The run is held in ``self.run_data``, a ``pandas.DataFrame`` whose columns are ``TrecRun.columns``
    (topic, q0, docid, rank, score, tag).

    Parameters
    ----------
    filepath : str
        File path of a given TREC Run. If ``None``, the run starts out empty.
    resort : bool
        If ``True``, the loaded rows are re-sorted by descending score within each topic, and ``rank`` is
        recomputed from that order.
    """

    columns = ['topic', 'q0', 'docid', 'rank', 'score', 'tag']

    def __init__(self, filepath: str = None, resort: bool = False):
        self.reset_data()
        self.filepath = filepath
        self.resort = resort

        if filepath is not None:
            self.read_run(self.filepath, self.resort)

    def reset_data(self):
        """Replace ``run_data`` with an empty DataFrame carrying the TREC run columns."""
        self.run_data = pd.DataFrame(columns=TrecRun.columns)

    def read_run(self, filepath: str, resort: bool = False) -> None:
        """Load a whitespace-separated TREC run file into ``run_data``.

        Parameters
        ----------
        filepath : str
            Path of the run file. ``docid`` is always read as a string.
        resort : bool
            If ``True``, sort rows by (topic ascending, score descending) and recompute ``rank`` per topic.
            Ties are ranked in order of appearance (``method='first'``). ``Series.rank`` yields floats,
            so the recomputed ranks are floats.
        """
        # Raw string: '\s' in a plain literal is an invalid escape sequence (SyntaxWarning on 3.12+).
        self.run_data = pd.read_csv(filepath, sep=r'\s+', names=TrecRun.columns, dtype={'docid': 'str'})
        if resort:
            self.run_data.sort_values(['topic', 'score'], inplace=True, ascending=[True, False])
            self.run_data['rank'] = self.run_data.groupby('topic')['score'].rank(ascending=False, method='first')

    def topics(self) -> Set[str]:
        """Return a set with all distinct topics (values keep the dtype pandas inferred)."""
        return set(self.run_data['topic'].unique())

    def clone(self):
        """Return a deep copy of the current instance."""
        return deepcopy(self)

    def save_to_txt(self, output_path: str, tag: str = None) -> None:
        """Write the run to ``output_path`` as space-separated text without header or index.

        Side effects on this instance: if ``tag`` is given, it overwrites the ``tag`` column. ``run_data``
        is also replaced by a copy sorted by (topic ascending, score descending) before writing.

        Raises
        ------
        Exception
            If the run has no rows.
        """
        if len(self.run_data) == 0:
            raise Exception('Nothing to save. TrecRun is empty')

        if tag is not None:
            self.run_data['tag'] = tag

        self.run_data = self.run_data.sort_values(by=['topic', 'score'], ascending=[True, False])
        self.run_data.to_csv(output_path, sep=' ', header=False, index=False)

    def get_docs_by_topic(self, topic: str, max_docs: int = None):
        """Return the rows of ``topic`` in their current order, truncated to the first ``max_docs`` if given."""
        docs = self.run_data[self.run_data['topic'] == topic]

        if max_docs is not None:
            docs = docs.head(max_docs)

        return docs

    def rescore(self, method: RescoreMethod, rrf_k: int = None, scale: float = None):
        """Rewrite the ``score`` column in place and return ``self`` (so calls can be chained).

        Parameters
        ----------
        method : RescoreMethod
            ``RRF``: ``score = 1 / (rrf_k + rank)``.
            ``SCALE``: ``score = score * scale``.
            ``NORMALIZE``: per-topic min-max normalization to [0, 1]. Topics whose scores are all equal
            get a score of 1.
        rrf_k : int
            Required for ``RescoreMethod.RRF``.
        scale : float
            Required for ``RescoreMethod.SCALE``.

        Raises
        ------
        AssertionError
            If the parameter required by ``method`` is missing.
        NotImplementedError
            If ``method`` is not supported.
        """
        # Column-wise numpy operations are used instead of per-row iteration for speed.
        if method == RescoreMethod.RRF:
            assert rrf_k is not None, 'Parameter "rrf_k" must be a valid integer.'
            self.run_data['score'] = 1 / (rrf_k + self.run_data['rank'].values)
        elif method == RescoreMethod.SCALE:
            assert scale is not None, 'Parameter "scale" must not be none.'
            self.run_data['score'] = self.run_data['score'].values * scale
        elif method == RescoreMethod.NORMALIZE:
            self._normalize_scores()
        else:
            raise NotImplementedError()

        return self

    def _normalize_scores(self) -> None:
        """Min-max normalize ``score`` within each topic using a single groupby pass."""
        if len(self.run_data) == 0:
            return

        scores = self.run_data['score'].to_numpy(dtype=float)
        grouped = self.run_data.groupby('topic')['score']
        low = grouped.transform('min').to_numpy(dtype=float)
        span = grouped.transform('max').to_numpy(dtype=float) - low
        # A topic with constant scores would divide by zero; such topics are assigned 1 instead.
        with np.errstate(divide='ignore', invalid='ignore'):
            normalized = np.where(span == 0, 1.0, (scores - low) / span)
        self.run_data['score'] = normalized

    def to_numpy(self) -> np.ndarray:
        """Return a copy of ``run_data`` as a numpy array (one row per run entry)."""
        return self.run_data.to_numpy(copy=True)

    def discard_qrels(self, qrels: Qrels, clone=True):
        """Discard each docid in self if docid is also in the given qrels.
        This operation is performed on each topic separately.
        Topics that do not appear in ``qrels`` are dropped from the result.

        Parameters:
        ----------
        qrels : Qrels
            Qrels with docids to remove from TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """

        return self._filter_from_qrels(qrels, False, clone=clone)

    def retain_qrels(self, qrels: Qrels, clone=True):
        """Retain each docid in self if docid is also in the given qrels.
        This operation is performed on each topic separately.
        After this operation, judged@x based on the given qrels should be 1.

        Parameters:
        ----------
        qrels : Qrels
            Qrels with docids to keep in TrecRun.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """

        return self._filter_from_qrels(qrels, True, clone=clone)

    def _filter_from_qrels(self, qrels: Qrels, keep: bool, clone=True):
        """Private helper to keep (``keep=True``) or remove (``keep=False``) the docids of each topic that
        are judged in ``qrels``.

        Topics of this run that are absent from ``qrels`` are dropped from the result in both modes.

        Parameters:
        ----------
        qrels : Qrels
            Qrels with docids to remove from or keep in TrecRun.
        keep : bool
            Keep judged docids if True, otherwise remove them.
        clone : Bool
            Return a new TrecRun object if True, else self will be modified and returned.
        """

        df_list = []
        qrels_topics = qrels.topics()  # computed once instead of once per topic
        for topic in self.topics():
            if topic not in qrels_topics:
                continue

            # Compare docids as strings on both sides. Run docids are read as str, but qrels docids may
            # have been parsed as integers, in which case isin() would never match.
            qrels_docids = {str(docid) for docid in qrels.get_docids(topic)}
            topic_df = self.run_data[self.run_data['topic'] == topic]
            judged = topic_df['docid'].astype(str).isin(qrels_docids)
            df_list.append(topic_df[judged] if keep else topic_df[~judged])

        run = TrecRun() if clone else self
        return TrecRun.from_dataframes(df_list, run)

    @staticmethod
    def get_all_topics_from_runs(runs) -> Set[str]:
        """Return the union of the topics of every run in ``runs``."""
        return set().union(*(run.topics() for run in runs))

    @staticmethod
    def merge(runs, aggregation: AggregationMethod, depth: int = None, k: int = None):
        """Return a TrecRun by aggregating docid in various ways such as summing scores

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects.
        aggregation : AggregationMethod
            The aggregation method to use.
        depth : int
            Maximum number of results from each input run to consider. Set to ``None`` by default, which indicates that
            the complete list of results is considered.
        k : int
            Length of final results list.  Set to ``None`` by default, which indicates that the union of all input documents
            are ranked.

        Raises
        ------
        Exception
            If fewer than two runs are given.
        NotImplementedError
            If ``aggregation`` is not supported.
        """

        if len(runs) < 2:
            raise Exception('Merge requires at least 2 runs.')

        if aggregation != AggregationMethod.SUM:
            raise NotImplementedError()

        topics = list(TrecRun.get_all_topics_from_runs(runs))

        def merge_topic(topic):
            """Sum each docid's scores across runs for ``topic`` and return ranked result rows."""
            doc_scores = {}

            for run in runs:
                for docid, score in run.get_docs_by_topic(topic, depth)[['docid', 'score']].values:
                    doc_scores[docid] = doc_scores.get(docid, 0.0) + score

            # Highest score first; ties broken by docid so the output is deterministic.
            sorted_doc_scores = sorted(doc_scores.items(), key=lambda x: (-x[1], x[0]))
            if k is not None:
                sorted_doc_scores = sorted_doc_scores[:k]

            return [
                (topic, 'Q0', docid, rank, score, 'merge_sum')
                for rank, (docid, score) in enumerate(sorted_doc_scores, start=1)
            ]

        # Roughly one worker per ten topics, and at least one.
        max_workers = max(len(topics) // 10, 1)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(merge_topic, topics))

        rows = list(itertools.chain.from_iterable(results))
        return TrecRun.from_list(rows)

    @staticmethod
    def from_dataframes(dfs, run=None):
        """Return a TrecRun by populating dataframe with the provided list of dataframes.

        Parameters
        ----------
        dfs: List[Dataframe]
            A list of Dataframes conforming to TrecRun.columns. An empty list yields an empty run
            (``pd.concat`` would raise on it).

        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """

        res = TrecRun() if run is None else run
        res.reset_data()
        frames = list(dfs)
        if frames:
            res.run_data = pd.concat(frames)

        return res

    @staticmethod
    def from_list(rows, run=None):
        """Return a TrecRun by populating dataframe with the provided list of tuples.
        For performance reasons, df.to_numpy() is faster than df.iterrows().
        When manipulating dataframes, we first dump to np.ndarray and construct a list of tuples with new values.
        Then use this function to convert the list of tuples to a TrecRun object.

        Parameters
        ----------
        rows: List[tuples]
            List of tuples in the following format: (topic, 'Q0', docid, rank, score, tag).
            An empty list yields an empty run with the standard columns.

        run: TrecRun
            Set to ``None`` by default. If None, then a new instance of TrecRun will be created.
            Else, the given TrecRun will be modified.
        """

        res = TrecRun() if run is None else run
        # Passing ``columns`` to the constructor also works for an empty ``rows``. Assigning
        # ``df.columns`` afterwards would raise a length-mismatch ValueError in that case.
        res.run_data = pd.DataFrame(rows, columns=TrecRun.columns)

        return res

    @staticmethod
    def from_search_results(docid_score_pair: List[Tuple[str, float]], topic=1):
        """Build a run for one ``topic`` from ``(docid, score)`` pairs, ranked in the given order from 1."""
        rows = [
            (topic, 'Q0', docid, rank, score, 'searcher')
            for rank, (docid, score) in enumerate(docid_score_pair, start=1)
        ]

        return TrecRun.from_list(rows)

    @staticmethod
    def concat(runs):
        """Return a new TrecRun by concatenating a list of TrecRuns

        Parameters
        ----------
        runs : List[TrecRun]
            List of ``TrecRun`` objects. An empty list yields an empty run.
        """

        result = TrecRun()
        frames = [r.run_data for r in runs]
        if frames:
            result.run_data = pd.concat(frames)
        return result