lhoestq HF staff commited on
Commit
905f549
1 Parent(s): c361455

add excluded tab

Browse files
Files changed (1) hide show
  1. app.py +128 -62
app.py CHANGED
@@ -1,15 +1,13 @@
1
  import ast
2
  import glob
3
- import tempfile
4
- from dataclasses import asdict
5
  from itertools import islice
6
  from functools import partial
7
- from typing import Optional
8
 
9
  import gradio as gr
10
  import nltk
11
  import pandas as pd
12
- from datatrove.utils.typeshelper import Languages
13
  from datatrove.executor.local import LocalPipelineExecutor
14
  from datatrove.pipeline.extractors import Trafilatura
15
  from datatrove.pipeline.filters.base_filter import BaseFilter
@@ -23,8 +21,7 @@ from datatrove.pipeline.filters import (
23
  )
24
  from datatrove.pipeline.formatters import PIIFormatter
25
  from datatrove.pipeline.readers import JsonlReader, WarcReader
26
- from datatrove.pipeline.writers.jsonl import JsonlWriter
27
- from difflib import Differ
28
 
29
 
30
  nltk.download('punkt_tab')
@@ -114,6 +111,9 @@ blocks = sorted(glob.glob("images/*.png"))
114
  def prepare_as_list_or_none(text: str) -> Optional[list[str]]:
115
  return ([x.strip() for x in text.split(",") if x.strip()] or None) if text else None
116
 
 
 
 
117
  def build_code_snippet(steps, params=None):
118
  # TODO
119
  return (
@@ -183,8 +183,8 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
183
  language_filtering_checkbox = gr.Checkbox(True, label="Enable")
184
  with gr.Accordion("Parameters", open=True) as acc:
185
  with gr.Row():
186
- languages_textbox = gr.Textbox("", label="languages", info="list of languages to keep. empty for all")
187
- languages_textbox.prepare_parameter = prepare_as_list_or_none
188
  language_threshold_slider = gr.Slider(0, 1, value=0.65, step=0.05, label="language_threshold", info="minimum score to accept a document")
189
  language_filtering_checkbox.change(lambda visible: gr.Accordion(visible=visible), inputs=language_filtering_checkbox, outputs=acc)
190
  language_filtering_parameters_components = [languages_textbox, language_threshold_slider]
@@ -196,7 +196,7 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
196
  with gr.Accordion("Parameters", open=True) as acc:
197
  with gr.Group():
198
  with gr.Row():
199
- language_dropdown1 = gr.Dropdown([v for k, v in vars(Languages).items() if not k.startswith("__")], value=Languages.english, label="language", info="tokenizer language")
200
  top_n_grams_textbox = gr.Textbox("(2, 0.2), (3, 0.18), (4, 0.16)", label="top_n_grams")
201
  top_n_grams_textbox.prepare_parameter = ast.literal_eval
202
  dup_n_grams_textbox = gr.Textbox("(5, 0.15), (6, 0.14), (7, 0.13), (8, 0.12), (9, 0.11), (10, 0.10)", label="dup_n_grams")
@@ -250,7 +250,7 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
250
  with gr.Row():
251
  split_paragraph_checkbox = gr.Checkbox(True, label="split_paragraph", info="disable to apply the filters to each sentence instead of to each line")
252
  with gr.Row():
253
- language_dropdown2 = gr.Dropdown([v for k, v in vars(Languages).items() if not k.startswith("__")], value=Languages.english, label="language", info="tokenizer language")
254
  min_num_sentences_slider = gr.Slider(0, 10, value=5, step=1, label="min_num_sentences", info="remove documents that do not have at least this number of sentences (after line filtering)")
255
  min_words_per_line_slider = gr.Slider(0, 10, value=3, step=1, label="min_words_per_line", info="drop lines without this min number of words")
256
  max_word_length_slider = gr.Slider(0, 2000, value=1000, step=10, label="max_word_length", info=" drop lines where at least one word has more than this number of characters")
@@ -271,7 +271,7 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
271
  with gr.Accordion("Parameters", open=True) as acc:
272
  with gr.Group():
273
  with gr.Row():
274
- language_dropdown2 = gr.Dropdown([v for k, v in vars(Languages).items() if not k.startswith("__")], value=Languages.english, label="language", info="tokenizer language")
275
  min_doc_words_slider = gr.Slider(0, 1000, value=50, step=10, label="min_doc_words")
276
  max_doc_words_slider = gr.Slider(0, 200_000, value=100_000, step=10_000, label="max_doc_words")
277
  with gr.Row():
@@ -289,7 +289,9 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
289
  gopher_filtering_quality_checkbox.change(lambda visible: gr.Accordion(visible=visible), inputs=gopher_filtering_quality_checkbox, outputs=acc)
290
  gopher_filtering_quality_parameters_components = [language_dropdown2, min_doc_words_slider, max_doc_words_slider, min_avg_word_length_slider, max_avg_word_length_slider, max_symbol_word_ratio_slider, max_bullet_lines_ratio_slider, max_ellipsis_lines_ratio_slider, max_non_alpha_words_ratio_slider, min_stop_words_slider, stop_words_textbox]
291
 
292
- view_pipeline_results_button = gr.Button("View Pipeline Results", variant="primary")
 
 
293
 
294
  steps = [
295
  URLFilter,
@@ -313,7 +315,15 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
313
  ]
314
 
315
  with gr.Tab("Output") as output_tab:
316
- output_dataframe_diff = gr.DataFrame(datatype="markdown")
 
 
 
 
 
 
 
 
317
  with gr.Tab("Python code") as code_tab:
318
  python_code_markdown = gr.Markdown(build_code_snippet(steps))
319
 
@@ -338,7 +348,7 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
338
  pii_removal_checkbox
339
  ] + sum(steps_parameters_components, [])
