Abstract
使用 Python 進行資料前處理時,大量的資料常會造成處理時間過長,這時候總希望能透過平行化處理來解決。
提到平行化處理又總會陷入疑惑: 「該使用Multi-Thread 還是 Multi-Process」?
以我的經驗,在進行「資料前處理」時,其實是需要 CPU 持續計算的,所以並不適合使用「Multi-thread」。今天這則筆記是嘗試以「Multi-Process」的方式來對前處理階段進行加速。
近期需要使用 transformers
套件來將長篇幅的文字轉換成 index,並且裁切成長度 512 的 array。 transformers
執行的速度並不慢,但是當處理的資料達到七~八萬篇時,整個過程也需要耗費將近三個小時。但是在計算的過程中,打開 htop
會發現只使用一顆 CPU 在計算,這時總會希望能夠用全部的 CPU 資源來進行加速:
我希望能夠透過平行化處理達到下面這個狀態:
最後成功地將處理時間從 2.5 小時 縮短到 20 分鐘!
Presquile
使用 Multi-Processing 進行平行運算前,需要確認:
- 由於不同的Process 間無法共享資料,所以如果平行運算的過程中需要互相取用資料,就不適合透過 Multi-Process 的方式執行。舉例來說:我的碩士論文需要尋訪數萬個字的同義字來建立一個 Graph,如果使用Multi-Process 是沒有辦法操作一個共同的 Graph 的。
Concept
這次透過 python 內建的 multiprocessing
來實作,我們將實作的重要觀念分成四個部分:
Function:
把要透過平行化進行加速的任務封裝在 Function 中。
舉例來說如果想要計算當前資料集每一個商品 id 的 “price” 欄位總和:
def sum_function(df: pd.DataFrame): return df.groupby('id').agg("price").sum()
Tasks:
Tasks是一個 List,因為我們要同時使用多個Process進行計算,所以要在此對參數進行分割。 假設共有 500 篇文章,我們可以將資料分割為:
tasks = [df[0:100], df[100:200], df[200:300], df[300:400], df[400:500]]
Pool:
透過
multiprocessing.pool
可以自動建立分工機制,我們不需要做額外的處理與設定:pool = multiprocessing.pool(processes= 5)
Map:
在設定好 Function, Tasks 及 Pool 後,我們就能透過
pool
的map
進行平行處理:# 第一個放Function, 第二個放分割後的參數 result = pool.map(sum_function, tasks)
執行上述程式碼時, pool
會自動將 tasks 中的參數依序丟到 N 個 Process 中執行 sum_function,並且在執行結束後,將結果回傳到 result 。
因為 tasks 中共有5個 Element,所以最後 Result 也會有 5 個處理完畢的 Element. 這時候可以再根據需要將其 Aggregate 成最後的結果。
再來一個較為複雜的範例:
我們要將一個 Dataframe 先移除 Stop word (remove_stopwrods()
),再將其裁切為長度512的段落(truncate_text()
)
import multiprocessing as mp
def remove_stopwords(text): # 移除 Stop word
# Do something.
return cleaned_text
def truncate_text(text): # 裁切文字,將長篇幅文字裁切成 512個字的list.
# Do something.
return [512word_list, 512word_list, 512word_list]
def process(dataframe):
result = list() # 儲存每一個Process執行的成果。
for idx, row in dataframe.iterrows():
cleaned_text = remove_stopwords(row["text"]) # Remove stopwords
truncated_res = truncate_text(cleaned_text) # Truncate long words to list.
result.append(res)
return result
使用到的 Function 有三項,不需要了解 Function 的功能是做什麼,只需要知道:
remove_stopwords()
以及truncate_text()
是做資料前處理的兩個 function。process()
則是接受分割後的「dataframe」,使用remove_stopwords()
及truncate_text()
對dataframe 中的每一筆資料進行處理。
我們可以在平行化的過程中使用多個 Function,但是在使用 multiprocessing.pool
進行平行化處理時,只能接受一個 Function,所以需要把所有流程中使用到的 Function 都打包在一起。 (就如同上例將 remove_stopwords()
及 truncate_text()
都打包在 process()
中。)
if __name__ == "__main__":
df = pickle.load()
tasks = [df[:100], df[100:200], df[200:300]]
pool = mp.pool(processes = 3)
result = pool.map(process, tasks) # 平行處理,全部處理完後會將結果存回 result.
final_result = some_aggregation(result) #最後可能需要再對資料進行整合。
Conclusion
透過 Multi-Process 的方式能夠大幅度的提升資料前處理的效率,在沒有集群的情況下,利用多顆 CPU 來增加速度。
最近也嘗試在研究 Dask 這個套件,能夠輕易地切換不同模式: “Multi-thread”、“Multi-process” 以及多台機器組成的集群,對於 Numpy
及 Sklearn
等機器學習套件也有很好的支援,很適合用來進行 Python 的平行化處理。
希望本篇筆記對點閱的你有幫助! 有任何問題歡迎隨時跟我交流!