1046 字
5 分钟
阅读量加载中...
polars学习-11-用户自定义函数

背景#

polars学习系列文章,第11篇 用户自定义函数,python 自定义函数如何与 polars 结合使用

该库目前已更新到 1.37.1 版本,近一年版本更新迭代的速度非常快,之前分享的前10篇文章的版本是 1.2.1

该系列文章会分享到github,大家可以去下载jupyter文件,进行参考学习 仓库地址:https://github.com/DataShare-duo/polars_learn

小编运行环境#

import sys
print('python 版本:', sys.version.split('|')[0])
#python 版本: 3.11.11
import polars as pl
print("polars 版本:", pl.__version__)
#polars 版本: 1.37.1

提供的 api 函数/接口/方法#

  • map_elements :对列中的每个值,传入函数,类似pandas中的map
  • map_batches :整个列全部传入函数,类似pandas中的apply

示例数据#

df = pl.DataFrame(
{
"keys": ["a", "a", "b", "b"],
"values": [10, 7, 1, 23],
}
)
print(df)
shape: (4, 2)
┌──────┬────────┐
│ keys ┆ values │
------
str ┆ i64 │
╞══════╪════════╡
│ a ┆ 10
│ a ┆ 7
│ b ┆ 1
│ b ┆ 23
└──────┴────────┘

map_elements 用法#

import math
def my_log(value):
return math.log(value) # math.log 应用与每个值
out = df.select(pl.col("values").map_elements(my_log, return_dtype=pl.Float64))
print(out)
shape: (4, 1)
┌──────────┐
│ values │
---
│ f64 │
╞══════════╡
2.302585
1.94591
0.0
3.135494
└──────────┘

存在问题:

  1. 限于单个项:只用应用在单个值上面,而不能一次应用到整个列
  2. 性能开销:为每个单独的项调用函数也很慢,所有这些额外的函数调用会增加大量的开销

map_batches 用法#

def diff_from_mean(series):
total = 0
for value in series:
total += value
mean = total / len(series)
return pl.Series([value - mean for value in series])
out = df.select(pl.col("values").map_batches(diff_from_mean, return_dtype=pl.Float64))
print("== select() with UDF ==")
print(out)
== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
---
│ f64 │
╞════════╡
-0.25
-3.25
-9.25
12.75
└────────┘
print("== group_by() with UDF ==")
out = df.group_by("keys").agg(
pl.col("values").map_batches(diff_from_mean, return_dtype=pl.Float64)
)
print(out)
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values │
------
str ┆ list[f64] │
╞══════╪═══════════════╡
│ a ┆ [1.5, -1.5] │
│ b ┆ [-11.0, 11.0] │
└──────┴───────────────┘

提升用户自定义函数性能#

numpy 通用函数#

纯python实现的自定义函数一般速度都比较慢,要尽量减少代用python实现的方法,可以调用 numpy 中的实现的通用函数/算子,来加速,实际是通过调用C语言的轮子来加速

import numpy as np
out = df.select(pl.col("values").map_batches(np.log, return_dtype=pl.Float64))
print(out)

通过 Numba 提升自定义函数性能#

如果 numpy 中没有可用的函数,那么自定义函数可以通过 Numba 来提速,即时编译

from numba import guvectorize, int64, float64
@guvectorize([(int64[:], float64[:])], "(n)->(n)")
def diff_from_mean_numba(arr, result):
total = 0
for value in arr:
total += value
mean = total / len(arr)
for i, value in enumerate(arr):
result[i] = value - mean
out = df.select(
pl.col("values").map_batches(diff_from_mean_numba, return_dtype=pl.Float64)
)
print("== select() with UDF ==")
print(out)
out = df.group_by("keys").agg(
pl.col("values").map_batches(diff_from_mean_numba, return_dtype=pl.Float64)
)
print("== group_by() with UDF ==")
print(out)

注意事项#

加速时,数据缺失是不行的,在利用numba装饰器@guvectorize加速时,要么填充缺失值,要么删除缺失值,否则polars会报错

组合多列#

@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
def add(arr, arr2, result):
for i in range(len(arr)):
result[i] = arr[i] + arr2[i]
df3 = pl.DataFrame({"values_1": [1, 2, 3], "values_2": [10, 20, 30]})
out = df3.select(
pl.struct(["values_1", "values_2"])
.map_batches(
lambda combined: add(
combined.struct.field("values_1"), combined.struct.field("values_2")
),
return_dtype=pl.Float64,
)
.alias("add_columns")
)
print(out)

流式计算#

可以使用 map_batchesis_elementwise=True 参数将结果流式传输到函数中

设置流式计算,需要确保是针对每个值进行计算,更节省内存

返回数据类型#

返回数据类型是自动推断的,第一个非空值类型,作为结果类型

python 与 polars 数据类型映射:

  • int -> Int64
  • float -> Float64
  • bool -> Boolean
  • str -> String
  • list[tp] -> List[tp]
  • dict[str, [tp]] -> struct
  • any -> object 尽量禁止这种情况

可以将 return_dtype 参数传递给 map_batches

历史相关文章#


以上是自己实践中遇到的一些问题,分享出来供大家参考学习,欢迎关注微信公众号:DataShare ,不定期分享干货

微信公众号 QQ群