Intelligent Technology's Technical Blog

株式会社インテリジェントテクノロジーの技術情報ブログです。

Polarsの並列処理に触れてみる

櫻です。

最近は開発にしろちょっとしたツールにしろPythonでコード書くことが増えてきています。

ところがPythonではGILがあるためマルチコアを活用するにはJavaなどに比べると手間が増えてしまいます。
ちょっとしたデータ処理はpandas*1を利用していることが多かったのですが、何も工夫しないでコードを書くとCPUのコアを一つしか利用してくれないことが多く、もやもやすることも良くあります。

そこで動作が速いと噂のpolars*2ならどうなる?というのを試してみました。

Polarsとは?

Polarsはpandasと同様にDataFrameを扱うライブラリでRustで実装されており、マルチスレッド化やSIMDで高速に動作することが謳われています。
pandasとはAPIが大きく異なっていますが、個人的にはsimpleで利用しやすいと感じています。

利用したバージョン

python 3.11.3
pandas 2.0.3
polars 0.18.13

CPUは物理6コア、論理12コアでの実行結果になります。

方針

データとして10列10,000,000行のcsvファイルを作成して、このデータを利用して、以下の処理時間を確認してきます。

  • csv読み込み
  • DataFrameの組み込み関数(mean)
  • 列の追加
  • pythonコード(lambda)呼び出し

psutilのcpu_percent*3で論理コア毎のCPU使用率が取得できるようなので、このメソッドで処理中のCPU利用率を確認します。

import numpy as np
import polars as pl

data = np.random.random((10_000_000, 10))
df = pl.from_numpy(data)
df.write_csv("data.csv")

csv読み込み

まずはcsvの読み込み時間を比べてみます。

python

比較用にpythonのcsvでファイルを読んでみます。

import csv
import time
import psutil

before = time.perf_counter_ns()
psutil.cpu_percent(None, percpu=True)
with open("data.csv") as f:
    reader = csv.reader(f)
    for row in reader:
        rows.append(row)
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"load {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は以下のように44秒かかりました。

load 44,226,614,620. cpu: [26.4, 25.8, 8.5, 5.1, 1.3, 1.2, 0.2, 0.2, 0.2, 0.2, 0.1, 0.0](69.2)
pandas

次にpandasでの実行です。

import time
import pandas as pd
import psutil

psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
df = pd.read_csv("data.csv")
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"loaded {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は11秒と流石にpythonに比べると高速になっています。

loaded 11,153,840,017. cpu: [38.4, 19.4, 19.2, 0.4, 0.4, 0.3, 0.2, 0.1, 0.1, 0.0, 0.0, -0.0](78.5)
Polars

続いてPolars

import psutil
import time
import polars as pl

before = time.perf_counter_ns()
psutil.cpu_percent(None, percpu=True)
df = pl.read_csv("data.csv")
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"loaded {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は8秒とpandasより高速です。そしてなんと複数コアを活用しています。

loaded 8,362,183,927. cpu: [65.8, 40.4, 40.1, 39.8, 39.8, 39.3, 39.2, 39.1, 38.4, 38.0, 38.0, 37.7](495.6000000000001)

DataFrameの組み込み関数(mean)

次にDataFrameのmeanを計測してみます。

pandas

まずはpandas

psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
m = df.mean()
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"mean: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は約100msと高速ではあるのですが、CPUは1コアしか利用できていないようです。各カラム独立して実行できそうなのに意外でした。

mean: 98,806,534. cpu: [100.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0](100.0)
Polars

次にPolars

before = time.perf_counter_ns()
m = df.mean()
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"mean: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は25msとpandasより高速で、きちんと複数コア利用できていました。さすがです。

mean: 24,505,231. cpu: [100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 66.7, 66.7, 66.7, 66.7, 0.0, 0.0](866.8000000000001)

列の追加

次は列の追加を計測してみます。
pandasでよく見る形でコードを書くと直列に処理を行いそうです。

pandas
psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
for column in df.columns:
    df[f"new_{column}"] = df[column] ** 3.5
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"add column: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は1.1秒で1コアのみ利用。予想通りでした。

add column: 1,117,079,140. cpu: [100.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0](100.0)
Polars

Polarsは複数一気に処理するAPIがあり、並列に実行できそうです。

psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
df2 = df.with_columns([(pl.col(c) ** 3.5).alias(f"{c}_pow") for c in df.columns])
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"with_columns: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果は220msと高速で、やはり並列に処理できています。

with_columns: 218,955,216. cpu: [100.0, 95.2, 95.0, 95.0, 90.5, 90.5, 90.5, 90.5, 90.0, 90.0, 37.5, 8.3](973.0)

pythonコード(lambda)呼び出し

おまけでpythonで定義した処理を適用するパターンです。
こちらはGILがあるため原理的に1コアしか利用できず、大幅に遅くなることが予想できます。

pandas

まずはpandas。applymapで全データにlambdaを適用してみます。

psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
df1 = df.applymap(lambda x: x**3.5)
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"applymap: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

結果18秒。やはり時間がかかってしまいますね。

applymap: 18,457,625,072. cpu: [60.1, 20.9, 11.6, 8.9, 4.0, 0.4, 0.1, 0.1, 0.1, 0.1, 0.0, 0.0](106.3)
Polars

続いてPolars。

psutil.cpu_percent(None, percpu=True)
before = time.perf_counter_ns()
df1 = df.select([df[c].apply(lambda x: x ** 3.5) for c in df.columns])
after = time.perf_counter_ns()
cpu = psutil.cpu_percent(None, percpu=True)
print(f"apply lambda: {after - before:,}. cpu: {sorted(cpu, reverse=True)}({sum(cpu)})")

こちらの結果も20秒と予想通り低速です。

apply lambda: 20,486,174,452. cpu: [81.9, 17.6, 3.9, 3.0, 0.3, 0.2, 0.1, 0.1, 0.0, 0.0, 0.0, 0.0](107.10000000000001)

ただし、Polarsはなんと以下のようにこのAPIは大幅に遅いことと修正するコードのサンプルがログに出力されていました。
このあたりの親切さはRustの文化っぽいですね。

/opt/project/src/my_project/pl_sample.py:68: PolarsInefficientApplyWarning: 
Series.apply is significantly slower than the native series API.
Only use if you absolutely CANNOT implement your logic otherwise.
In this case, you can replace your `apply` with the following:
  - s.apply(lambda x: ...)
  + s ** 3.5
  df1 = df.select([df[c].apply(lambda x: x ** 3.5) for c in df.columns])

まとめ

以上、いくつかの動作をpandasとPolarsで見てきました。
やはりpandasで工夫なく書くとCPU1コアしか利用できないことが多いのに対して、Polarsでは素直なコードでも複数コアの恩恵が受けられそうです。
チームで共有するコードだと全てをPolarsに置き換えるのは難しいかもしれませんが、ちょっとしたコード程度であればPolarsを積極的に採用する理由が増えたかなと思います。