1、UDF介绍
UDF(User Define Function),即用户自定义函数,Spark的官方文档中没有对UDF做过多介绍,猜想可能是认为比较简单吧。
几乎所有sql数据库的实现都为用户提供了扩展接口来增强sql语句的处理能力,这些扩展称之为UDXXX,即用户定义(User Define)的XXX,这个XXX可以是对单行操作的UDF,或者是对多行操作的UDAF,或者是UDTF,本次主要介绍UDF。
UDF的UD表示用户定义,既然有用户定义,就会有系统内建(built-in),一些系统内建的函数比如abs,接受一个数字返回它的绝对值,比如substr对字符串进行截取,它们的特点就是在执行sql语句的时候对每行记录调用一次,每调用一次传入一些参数,这些参数通常是表的某一列或者某几列在当前行的值,然后产生一个输出作为结果。
UDF用于扩展框架的函数,并在多个DataFrame上重用这些函数。UDF实际上是用户可以扩展PySpark的功能以满足特定需求的一种方式。
2、使用UDF
2.1 注册spark session
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.set("spark.rpc.askTimeout", "600s")
conf.set("spark.sql.broadcastTimeout", "3600"),
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.ui.reverseProxy", "true")
conf.set("spark.ui.reverseProxyUrl", "http://spark.k8s.cluster")
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "/data")
# driver相关的配置在环境变量中设置 本地运行直接设置为localhost,端口为默认
if os.environ.get("SPARK_DRIVER_HOST"):
conf.set("spark.driver.host", os.environ.get("SPARK_DRIVER_HOST"))
if os.environ.get("SPARK_DRIVER_PORT"):
conf.set("spark.driver.port", os.environ.get("SPARK_DRIVER_PORT"))
if os.environ.get("SPARK_EXECUTOR_MEMORY"):
conf.set("spark.executor.memory", os.environ.get("SPARK_EXECUTOR_MEMORY"))
spark = SparkSession.builder.appName("app_name").config(conf=config).getOrCreate()
2.2创建DataFrame
df = spark.createDataFrame(
[
("张三", 85),
("李四", 90),
("王老五", 55)
],["name","score"]
)
studentDF.printSchema()
studentDF.show()
# 输出
root
|-- name: string (nullable = true)
|-- score: integer (nullable = false)
+------+-----+
| name|score|
+------+-----+
| 张三| 85|
| 李四| 90|
| 王老五| 55|
+------+-----+
2.3使用udf
(1) 第一步是用Python语法创建一个函数并进行测试。
(2) 第二步是通过将函数名传递给PySpark SQL的udf()函数来注册它。
(3) 第三步是在DataFrame代码或发出SQL查询时使用UDF。在SQL查询中使用UDF时,注册过程略有不同。
# 引入functions
from pyspark.sql import functions
from pyspark.sql.types import StringType
# 创建一个普通的Python函数
def convertGrade(score):
if score > 100:
return "作弊"
elif score >= 90:
return "优秀"
elif score >= 80:
return "良好"
elif score >= 70:
return "中等"
else:
return "不及格"
grade_udf = functions.udf(avg_growth_rate, StringType())
df = df.withColumn("grade", grade_udf("score"))
df.show()
# 输出
+------+-----+------+
| name|score| grade|
+------+-----+------+
| 张三| 85| 良好|
| 李四| 90| 优秀|
| 王老五| 55|不及格|
+------+-----+------+