[源碼解析] PyTorch 分布式(1) --- 數據加載之DistributedSampler

羅西的思考 2021-08-15 11:48:48 阅读数:387

本文一共[544]字,预计阅读时长:1分钟~
解析 pytorch 分布式 分布 distributedsampler

[源碼解析] PyTorch 分布式(1) --- 數據加載之DistributedSampler

0x00 摘要

為了更好的介紹參數服務器Paracel的數據加載,我們臨時插入兩篇PyTorch的數據加載(因為字數太長,所以拆成兩篇),主要是從分布式的角度進行切入。本文只算是開胃甜點,後續會有專門系列分析PyTorch分布式。

參數服務器系列其他文章如下:

[源碼解析] 機器學習參數服務器ps-lite 之(1) ----- PostOffice

[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通信模塊Van

[源碼解析] 機器學習參數服務器ps-lite 之(3) ----- 代理人Customer

[源碼解析]機器學習參數服務器ps-lite(4) ----- 應用節點實現

[源碼解析] 機器學習參數服務器 Paracel (1)-----總體架構

[源碼解析] 機器學習參數服務器 Paracel (2)--------SSP控制協議實現

0x01 數據加載

1.1 加速途徑

當分布式訓練時候,為了加速訓練,有三個層面的工作需要處理。

  • 數據加載層面
  • 多機通訊層面
  • 代碼層面

數據層面,可以使用多進程並行加載來加速數據預處理過程,也有利用GPU特點來加速,比如Nvidia DALI 通過將數據預處理放到 GPU 處理來解决 CPU 瓶頸問題。

多機通訊層面,有各種集合通信庫可以利用,比如NCCL,OpenMPI, Gloo 等。

代碼層面,可以使用框架提供的分布式API,或者利用 Horovod 來改造單機版代碼,使其支持分布式任務。

接下來我們就看看數據層面如何加速。

1.2 並行處理

AI框架的數據處理主要如下並行處理:

  • 數據加載/處理使用CPU。
  • 訓練使用GPU。

在理想狀態下,應該是每輪迭代訓練之前,CPU就完成加載,准備好訓練數據,這樣訓練就可以持續無縫迭代。

然而,GPU算力每年會提昇一倍,CPU的提昇速度遠遠落後於GPU,所以CPU會是拖後腿的那個角色。這裏不僅僅是CPU算力不足的問題,也包括村存儲中讀取數據速度不足的問題。

因此,機器學習對於數據加載和前期預處理的要求越來越高,必須在GPU計算時間內,完成下一迭代數據的准備工作,不能讓GPU因為等待訓練數據而空閑。

1.3 流水線

對於機器學習訓練,加載數據可以分為三個步驟:

  • 將數據從磁盤或者分布式存儲加載到主機(CPU)。
  • 將數據從主機可分頁內存傳輸到主機固定內存。
  • 將數據從主機固定內存轉移到主機GPU。

因此,流行的深度學習框架會依據加載步驟的特點和异構硬件的特點來進行流水線處理,從而提高數據處理過程的吞吐量。

流水線一般包括多個算子,每個算子內部由數據隊列組成一個緩沖區,上遊算子完成處理之後會傳給給下遊算子進行處理。這樣每個算子任務會彼此獨立,算子內部可以使用細粒度的多線程/多進程來並行加速,每個算子可以獨立控制處理速度和內存以適配不同網絡對於處理速度的需求。

如果算子內部數據隊列不為空,模型就會一直源源不斷獲得數據,就不會因為等待訓練數據而產生瓶頸。

下面是串行處理邏輯:

+------+ +-----------+ +---------------------------+
| | | | | |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
| | | | | |
+------+ +-----------+ +---------------------------+

下面是並行流水線邏輯:

 +------------+
+--------+ | |
| | | Process 1 |
| Data 1 +--------> | +------+
| | | Load Data | |
+--------+ | | |
+------------+ |
|
|
|
+------------+ | +-----------------------------------+
+--------+ | | | | |
| | | Process 2 | +------> | Pin-memory process |
| Data 2 +--------> | | | |
| | | Load Data +-------------> | |
+--------+ | | | Transfer to Pinned Memory |
+------------+ +-----> | |
| | |
| +-----------------------------------+
|
+--------+ +------------+ |
| | | | |
| Data 3 +--------> | Process 3 +-------+
| | | |
+--------+ | Load Data |
| |
+------------+

1.4 GPU

本文到現在是解决CPU側的數據傳輸問題,即:從磁盤加載數據,從可分頁到固定內存。

但是,從固定內存到GPU的數據傳輸(tensor.cuda())也可以使用CUDA流進行流水線處理。

另外,深度學習應用程序需要複雜的多階段數據處理管道,包括加載、解碼、裁剪、調整大小和許多其他增强功能。這些目前在 CPU 上執行的數據處理管道已經成為瓶頸,限制了訓練和推理的性能和可擴展性。

Nvidia DALI 通過將數據預處理放到 GPU 處理來解决 CPU 瓶頸問題,用戶可以依據自己模型的特點,構建基於 GPU 的 pipeline,或者基於CPU的pipeline。

img

接下來我們就介紹PyTorch的數據加載,而且主要是從分布式的角度進行切入。

0x02 PyTorch分布式加載

2.1 DDP

pytorch為數據分布式訓練提供了多種選擇。隨著應用從簡單到複雜,從原型到產品,常見的開發軌迹可以是:

  • 如果數據和模型能放入單個GPU,使用單設備訓練,此時不用擔心訓練速度;
  • 如果服務器上有多個GPU,並且你在代碼修改量最小的情况下加速訓練,使用單個機器多GPU DataParallel;
  • 如果你想進一步加速訓練並且願意寫一點代碼來啟動,使用單個機器多個GPU DistributedDataParallel;
  • 如果應用程序跨機器邊界擴展,使用多機器DistributedDataParallel和啟動脚本;
  • 如果預期有錯誤(比如OOM)或者資源在訓練過程中可以動態連接和分離,使用torchelastic來啟動分布式訓練。

與本文最相關的部分就是DDP,Distributed Data-Parallel Training(DDP)是一個廣泛采用的單程序多數據訓練方法。使用DDP,模型會被複制到每個進程,然後每個模型副本會被輸入數據樣本的不同子集。DDP負責梯度通信以保持模型副本的同步,並將其與梯度計算重疊以加快訓練速度。

2.2 分布式加載

我們首先要看看分布式加載的總體結構。

給出示例代碼,可以看到主要使用了 DataSet, DistributedSampler,DataLoader 這三個實體。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)
for epoch in range(start_epoch, n_epochs):
if is_distributed:
sampler.set_epoch(epoch)
train(loader)

