Python 平行化運算 - Multi-Processing

Posted by MingLun Allen Wu on Wednesday, May 20, 2020

Abstract

使用 Python 進行資料前處理時,大量的資料常會造成處理時間過長,這時候總希望能透過平行化處理來解決。

提到平行化處理又總會陷入疑惑: 「該使用Multi-Thread 還是 Multi-Process」?

以我的經驗,在進行「資料前處理」時,其實是需要 CPU 持續計算的,所以並不適合使用「Multi-thread」。今天這則筆記是嘗試以「Multi-Process」的方式來對前處理階段進行加速。

近期需要使用 transformers套件來將長篇幅的文字轉換成 index,並且裁切成長度 512 的 array。 transformers執行的速度並不慢,但是當處理的資料達到七~八萬篇時,整個過程也需要耗費將近三個小時。但是在計算的過程中,打開 htop 會發現只使用一顆 CPU 在計算,這時總會希望能夠用全部的 CPU 資源來進行加速:

我希望能夠透過平行化處理達到下面這個狀態:

最後成功地將處理時間從 2.5 小時 縮短到 20 分鐘!


Presquile

使用 Multi-Processing 進行平行運算前,需要確認:

  • 由於不同的Process 間無法共享資料,所以如果平行運算的過程中需要互相取用資料,就不適合透過 Multi-Process 的方式執行。舉例來說:我的碩士論文需要尋訪數萬個字的同義字來建立一個 Graph,如果使用Multi-Process 是沒有辦法操作一個共同的 Graph 的。

Concept

這次透過 python 內建的 multiprocessing來實作,我們將實作的重要觀念分成四個部分:

  1. Function:

    把要透過平行化進行加速的任務封裝在 Function 中。

    舉例來說如果想要計算當前資料集每一個商品 id 的 “price” 欄位總和:

    def sum_function(df: pd.DataFrame):
        return df.groupby('id').agg("price").sum()
    
  2. Tasks:

    Tasks是一個 List,因為我們要同時使用多個Process進行計算,所以要在此對參數進行分割。 假設共有 500 篇文章,我們可以將資料分割為:

    tasks = [df[0:100], df[100:200], df[200:300], df[300:400], df[400:500]]
    
  3. Pool:

    透過 multiprocessing.pool 可以自動建立分工機制,我們不需要做額外的處理與設定:

    pool = multiprocessing.pool(processes= 5)
    
  4. Map:

    在設定好 Function, Tasks 及 Pool 後,我們就能透過 poolmap 進行平行處理:

    # 第一個放Function, 第二個放分割後的參數
    result = pool.map(sum_function, tasks) 
    

執行上述程式碼時, pool 會自動將 tasks 中的參數依序丟到 N 個 Process 中執行 sum_function,並且在執行結束後,將結果回傳到 result 。

因為 tasks 中共有5個 Element,所以最後 Result 也會有 5 個處理完畢的 Element. 這時候可以再根據需要將其 Aggregate 成最後的結果。

再來一個較為複雜的範例:

我們要將一個 Dataframe 先移除 Stop word (remove_stopwrods()),再將其裁切為長度512的段落(truncate_text())

import multiprocessing as mp

def remove_stopwords(text): # 移除 Stop word
        # Do something.
        return cleaned_text

def truncate_text(text):  # 裁切文字,將長篇幅文字裁切成 512個字的list.
        # Do something.
        return [512word_list, 512word_list, 512word_list]

def process(dataframe):
    result = list() # 儲存每一個Process執行的成果。
    for idx, row in dataframe.iterrows():
        cleaned_text = remove_stopwords(row["text"]) # Remove stopwords
        truncated_res = truncate_text(cleaned_text) # Truncate long words to list.
        result.append(res)
    return result 

使用到的 Function 有三項,不需要了解 Function 的功能是做什麼,只需要知道:

  1. remove_stopwords() 以及 truncate_text() 是做資料前處理的兩個 function。
  2. process() 則是接受分割後的「dataframe」,使用 remove_stopwords()truncate_text() 對dataframe 中的每一筆資料進行處理。

我們可以在平行化的過程中使用多個 Function,但是在使用 multiprocessing.pool 進行平行化處理時,只能接受一個 Function,所以需要把所有流程中使用到的 Function 都打包在一起。 (就如同上例將 remove_stopwords()truncate_text() 都打包在 process() 中。)

if __name__ == "__main__":
    df = pickle.load()
    tasks = [df[:100], df[100:200], df[200:300]]
    pool = mp.pool(processes = 3)

    result = pool.map(process, tasks) # 平行處理,全部處理完後會將結果存回 result.
    
    final_result = some_aggregation(result) #最後可能需要再對資料進行整合。

Conclusion

透過 Multi-Process 的方式能夠大幅度的提升資料前處理的效率,在沒有集群的情況下,利用多顆 CPU 來增加速度。

最近也嘗試在研究 Dask 這個套件,能夠輕易地切換不同模式: “Multi-thread”、“Multi-process” 以及多台機器組成的集群,對於 NumpySklearn 等機器學習套件也有很好的支援,很適合用來進行 Python 的平行化處理。

希望本篇筆記對點閱的你有幫助! 有任何問題歡迎隨時跟我交流!



See Also