コア数が20あるCPUでシングルプロセスで処理していたのをマルチプロセスにしたところ10倍速くなってビックリした。 やり方も簡単で、以下のようにgroup毎に集計する処理を並列化するだけでいい。これは便利。
def aggregate(sales: DataFrame, window: int):
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst, axis=0)
def _agg(df: DataFrame):
return df[SALES_COUNT_LAG_1].rolling(window) \
.agg(["mean", "max", "min", "std", lambda x: x.mode()[0]]) \
.reset_index().set_index("seq_idx").astype("float32")
return applyParallel(sales.groupby([STORE_ID, ITEM_ID]), _agg)
参考: