大數據? 🤗 Datasets 來救援!
如今,不難發現我們經常使用數GB的數據集, 特別是如果你打算從頭開始預訓練像 BERT 或者 GPT-2 這樣的轉換器。 在這種情況下, 加載 數據集就是一個挑戰。例如, 用於預訓練 GPT-2 的 WebText 語料庫包含超過 800 萬個文檔和 40 GB 的文本 — 將其加載到筆記本電腦的 RAM 中可能會讓它抓狂!
幸運的是, 🤗 Datasets 旨在克服這些限制。它通過將數據集作為內存映射文件來處理,並通過在語料庫中流化條目來擺脫硬盤限制, 從而使你避免內存管理問題。
在本節中, 我們將探索🤗 Datasets 的特性。它有一個稱為 the Pile的825 GB的語料庫。 讓我們開始吧!
什麼是Pile?
The Pile 是由EleutherAI創建的一個英語文本語料庫, 用於訓練大規模語言模型。它包含各種各樣的數據集, 涵蓋科學文章, GitHub 代碼庫以及過濾的Web文本。訓練語料庫在14 GB chunks, 並且你也可以下載幾個單獨的組件。 讓我們先來看看 PubMed Abstracts 數據集, 它是PubMed上的1500萬篇生物醫學出版物的摘要的語料庫。 數據集採用JSON行格式 並使用zstandard
庫進行壓縮, 所以我們首先需要先安裝zstandard
庫:
!pip install zstandard
接下來, 我們可以使用第二節中所學的加載遠程數據集的方法加載數據集:
from datasets import load_dataset
# This takes a few minutes to run, so go grab a tea or coffee while you wait :)
data_files = "https://the-eye.eu/public/AI/pile_preliminary_components/PUBMED_title_abstracts_2019_baseline.jsonl.zst"
pubmed_dataset = load_dataset("json", data_files=data_files, split="train")
pubmed_dataset
Dataset({
features: ['meta', 'text'],
num_rows: 15518009
})
我們可以看到我們的數據集中有 15,518,009 行和 2 列 — 這是非常多的!
✎ 默認情況下, 🤗 Datasets 會自動解壓加載數據集所需的文件。 如果你想保留硬盤空間, 你可以傳遞 DownloadConfig(delete_extracted=True)
到 download_config
的 load_dataset()
參數. 有關更多詳細信息, 請參閱文檔](https://huggingface.co/docs/datasets/package_reference/builder_classes#datasets.DownloadConfig)。
讓我們看看數據集的第一個元素的內容:
pubmed_dataset[0]
{'meta': {'pmid': 11409574, 'language': 'eng'},
'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection.\nTo determine the prevalence of hypoxaemia in children aged under 5 years suffering acute lower respiratory infections (ALRI), the risk factors for hypoxaemia in children under 5 years of age with ALRI, and the association of hypoxaemia with an increased risk of dying in children of the same age ...'}
可以看到, 這看起來像是醫學文章的摘要。 現在讓我們看看我們使用了RAM的多少存儲空間來加載數據集!
內存映射的魔力
在 Python 中測量內存使用情況的一個簡單的方法是使用psutil
庫,它可以使用 pip
安裝, 如下所示:
!pip install psutil
它提供了一個 Process
類,這個類允許我們檢查當前進程的內存使用情況, 如下所示:
import psutil
# Process.memory_info is expressed in bytes, so convert to megabytes
print(f"RAM used: {psutil.Process().memory_info().rss / (1024 * 1024):.2f} MB")
RAM used: 5678.33 MB
這裡的rss
屬性是指 常駐集 的大小, 它是進程在RAM中佔用的內存比例。 這個測量結果也包括了 Python 編譯器和我們加載的庫所使用的內存, 所以實際上用於加載數據集的內存會更小一些。為了比較, 讓我們使用 dataset_size
屬性看看數據集在磁盤上有多大。 由於結果像之前一樣用字節表示, 我們需要手動將其轉換為GB:
print(f"Number of files in dataset : {pubmed_dataset.dataset_size}")
size_gb = pubmed_dataset.dataset_size / (1024**3)
print(f"Dataset size (cache file) : {size_gb:.2f} GB")
Number of files in dataset : 20979437051
Dataset size (cache file) : 19.54 GB
非常棒 — 儘管它將近20GB, 但我們能夠佔用很少的RAM空間加載和訪問數據集!
✏️ 試試看! 從subsets中選擇一個大於你的筆記本或者臺式機的RAM大小的子集, 用 🤗 Datasets加載這個數據集, 並且測量RAM的使用量。 請注意, 要獲得準確的測量結果, 你需要在另一個進程中執行這個操作。你可以在 the Pile paper的表一中找到每個子集解壓後的大小。
如果你熟悉 Pandas, 這個結果可能會讓人感到很意外。因為 Wes Kinney 的著名的經驗法則 是你需要的RAM應該是數據集的大小的5倍到10倍。 那麼 🤗 Datasets 是如何解決這個內存管理問題的呢? 🤗 Datasets 將每一個數據集看作一個內存映射文件, 它提供了RAM和文件系統存儲之間的映射, 該映射允許庫訪問和操作數據集的元素, 而且無需將其完全加載到內存中。
內存映射文件也一個在多個進程之間共享, 這使得像 Dataset.map()
之類的方法可以並行化, 並且無需移動或者賦值數據集。在底層, 這些功能都是由Apache Arrow內存格式和pyarrow
庫提供的支持, 使得數據加載和處理速度快如閃電。 (更多有關Apache Arrow的詳細信息以及與Pandas的比較, 請查看Dejan Simic’s blog post.) 為了更清晰地看到這個過程, 讓我們通過迭代PubMed Abstracts數據集中的所有元素來運行一個速度測試小程序:
import timeit
code_snippet = """batch_size = 1000
for idx in range(0, len(pubmed_dataset), batch_size):
_ = pubmed_dataset[idx:idx + batch_size]
"""
time = timeit.timeit(stmt=code_snippet, number=1, globals=globals())
print(
f"Iterated over {len(pubmed_dataset)} examples (about {size_gb:.1f} GB) in "
f"{time:.1f}s, i.e. {size_gb/time:.3f} GB/s"
)
'Iterated over 15518009 examples (about 19.5 GB) in 64.2s, i.e. 0.304 GB/s'
這裡我們使用了 Python的 timeit
模塊來測量執行 code_snippet
所耗的時間。 你通常能以十分之幾GB/s到幾GB/s的速度迭代數據集。通過上述的方法就已經能夠解決大多數大數據集加載的限制, 但是有時候你不得不使用一個很大的數據集, 它甚至都不能存儲在筆記本電腦的硬盤上。例如, 如果我們嘗試下載整個 Pile, 我們需要825GB的可用磁盤空間! 為了處理這種情況, 🤗 Datasets 提供了一個流式功能, 這個功能允許我們動態下載和訪問元素, 並且不需要下載整個數據集。讓我們來看看這個功能是如何工作的。
💡在 Jupyter 筆記中你還可以使用%%timeit
magic function為單元格計時。
流式數據集
要使用數據集流, 你只需要將 streaming=True
參數傳遞給 load_dataset()
函數。接下來, 讓我們再次加載 PubMed Abstracts 數據集, 但是採用流模式:
pubmed_dataset_streamed = load_dataset(
"json", data_files=data_files, split="train", streaming=True
)
與我們在本章其他地方遇到的熟悉的 Dataset
不同, streaming=True
返回的對象是一個 IterableDataset
。 顧名思義, 要訪問 IterableDataset
, 我們需要迭代它。我們可以按照如下方式訪問流式數據集的第一個元素:
next(iter(pubmed_dataset_streamed))
{'meta': {'pmid': 11409574, 'language': 'eng'},
'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection.\nTo determine the prevalence of hypoxaemia in children aged under 5 years suffering acute lower respiratory infections (ALRI), the risk factors for hypoxaemia in children under 5 years of age with ALRI, and the association of hypoxaemia with an increased risk of dying in children of the same age ...'}
如果您需要在訓練期間標記流式數據集中的元素可以使用 IterableDataset.map()
進行動態處理。該過程與我們在第三章中標記數據集的過程完全相同, 唯一的區別是輸出是逐個返回的:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
tokenized_dataset = pubmed_dataset_streamed.map(lambda x: tokenizer(x["text"]))
next(iter(tokenized_dataset))
{'input_ids': [101, 4958, 5178, 4328, 6779, ...], 'attention_mask': [1, 1, 1, 1, 1, ...]}
💡 你可以傳遞 batched=True
來通過流式加速標記化, 如同我們在上一節看到的那樣。它將逐批處理示例; 默認的批量大小為 1,000, 可以使用 batch_size
參數指定批量大小。
你還可以使用 IterableDataset.shuffle()
打亂流式數據集, 但與 Dataset.shuffle()
不同的是這隻會打亂預定義 buffer_size
中的元素:
shuffled_dataset = pubmed_dataset_streamed.shuffle(buffer_size=10_000, seed=42)
next(iter(shuffled_dataset))
{'meta': {'pmid': 11410799, 'language': 'eng'},
'text': 'Randomized study of dose or schedule modification of granulocyte colony-stimulating factor in platinum-based chemotherapy for elderly patients with lung cancer ...'}
在這個示例中, 我們從緩衝區的前 10,000 個示例中隨機選擇了一個示例。一旦訪問了一個示例, 它在緩衝區中的位置就會被語料庫中的下一個示例填充 (即, 上述案例中的第 10,001個示例)。你還可以使用 IterableDataset.take()
和 IterableDataset.skip()
函數從流式數據集中選擇元素, 它的作用類似於 Dataset.select()
。例如, 要選擇 PubMed Abstracts 數據集的前5個示例, 我們可以執行以下操作:
dataset_head = pubmed_dataset_streamed.take(5)
list(dataset_head)
[{'meta': {'pmid': 11409574, 'language': 'eng'},
'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection ...'},
{'meta': {'pmid': 11409575, 'language': 'eng'},
'text': 'Clinical signs of hypoxaemia in children with acute lower respiratory infection: indicators of oxygen therapy ...'},
{'meta': {'pmid': 11409576, 'language': 'eng'},
'text': "Hypoxaemia in children with severe pneumonia in Papua New Guinea ..."},
{'meta': {'pmid': 11409577, 'language': 'eng'},
'text': 'Oxygen concentrators and cylinders ...'},
{'meta': {'pmid': 11409578, 'language': 'eng'},
'text': 'Oxygen supply in rural africa: a personal experience ...'}]
同樣, 你可以使用 IterableDataset.skip()
函數將打亂的數據集拆分為訓練集和驗證集, 如下所示:
# Skip the first 1,000 examples and include the rest in the training set
train_dataset = shuffled_dataset.skip(1000)
# Take the first 1,000 examples for the validation set
validation_dataset = shuffled_dataset.take(1000)
讓我們用一個常見的任務來進行我們對數據集流的最後探索: 將多個數據集組合在一起創建一個心得語料庫。 🤗 Datasets 提供了一個 interleave_datasets()
函數, 它將一個 IterableDataset
對象列表組合為單個的 IterableDataset
, 其中新數據集的元素是通過在列表中的對象交替獲得的。當你試圖組合大型數據集時, 這個函數特別有用, 讓我們通過下面這個例子來試著組合 Pile的自由法律數據集,它是來自美國法院的51 GB的法律意見數據集:
law_dataset_streamed = load_dataset(
"json",
data_files="https://the-eye.eu/public/AI/pile_preliminary_components/FreeLaw_Opinions.jsonl.zst",
split="train",
streaming=True,
)
next(iter(law_dataset_streamed))
{'meta': {'case_ID': '110921.json',
'case_jurisdiction': 'scotus.tar.gz',
'date_created': '2010-04-28T17:12:49Z'},
'text': '\n461 U.S. 238 (1983)\nOLIM ET AL.\nv.\nWAKINEKONA\nNo. 81-1581.\nSupreme Court of United States.\nArgued January 19, 1983.\nDecided April 26, 1983.\nCERTIORARI TO THE UNITED STATES COURT OF APPEALS FOR THE NINTH CIRCUIT\n*239 Michael A. Lilly, First Deputy Attorney General of Hawaii, argued the cause for petitioners. With him on the brief was James H. Dannenberg, Deputy Attorney General...'}
這個數據集足夠大, 可以對大多數筆記本電腦的RAM有足夠的壓力, 但是我們已經能夠毫不費力地加載和訪問它! 現在我們使用 interleave_datasets()
函數加載來自 FreeLaw 和 PubMed Abstracts 的數據集:
from itertools import islice
from datasets import interleave_datasets
combined_dataset = interleave_datasets([pubmed_dataset_streamed, law_dataset_streamed])
list(islice(combined_dataset, 2))
[{'meta': {'pmid': 11409574, 'language': 'eng'},
'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection ...'},
{'meta': {'case_ID': '110921.json',
'case_jurisdiction': 'scotus.tar.gz',
'date_created': '2010-04-28T17:12:49Z'},
'text': '\n461 U.S. 238 (1983)\nOLIM ET AL.\nv.\nWAKINEKONA\nNo. 81-1581.\nSupreme Court of United States.\nArgued January 19, 1983.\nDecided April 26, 1983.\nCERTIORARI TO THE UNITED STATES COURT OF APPEALS FOR THE NINTH CIRCUIT\n*239 Michael A. Lilly, First Deputy Attorney General of Hawaii, argued the cause for petitioners. With him on the brief was James H. Dannenberg, Deputy Attorney General...'}]
這裡我們使用了來自Python的 itertools
模塊的 islice()
函數從合併的數據集中選擇前兩個示例, 並且我們可以看到它們實際上就是兩個源數據集中的前兩個示例拼在一起形成的:
最後, 如果你想流式傳輸整個825GB的 Pile, 你可以按照如下方式獲取所有準備好的文件:
base_url = "https://the-eye.eu/public/AI/pile/"
data_files = {
"train": [base_url + "train/" + f"{idx:02d}.jsonl.zst" for idx in range(30)],
"validation": base_url + "val.jsonl.zst",
"test": base_url + "test.jsonl.zst",
}
pile_dataset = load_dataset("json", data_files=data_files, streaming=True)
next(iter(pile_dataset["train"]))
{'meta': {'pile_set_name': 'Pile-CC'},
'text': 'It is done, and submitted. You can play “Survival of the Tastiest” on Android, and on the web...'}
✏️ 試試看! 使用像mc4
或者 oscar
這樣的大型 Common Crawl 語料庫來創建一個流式多語言數據集, 該數據集代表你選擇的國家/地區語言的口語比例。例如, 瑞士的四種民族語言分別是德語、法語、意大利語和羅曼什語, 因此你可以嘗試根據根據口語比例對Oscar子集進行採用來創建瑞士語料庫。
你現在擁有加載和處理各種類型和大小的數據集的所需的所有工具 — 但是除非你非常幸運, 否則在你的NLP之旅中會有一個難題, 你將不得不創建一個數據集來解決手頭的問題。這就是下一節的主題!