這三個概念的邏輯關系如下:

  • Dataset : 從名字可以知道,是數據集的意思。負責對原始訓練數據的封裝,將其封裝成 Python 可識別的數據結構,Dataset的派生類必須提供接口一邊獲取單個數據。
  • Sampler : 從名字可知,是采樣器,負責采樣方式或者說是采樣策略,實現某種提取/采樣策略從Dataset之中拿到數據索引,供DataLoade使用。可以認為,Sampler 是指揮者,負責决定戰鬥在哪裏開展。
  • DataLoader : 負責依據索引來從數據集中加載數據。支持 Map-style 和 Iterable-style 兩種Dataset,支持單進程/多進程加載。Loader 就是具體作戰的鬥士,負責按照 Sampler的命令進行戰鬥。

具體如下圖,簡要說就是:

  1. DataSet 把數據集數目發給DistributedSampler。
  2. Sampler 按照某種規則發送數據indices給Loader。
  3. Loader 依據indices加載數據。
  4. Loader 把數據發給模型,進行訓練。
+------------------------+ +-----------+
|DistributedSampler | |DataLoader |
| | 2 indices | |
| Some strategy +-------------------> | |
| | | |
|-------------+----------| | |
^ | | 4 data +-------+
| | -------------->+ train |
1 | length | | +-------+
| | |
+-------------+----------+ | |
|DataSet | | |
| +---------+ | 3 Load | |
| | Data +-------------------------> | |
| +---------+ | | |
| | | |
+------------------------+ +-----------+

