文章
问答
冒泡
PySpark使用udf自定义运算

1、UDF介绍

UDF(User Define Function),即用户自定义函数,Spark的官方文档中没有对UDF做过多介绍,猜想可能是认为比较简单吧。

几乎所有sql数据库的实现都为用户提供了扩展接口来增强sql语句的处理能力,这些扩展称之为UDXXX,即用户定义(User Define)的XXX,这个XXX可以是对单行操作的UDF,或者是对多行操作的UDAF,或者是UDTF,本次主要介绍UDF。

UDF的UD表示用户定义,既然有用户定义,就会有系统内建(built-in),一些系统内建的函数比如abs,接受一个数字返回它的绝对值,比如substr对字符串进行截取,它们的特点就是在执行sql语句的时候对每行记录调用一次,每调用一次传入一些参数,这些参数通常是表的某一列或者某几列在当前行的值,然后产生一个输出作为结果。

UDF用于扩展框架的函数,并在多个DataFrame上重用这些函数。UDF实际上是用户可以扩展PySpark的功能以满足特定需求的一种方式。

2、使用UDF

2.1 注册spark session

from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.set("spark.rpc.askTimeout", "600s")
conf.set("spark.sql.broadcastTimeout", "3600"),
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.ui.reverseProxy", "true")
conf.set("spark.ui.reverseProxyUrl", "http://spark.k8s.cluster")
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "/data")
# driver相关的配置在环境变量中设置 本地运行直接设置为localhost,端口为默认
if os.environ.get("SPARK_DRIVER_HOST"):
    conf.set("spark.driver.host", os.environ.get("SPARK_DRIVER_HOST"))
if os.environ.get("SPARK_DRIVER_PORT"):
    conf.set("spark.driver.port", os.environ.get("SPARK_DRIVER_PORT"))
if os.environ.get("SPARK_EXECUTOR_MEMORY"):
    conf.set("spark.executor.memory", os.environ.get("SPARK_EXECUTOR_MEMORY"))
spark = SparkSession.builder.appName("app_name").config(conf=config).getOrCreate()

2.2创建DataFrame

df = spark.createDataFrame(
    [
      ("张三", 85),
      ("李四", 90),
      ("王老五", 55)
    ],["name","score"]
)
studentDF.printSchema()
studentDF.show()
# 输出
root
 |-- name: string (nullable = true)
 |-- score: integer (nullable = false)

+------+-----+
|  name|score|
+------+-----+
|   张三|    85|
|   李四|    90|
| 王老五|    55|
+------+-----+

2.3使用udf

(1) 第一步是用Python语法创建一个函数并进行测试。

(2) 第二步是通过将函数名传递给PySpark SQL的udf()函数来注册它。

(3) 第三步是在DataFrame代码或发出SQL查询时使用UDF。在SQL查询中使用UDF时,注册过程略有不同。

# 引入functions
from pyspark.sql import functions
from pyspark.sql.types import StringType
# 创建一个普通的Python函数
def convertGrade(score):
    if score > 100:
        return "作弊"
    elif score >= 90:
        return "优秀"
    elif score >= 80:
        return "良好"
    elif score >= 70:
        return "中等"
    else:
        return "不及格"
grade_udf = functions.udf(avg_growth_rate, StringType())
df = df.withColumn("grade", grade_udf("score"))
df.show()
# 输出
+------+-----+------+
|  name|score| grade|
+------+-----+------+
|   张三|    85|  良好|
|   李四|    90|  优秀|
| 王老五|    55|不及格|
+------+-----+------+
spark

关于作者

小乙哥
学海无涯,回头是岸
获得点赞
文章被阅读