340
 
341
- @view_pipeline_results_button.click(inputs=inputs, outputs=[output_tab, output_dataframe_diff])
342
  def view_pipeline_results(*args):
343
  enable_steps, steps_parameters = args[:len(steps)], args[len(steps):]
344
  steps_parameters_iter = iter(steps_parameters)
@@ -357,53 +367,109 @@ with gr.Blocks(css=css, js=make_gallery_image_buttons_js) as demo:
357
  for step_parameters_components in steps_parameters_components
358
  ]
359
 
360
- with tempfile.TemporaryDirectory() as output_path:
361
- steps_to_run = [
362
- step(**step_parameters, **({"exclusion_writer": JsonlWriter(f"{output_path}/base_processing/removed/{step.__name__}/{DUMP_TO_PROCESS}")} if issubclass(step, BaseFilter) and False else {}))
363
- for step, step_parameters, enable_step in zip(steps, steps_parameters, enable_steps)
364
- if enable_step
365
- ]
366
- output_docs = []
367
- if steps_parameters[:2] == default_steps_parameters[:2] and all(enable_steps[:2]):
368
- num_warc_samples = 2000
369
- default_output_docs = default_output_docs_2k
370
- pipeline = LocalPipelineExecutor(
371
- pipeline=[
372
- JsonlReader(data_folder=f"output_text_extraction-2k/base_processing/output/{DUMP_TO_PROCESS}", glob_pattern="*.jsonl.gz")
373
- ] + steps_to_run[2:] + [
374
- lambda data, rank, world_size: map(output_docs.append, data)
375
- ],
376
- logging_dir="logs",
377
- skip_completed=False
378
- )
379
- else:
380
- num_warc_samples = 200
381
- default_output_docs = default_output_docs_200
382
- pipeline = LocalPipelineExecutor(
383
- pipeline=[
384
- WarcReader(data_folder="data", glob_pattern="*.warc.gz"),
385
- lambda data, rank, world_size: islice(data, num_warc_samples),
386
- ] + steps_to_run + [
387
- lambda data, rank, world_size: map(output_docs.append, data)
388
- ],
389
- logging_dir="logs",
390
- skip_completed=False
391
- )
392
- pipeline.run()
393
- out = [doc.text[:1_000] + f" [+{len(doc.text) - 1000} chars]" if len(doc.text) > 1_000 else doc.text for doc in output_docs]
394
- default_out = [doc["text"][:1_000] + f" [+{len(doc['text']) - 1000} chars]" if len(doc["text"]) > 1_000 else doc["text"] for doc in default_output_docs]
395
- output_diff = []
396
- for text_diff in Differ().compare(default_out, out[:len(default_out) * 10]):
397
- opcode, text = text_diff[0], text_diff[2:]
398
- if opcode == "-":
399
- text = f'<div class="diffDeletion">\n\n{text}\n\n</div>'
400
- elif opcode == "+":
401
- text = f'<div class="diffInsertion">\n\n{text}\n\n</div>'
402
- output_diff.append(text)
403
- return {
404
- output_tab: gr.Tab(f"Output: kept {len(out)/num_warc_samples*100:.02f}% of data"),
405
- output_dataframe_diff: pd.DataFrame({"text": output_diff}),
406
- }
407
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
408
 
409
- demo.launch()
 
 
1
  import ast
2
  import glob
 
 
3
  from itertools import islice
4
  from functools import partial
5
+ from typing import Optional, Type
6
 
7
  import gradio as gr
8
  import nltk
9
  import pandas as pd
10
+ from datatrove.data import Document
11
  from datatrove.executor.local import LocalPipelineExecutor
12
  from datatrove.pipeline.extractors import Trafilatura
13
  from datatrove.pipeline.filters.base_filter import BaseFilter
 
21
  )
22
  from datatrove.pipeline.formatters import PIIFormatter
23
  from datatrove.pipeline.readers import JsonlReader, WarcReader
24
+ from datatrove.utils.typeshelper import Languages
 
25
 
26
 
27
  nltk.download('punkt_tab')
 
111
  def prepare_as_list_or_none(text: str) -> Optional[list[str]]:
112
  return ([x.strip() for x in text.split(",") if x.strip()] or None) if text else None
113
 
114
+ def non_empty_list_or_none(input_list: list[str]) -> Optional[list[str]]:
115
+ return input_list or None
116
+
117
  def build_code_snippet(steps, params=None):
118
  # TODO
119
  return (
 
183
  language_filtering_checkbox = gr.Checkbox(True, label="Enable")
184
  with gr.Accordion("Parameters", open=True) as acc:
185
  with gr.Row():
186
+ languages_textbox = gr.Dropdown(sorted(v for k, v in vars(Languages).items() if not k.startswith("__")), multiselect=True, label="languages", info="list of languages to keep. empty for all")
187
+ languages_textbox.prepare_parameter = non_empty_list_or_none
188
  language_threshold_slider = gr.Slider(0, 1, value=0.65, step=0.05, label="language_threshold", info="minimum score to accept a document")
189
  language_filtering_checkbox.change(lambda visible: gr.Accordion(visible=visible), inputs=language_filtering_checkbox, outputs=acc)
190
  language_filtering_parameters_components = [languages_textbox, language_threshold_slider]
 
196
  with gr.Accordion("Parameters", open=True) as acc:
197
  with gr.Group():
198
  with gr.Row():
199
+ language_dropdown1 = gr.Dropdown(sorted(v for k, v in vars(Languages).items() if not k.startswith("__")), value=Languages.english, label="language", info="tokenizer language")
200
  top_n_grams_textbox = gr.Textbox("(2, 0.2), (3, 0.18), (4, 0.16)", label="top_n_grams")
201
  top_n_grams_textbox.prepare_parameter = ast.literal_eval
202
  dup_n_grams_textbox = gr.Textbox("(5, 0.15), (6, 0.14), (7, 0.13), (8, 0.12), (9, 0.11), (10, 0.10)", label="dup_n_grams")
 
250
  with gr.Row():
251
  split_paragraph_checkbox = gr.Checkbox(True, label="split_paragraph", info="disable to apply the filters to each sentence instead of to each line")
252
  with gr.Row():
253
+ language_dropdown2 = gr.Dropdown(sorted(v for k, v in vars(Languages).items() if not k.startswith("__")), value=Languages.english, label="language", info="tokenizer language")
254
  min_num_sentences_slider = gr.Slider(0, 10, value=5, step=1, label="min_num_sentences", info="remove documents that do not have at least this number of sentences (after line filtering)")
255
  min_words_per_line_slider = gr.Slider(0, 10, value=3, step=1, label="min_words_per_line", info="drop lines without this min number of words")
256
  max_word_length_slider = gr.Slider(0, 2000, value=1000, step=10, label="max_word_length", info=" drop lines where at least one word has more than this number of characters")
 
271
  with gr.Accordion("Parameters", open=True) as acc:
272
  with gr.Group():
273
  with gr.Row():
274
+ language_dropdown2 = gr.Dropdown(sorted(v for k, v in vars(Languages).items() if not k.startswith("__")), value=Languages.english, label="language", info="tokenizer language")
275
  min_doc_words_slider = gr.Slider(0, 1000, value=50, step=10, label="min_doc_words")
276
  max_doc_words_slider = gr.Slider(0, 200_000, value=100_000, step=10_000, label="max_doc_words")
277
  with gr.Row():
 
289
  gopher_filtering_quality_checkbox.change(lambda visible: gr.Accordion(visible=visible), inputs=gopher_filtering_quality_checkbox, outputs=acc)
290
  gopher_filtering_quality_parameters_components = [language_dropdown2, min_doc_words_slider, max_doc_words_slider, min_avg_word_length_slider, max_avg_word_length_slider, max_symbol_word_ratio_slider, max_bullet_lines_ratio_slider, max_ellipsis_lines_ratio_slider, max_non_alpha_words_ratio_slider, min_stop_words_slider, stop_words_textbox]
291
 
292
+ with gr.Row():
293
+ view_pipeline_results_button = gr.Button("Run Pipeline & Stream Results", variant="primary", scale=4)
294
+ stop_button = gr.Button("Stop")
295
 
296
  steps = [
297
  URLFilter,
 
315
  ]
316
 
317
  with gr.Tab("Output") as output_tab:
318
+ output_dataframe = gr.DataFrame(datatype="markdown")
319
+ with gr.Tab("Excluded") as excluded_tab:
320
+ excluded_dataframes: dict[Type, gr.DataFrame] = {}
321
+ excluded_tabs: dict[Type, gr.Tab] = {}
322
+ for step in steps:
323
+ if issubclass(step, BaseFilter) and step is not URLFilter:
324
+ with gr.Tab(step.__name__) as t:
325
+ excluded_dataframes[step] = gr.DataFrame(datatype="markdown")
326
+ excluded_tabs[step] = t
327
  with gr.Tab("Python code") as code_tab:
328
  python_code_markdown = gr.Markdown(build_code_snippet(steps))
329
 
 
348
  pii_removal_checkbox
349
  ] + sum(steps_parameters_components, [])
350
 
351
+ @view_pipeline_results_button.click(inputs=inputs, outputs=[output_tab, output_dataframe, excluded_tab] + list(excluded_dataframes.values()) + list(excluded_tabs.values()))
352
  def view_pipeline_results(*args):
353
  enable_steps, steps_parameters = args[:len(steps)], args[len(steps):]
354
  steps_parameters_iter = iter(steps_parameters)
 
367
  for step_parameters_components in steps_parameters_components
368
  ]
369
 
370
+ class ExclusionWriter:
371
+
372
+ def __init__(self) -> None:
373
+ self.docs: list[Document] = []
374
+
375
+ def __enter__(self):
376
+ return self
377
+
378
+ def __exit__(self, exc_type, exc_val, exc_tb):
379
+ return
380
+
381
+ def write(self, doc, rank):
382
+ self.docs.append(doc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
383
 
384
+ steps_to_run = [
385
+ step(**step_parameters, **({"exclusion_writer": ExclusionWriter()} if step in excluded_dataframes else {}))
386
+ for step, step_parameters, enable_step in zip(steps, steps_parameters, enable_steps)
387
+ if enable_step
388
+ ]
389
+ output_docs: list[Document] = []
390
+ num_warc_samples = 0
391
+
392
+ def increment_num_warc_samples(data, rank, world_size, num_warc_samples_per_doc=1):
393
+ nonlocal num_warc_samples
394
+ for x in data:
395
+ num_warc_samples += num_warc_samples_per_doc
396
+ yield x
397
+
398
+ if steps_parameters[:2] == default_steps_parameters[:2] and all(enable_steps[:2]):
399
+
400
+ pipeline_executor = LocalPipelineExecutor(
401
+ pipeline=[
402
+ JsonlReader(data_folder=f"output_text_extraction-2k/base_processing/output/{DUMP_TO_PROCESS}", glob_pattern="*.jsonl.gz"),
403
+ partial(increment_num_warc_samples, num_warc_samples_per_doc=2000 / 1687)
404
+ ] + steps_to_run[2:] + [
405
+ lambda data, rank, world_size: map(output_docs.append, data)
406
+ ],
407
+ logging_dir="logs",
408
+ skip_completed=False
409
+ )
410
+ else:
411
+ pipeline_executor = LocalPipelineExecutor(
412
+ pipeline=[
413
+ WarcReader(data_folder="data", glob_pattern="*.warc.gz"),
414
+ lambda data, rank, world_size: islice(data, num_warc_samples),
415
+ ] + steps_to_run + [
416
+ lambda data, rank, world_size: map(output_docs.append, data)
417
+ ],
418
+ logging_dir="logs",
419
+ skip_completed=False
420
+ )
421
+ from threading import Thread
422
+ thread = Thread(target=pipeline_executor.run)
423
+ thread.start()
424
+ while thread.is_alive():
425
+ thread.join(timeout=1)
426
+
427
+ if num_warc_samples:
428
+ yield {
429
+ output_tab: gr.Tab(f"Output (~{len(output_docs)/num_warc_samples*100:.03f}% of data)"),
430
+ excluded_tab: gr.Tab(f"Excluded (~{100 - len(output_docs)/num_warc_samples*100:.03f}% of data)"),
431
+ output_dataframe: pd.DataFrame({"text": [doc.text for doc in output_docs]}),
432
+ **{
433
+ excluded_dataframes[type(step_to_run)]: pd.DataFrame({"text": [doc.text for doc in step_to_run.exclusion_writer.docs]})
434
+ for step_to_run in pipeline_executor.pipeline
435
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
436
+ },
437
+ **{
438
+ excluded_tabs[type(step_to_run)]: gr.Tab(f"{type(step_to_run).__name__} (~{len(step_to_run.exclusion_writer.docs)/num_warc_samples*100:.03f}% of data)")
439
+ for step_to_run in pipeline_executor.pipeline
440
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
441
+ },
442
+ }
443
+ else:
444
+ yield {
445
+ output_tab: gr.Tab("Output (loading...)"),
446
+ excluded_tab: gr.Tab("Excluded (loading...)"),
447
+ **{
448
+ excluded_dataframes[type(step_to_run)]: pd.DataFrame({"text": [doc.text for doc in step_to_run.exclusion_writer.docs]})
449
+ for step_to_run in pipeline_executor.pipeline
450
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
451
+ },
452
+ **{
453
+ excluded_tabs[type(step_to_run)]: gr.Tab(f"{type(step_to_run).__name__} (~{len(step_to_run.exclusion_writer.docs)/num_warc_samples*100:.03f}% of data)")
454
+ for step_to_run in pipeline_executor.pipeline
455
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
456
+ },
457
+ }
458
+ yield {
459
+ output_tab: gr.Tab(f"Output (~{len(output_docs)/num_warc_samples*100:.03f}% of data)"),
460
+ excluded_tab: gr.Tab(f"Excluded (~{100 - len(output_docs)/num_warc_samples*100:.03f}% of data)"),
461
+ output_dataframe: pd.DataFrame({"text": [doc.text for doc in output_docs]}),
462
+ **{
463
+ excluded_dataframes[type(step_to_run)]: pd.DataFrame({"text": [doc.text for doc in step_to_run.exclusion_writer.docs]})
464
+ for step_to_run in pipeline_executor.pipeline
465
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
466
+ },
467
+ **{
468
+ excluded_tabs[type(step_to_run)]: gr.Tab(f"{type(step_to_run).__name__} (~{len(step_to_run.exclusion_writer.docs)/num_warc_samples*100:.03f}% of data)")
469
+ for step_to_run in pipeline_executor.pipeline
470
+ if isinstance(step_to_run, BaseFilter) and type(step_to_run) in excluded_dataframes
471
+ },
472
+ }
473
 
474
+ if __name__ == "__main__":
475
+ demo.launch()