因為數據集不是分布式訓練重點,所以本文接下來主要分析 Sampler。

Sampler 的重點就是:如何讓每個worker在數據集中只加載自己所屬的部分,並且worker之間實現對數據集的正交分配

0x03 DistributedSampler

對於數據並行和分布式訓練,DistributedSampler 負責其數據采樣的任務。

DistributedSampler 是 Sampler 的派生類。當 DistributedDataParallel 使用DistributedSampler 時,每個並行的進程都會得到一個DistributedSampler 實例,這個DistributedSampler 實例會給DataLoader發送指示,從而 DataLoader 加載具體數據。

DistributedSampler 加載策略負責只提供加載數據集中的一個子集,這些DistributedSampler 提供的子集之間不重疊,不交叉

3.1 初始化

__init__初始化代碼主要是設置了本worker節點的各種信息,比如數據集dataset,rank(全局GPU序號),num_replicas 副本數目。並且計算出來所有樣本數目total_size。

幾個參數如下:

  • dataset : 就是采樣的數據集。
  • num_replicas :參與分布式訓練的進程數目,如果沒有設置,則從group之中得到world_size作為進程數目。
  • rank : 當前進程的序號,如果沒有設置,則從group之中得到。
  • shuffle :采樣是否需要打亂indices。
  • seed :如果需要打亂,則設定一個random seed。
  • drop_last :如果不能均勻分割數據,是否需要把無法分配的尾部數據丟掉。
  • epoch :每次epoch都會shuffle數據集,如何保持shuffle之後數據集一致性?就是通過epoch完成。

具體代碼如下,省略了异常處理。

class DistributedSampler(Sampler[T_co]):
def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
rank: Optional[int] = None, shuffle: bool = True,
seed: int = 0, drop_last: bool = False) -> None:
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.epoch = 0
self.drop_last = drop_last
# If the dataset length is evenly divisible by # of replicas, then there
# is no need to drop any data, since the dataset will be split equally.
if self.drop_last and len(self.dataset) % self.num_replicas != 0: # type: ignore[arg-type]
# Split to nearest available length that is evenly divisible.
# This is to ensure each rank receives the same amount of data when
# using this Sampler.
self.num_samples = math.ceil(
# `type:ignore` is required because Dataset cannot provide a default __len__
# see NOTE in pytorch/torch/utils/data/sampler.py
(len(self.dataset) - self.num_replicas) / self.num_replicas # type: ignore[arg-type]
)
else:
self.num_samples = math.ceil(len(self.dataset) / self.num_replicas) # type: ignore[arg-type]
self.total_size = self.num_samples * self.num_replicas
self.shuffle = shuffle
self.seed = seed

3.2 迭代方法

DistributedSampler 被實現成一個迭代器(類似於循環),因此會用到 python 抽象類的魔法方法:

  • __len__(self): 當被 len() 函數調用時的行為,一般返回迭代器中元素的個數。
  • __iter__(self): 當迭代容器中元素時的行為,實際上是返回是一個迭代器(通常是迭代器本身),每一次迭代得到的結果會被用來作為下一次迭代的初始值。

__iter__ 代碼的一個技術細節是:

indices = indices[self.rank:self.total_size:self.num_replicas]

當一個list之中有雙引號,比如 list[start:end:step],其意義是:

  • start: 起始比特置
  • end: 結束比特置
  • step: 步長

我們用一個例子來看看,比如:

a = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
print(a[0:15:3])
print(a[1:15:3])
print(a[2:15:3])

得到:

[1, 4, 7, 10, 13]
[2, 5, 8, 11, 14]
[3, 6, 9, 12, 15]

因為 indices[self.rank:self.total_size:self.num_replicas] 之中,num_replicas 實際就是rank的總數,所以這裏每個worker就會嚴格返回自己rank對應的那部分數據序號

總結一下DistributedSampler的分配方法是:每段連續的 num_replicas 個數據被拆成一個一個,分給 num_replicas 個進程,而且是通過每個worker 的 rank 來獲取數據,這樣就達到了不重疊不交叉的目的,但也要注意的是:這樣每個進程拿到的數據是不連續的

具體代碼如下:

class DistributedSampler(Sampler[T_co]):
def __iter__(self) -> Iterator[T_co]:
if self.shuffle: # 如果需要shuffle,則會基於epoch和seed進行處理
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore[arg-type]
else: # 否則直接返回數據集長度序列
indices = list(range(len(self.dataset))) # type: ignore[arg-type]
# 是否需要補齊數據
if not self.drop_last:
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size <= len(indices):
indices += indices[:padding_size]
else:
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
# remove tail of data to make it evenly divisible.
indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
# 依據自己的rank,依次返回自己的數據序號
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self) -> int:
return self.num_samples
def set_epoch(self, epoch: int) -> None:
r"""
Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
use a different random ordering for each epoch. Otherwise, the next iteration of this
sampler will yield the same ordering.
Args:
epoch (int): Epoch number.
"""
self.epoch = epoch

內部變量之間邏輯如下:

  1. 從數據集獲取長度length;
  2. 從配置得到 num_replicas(有幾個rank),本身rank;
  3. 依據 數據集長度 和 num_replicas得到 num_samples 和 total_size;
  4. 最終給出 indices = indices[rank: total_size: num_replicas];
  5. 返回 indices 給DataLoader
+-----------------------------------------------------------+
| DistributedSampler |
| |
| 2 2 |
| rank +---+ num_replicas |
| + | + |
| | | | 3 |
| | | | |
| | | v |
| | | num_samples = ceil(len(dataset)/ num_replicas) |
| | | + |
| | | | |
| | | | 3 |
| | | v |
| | | total_size = num_samples * num_replicas |
| | | + |
| |4 |4 |4 |
| | | | |
| v v v |
| +-+----+------------+--------------------------------+ | +-------------+
| | | | indices | |
| | indices = indices[rank: total_size: num_replicas] +------------->+ DataLoader |
| | ^ | | 5 | |
| | | | | +-------------+
| | | | |
| +----------------------------------------------------+ |
+-----------------------------------------------------------+
|
1 | length
+------+--------+
| DataSet |
+---------------+

3.3 shuffle數據集

每次epoch都會shuffle數據集,但是不同進程如何保持shuffle之後數據集一致性

DistributedSampler 使用當前的epoch作為隨機數種子,在計算index之前就進行配置,從而保證不同進程都使用同樣的隨機數種子,這樣shuffle出來的數據就能確保一致。

3.3.1 使用

從下面代碼可以看出來,如果需要分布式訓練,就對 sampler 設置 epoch。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), ...,
sampler=sampler)
for epoch in range(start_epoch, n_epochs):
if is_distributed:
sampler.set_epoch(epoch) # 這設置epoch
train(loader)

3.3.2 python

具體對應 DistributedSampler 的實現。

設置 epoch 很簡單,就是配置下。

 def set_epoch(self, epoch: int) -> None:
r"""
Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
use a different random ordering for each epoch. Otherwise, the next iteration of this
sampler will yield the same ordering.
Args:
epoch (int): Epoch number.
"""
self.epoch = epoch

設置 random 種子的具體使用是在迭代函數之中:

 def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch) # 這裏設置隨機種子
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore[arg-type]
else:
indices = list(range(len(self.dataset))) # type: ignore[arg-type]
# 省略其他代碼

3.3.3 C++

我們也可以提前看看在C++ 代碼的DistributedRandomSampler,這是C++ API,也起到python同樣作用。

我們可以看到設置種子和shuffle如下:

void DistributedRandomSampler::reset(optional<size_t> new_size) {
size_ = new_size.value_or(size_);
populate_indices();
std::mt19937 rand(epoch_);
std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
sample_index_ = begin_index_;
}

3.3.4 小結

我們擴展目前邏輯如下:

  1. 從數據集獲取長度length;
  2. 從配置得到 num_replicas(有幾個rank),本身rank,epoch;
  3. 用 epoch 來設置random seed;
  4. 利用random seed 對數據集 indices 進行打亂,後續就會一直使用 這個打亂的indices;
  5. 依據 數據集長度 和 num_replicas得到 num_samples 和 total_size;
  6. 結合之上各種數據條件,最終給出 indices = indices[rank: total_size: num_replicas];
  7. 返回 indices 給DataLoader
+-----------------------------------------------------------------+
| DistributedSampler |
| |
| |
| 2 3 |
| epoch +------> manual_seed(seed + epoch) +---------> indices |
| + |
| | |
| | |
| 2 2 | |
| rank +---+ num_replicas 4 | |
| + | + | |
| | | | 5 | |
| | | | | |
| | | v | |
| | | num_samples = ceil(len(dataset)/ num_replicas) | |
| | | + | |
| | | | | |
| | | | 5 | |
| | | v | |
| | | total_size = num_samples * num_replicas | |
| | | + | |
| |6 |6 |6 | |
| | | | | |
| v v v | |
| +-+----+------------+--------------------------------+ | |
| | | | |
| | indices = indices[rank: total_size: num_replicas] | <----+ |
| | ^ + | |
| | | | | |
| | | | | |
| +----------------------------------------------------+ |
+-----------------------------------------------------------------+
| |
1 | length 7 v indices
|
+-------+--------+ +-------------+
| | | |
| DataSet | | DataLoader |
| | | |
+----------------+ +-------------+

3.4 Sampler in C++

因為某些公司是C++開發,他們也有迫切的使用pytorch的需求,所以pytorch也提供了C++ API,我們接下來就看看如何實現。

3.4.1 定義

其類定義在:torch\csrc\api\include\torch\data\samplers\distributed.h

我們可以看到,DistributedSampler 是基類,主要成員變量是:

  • size_t size_ 文件大小

  • size_t num_replicas_ 副本數目

  • size_t rank_ 本sampler 對應哪個進程或者GPU

  • size_t epoch 本次訓練的epoch

  • bool allow_duplicates_ 是否允許備份

接下來是兩個派生類: DistributedRandomSampler 和 DistributedSequentialSampler。

/// A `Sampler` that selects a subset of indices to sample from and defines a
/// sampling behavior. In a distributed setting, this selects a subset of the
/// indices depending on the provided num_replicas and rank parameters. The
/// `Sampler` performs a rounding operation based on the `allow_duplicates`
/// parameter to decide the local sample count.
template <typename BatchRequest = std::vector<size_t>>
class DistributedSampler : public Sampler<BatchRequest> {
public:
DistributedSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true)
: size_(size),
num_replicas_(num_replicas),
rank_(rank),
epoch_(0),
allow_duplicates_(allow_duplicates) {}
/// Set the epoch for the current enumeration. This can be used to alter the
/// sample selection and shuffling behavior.
void set_epoch(size_t epoch) {
epoch_ = epoch;
}
size_t epoch() const {
return epoch_;
}
protected:
size_t local_sample_count() {
if (allow_duplicates_) {
return (size_ + num_replicas_ - 1) / num_replicas_;
} else {
return size_ / num_replicas_;
}
}
size_t size_;
size_t num_replicas_;
size_t rank_;
size_t epoch_;
bool allow_duplicates_;
};
/// Select samples randomly. The sampling order is shuffled at each `reset()`
/// call.
class TORCH_API DistributedRandomSampler : public DistributedSampler<> {
public:
DistributedRandomSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true);
/// Resets the `DistributedRandomSampler` to a new set of indices.
void reset(optional<size_t> new_size = nullopt) override;
/// Returns the next batch of indices.
optional<std::vector<size_t>> next(size_t batch_size) override;
/// Serializes the `DistributedRandomSampler` to the `archive`.
void save(serialize::OutputArchive& archive) const override;
/// Deserializes the `DistributedRandomSampler` from the `archive`.
void load(serialize::InputArchive& archive) override;
/// Returns the current index of the `DistributedRandomSampler`.
size_t index() const noexcept;
private:
void populate_indices();
size_t begin_index_;
size_t end_index_;
size_t sample_index_;
std::vector<size_t> all_indices_;
};
/// Select samples sequentially.
class TORCH_API DistributedSequentialSampler : public DistributedSampler<> {
public:
DistributedSequentialSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true);
/// Resets the `DistributedSequentialSampler` to a new set of indices.
void reset(optional<size_t> new_size = nullopt) override;
/// Returns the next batch of indices.
optional<std::vector<size_t>> next(size_t batch_size) override;
/// Serializes the `DistributedSequentialSampler` to the `archive`.
void save(serialize::OutputArchive& archive) const override;
/// Deserializes the `DistributedSequentialSampler` from the `archive`.
void load(serialize::InputArchive& archive) override;
/// Returns the current index of the `DistributedSequentialSampler`.
size_t index() const noexcept;
private:
void populate_indices();
size_t begin_index_;
size_t end_index_;
size_t sample_index_;
std::vector<size_t> all_indices_;
};

3.4.2 實現

類的具體實現比特於:torch\csrc\api\src\data\samplers\distributed.cpp

3.4.2.1 DistributedRandomSampler

我們首先看看DistributedRandomSampler。

其作用就是依據本worker 的 rank_獲取打亂的index。我們按照邏輯順序講解各個函數。

  • 初始化時候會調用 reset(size_) 進行 shuffle。
  • reset 函數作用是讓sampler指向一組新的indices:
    • 首先調用 populate_indices() 設置對應本rank的起始index,終止index。
    • populate_indices 函數之中,會對數據index 進行設置,即配置了 all_indices_,也同時配置了本rank的起始index,終止index。
    • 然後對 all_indices_ 進行shuffle。
  • next 函數就相對簡單了,因為主要工作被reset做了,所以此時數據已經被隨機打亂了,所以找到起始比特置,返回數據中對行數即可。

因為下面用到了 iota 函數,可能有的同學不熟悉,這裏說明下iota的作用:

std::vector<int> test;
test.resize(10);
std::iota(test.begin(), test.end(), 5);// 將從 5 開始的 10 次遞增值賦值給 test
//test結果:5 6 7 8 9 10 11 12 13 14

具體代碼如下:

DistributedRandomSampler::DistributedRandomSampler(
size_t size,
size_t num_replicas,
size_t rank,
bool allow_duplicates)
: DistributedSampler(size, num_replicas, rank, allow_duplicates),
begin_index_(0),
end_index_(0),
sample_index_(0) {
// shuffle first time.
reset(size_);
}
// 每次加載新epoch時候,都要調用reset
void DistributedRandomSampler::reset(optional<size_t> new_size) {
size_ = new_size.value_or(size_);
populate_indices();
std::mt19937 rand(epoch_);
// 對於數據進行shuffle
std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
sample_index_ = begin_index_;
}
void DistributedRandomSampler::populate_indices() {
size_t num_local_samples = local_sample_count();
// 得到樣本數量
size_t sample_count =
num_replicas_ == 1 ? size_ : num_local_samples * num_replicas_;
all_indices_.resize(sample_count);
// std::iota 的作用是用順序遞增的值賦值指定範圍內的元素
// 這裏是給all_indices_設置從0開始到sample_count這些數值
std::iota(std::begin(all_indices_), std::end(all_indices_), 0);
// 如果sample count大於size_,則需要給多出來的那些index再賦一些數值
for (size_t i = size_; i < sample_count; ++i) {
// we may have added duplicate samples to make all
// replicas to have the same number of samples.
all_indices_[i] = i - size_;
}
begin_index_ = rank_ * num_local_samples; // 對應本rank的起始index
end_index_ = begin_index_ + num_local_samples; // 對應本rank的終止index
sample_index_ = begin_index_;
}
size_t DistributedRandomSampler::index() const noexcept {
return sample_index_;
}
// 注意,每次加載新epoch時候,都要調用reset,因此對於next函數來說,工作已經很小
optional<std::vector<size_t>> DistributedRandomSampler::next(
size_t batch_size) {
if (sample_index_ == end_index_) { // 已經提取完數據
return nullopt;
}
size_t end = sample_index_ + batch_size; // 本次迭代的終止比特置
if (end > end_index_) {
end = end_index_;
}
auto iter = all_indices_.begin(); // 因為此時數據已經被隨機打亂了,找到起始比特置即可
std::vector<size_t> res(iter + sample_index_, iter + end); // 從所有數據中提取前面若幹行
sample_index_ = end;
return res;
}
3.4.2.2 DistributedSequentialSampler

然後看看 DistributedSequentialSampler。

其作用就是依據本worker 的 rank_獲取順序的index。我們按照邏輯順序講解各個函數。

  • reset 函數就簡單多了,使用populate_indices按照順序設置index即可。
  • next 函數就相對複雜,不但要順序返回index,還需要設置下次的起始比特置。
DistributedSequentialSampler::DistributedSequentialSampler(
size_t size,
size_t num_replicas,
size_t rank,
bool allow_duplicates)
: DistributedSampler(size, num_replicas, rank, allow_duplicates),
begin_index_(0),
end_index_(0),
sample_index_(0) {
populate_indices(); // 這裏會設定本rank對應的起始比特置
}
void DistributedSequentialSampler::reset(optional<size_t> new_size) {
size_t size = new_size.value_or(size_);
if (size != size_) {
size_ = size;
populate_indices();
} else {
sample_index_ = begin_index_;
}
}
void DistributedSequentialSampler::populate_indices() {
begin_index_ = rank_ * local_sample_count(); // 本rank對應的起始比特置
end_index_ = begin_index_ + local_sample_count();
sample_index_ = begin_index_;
}
size_t DistributedSequentialSampler::index() const noexcept {
return sample_index_;
}
optional<std::vector<size_t>> DistributedSequentialSampler::next(
size_t batch_size) {
if (sample_index_ == end_index_) { // 已經循環結束
return nullopt;
}
size_t end = sample_index_ + batch_size; // 本次的終止行
if (end > end_index_) {
end = end_index_;
}
std::vector<size_t> res(end - sample_index_); // 返回的vector大小
// 給res設置從sample_index_開始遞增(end - sample_index_)這麼大的這些數值,這就是順序返回了index
std::iota(std::begin(res), std::end(res), sample_index_);
if (end >= size_) {
for (size_t& index : res) { //遍曆 vector,得到本次的index
index = index % size_;
}
}
sample_index_ = end; // 設置下次開始行
return res;
}

0xFF 參考

卷積神經網絡的並行化模型--One weird trick for parallelizing convolutional neural networks

AI框架中數據處理的挑戰與解决思路

PyTorch 源碼解讀之 torch.utils.data:解析數據處理全流程

談談你對大規模機器學習這個領域的理解和認識?

Nvidia-DALI 從放弃到入門

pytorch(分布式)數據並行個人實踐總結——DataParallel/DistributedDataParallel

版权声明:本文为[羅西的思考]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/08/20210815114841390t.html