Spark UDF变长参数的二三事儿

在复杂业务逻辑中,我们经常会用到Spark的UDF,当一个UDF需要传入多列的内容并进行处理时,UDF的传参该怎么做呢? 下面通过变长参数引出,逐一介绍三种可行方法以及一些不可行的尝试...

引子

变长参数对于我们来说并不陌生,在Java里我们这么写


  1. public void varArgs(String... args) 

在Scala里我们这么写


  1. def varArgs(cols: String*): String 

而在Spark里,很多时候我们有自己的业务逻辑,现成的functions满足不了我们的需求,而当我们需要处理同一行的多个列,将其经过我们自己的逻辑合并为一个列时,变长参数及其变种实现可以给我们提供帮助。

但是在Spark UDF里我们是 无法使用变长参数传值 的,但之所以本文以变长参数开头,是因为需求起于它,而通过对它进行变换,我们可以使用变长参数或Seq类型来接收参数。

下面通过Spark-Shell来做演示,以下三种方法都可以做到多列传参,分别是

  • 变长参数(接受array类型)
  • Seq类型参数(接受array类型)
  • Row类型参数(接受struct类型)

变长参数类型的UDF

定义UDF方法


  1. def myConcatVarargs(sep: String, cols: String*): String = cols.filter(_ != null).mkString(sep) 

注册UDF函数

由于变长参数只能通过方法定义,所以这里使用部分应用函数来转换


  1. val myConcatVarargsUDF = udf(myConcatVarargs _) 

可以看到该UDF的定义如下


  1. UserDefinedFunction(<function2>,StringType,List(StringType, ArrayType(StringType,true))) 

也即变长参数转换为了ArrayType,而且函数是只包括两个参数,所以变长参数列表由此也可看出无法使用的。

变长参数列表传值

我们构造一个DataFrame如下


  1. val df = sc.parallelize(Array(("aa", "bb", "cc"),("dd","ee","ff"))).toDF("A", "B", "C") 

然后直接传入多个String类型的列到myConcatVarargsUDF


  1. df.select(myConcatVarargsUDF(lit("-"), col("A"), col("B"), col("C"))).show 

结果出现如下报错


  1. java.lang.ClassCastException: anonfun$1 cannot be cast to scala.Function4 

由此可以看出,使用变长参数列表的方式Spark是不支持的,它会被识别为四个参数的函数,而UDF确是被定义为两个参数而不是四个参数的函数!

变换:使用array()转换做第二个参数

我们使用Spark提供的array() function来转换参数为Array类型


  1. df.select(myConcatVarargsUDF(lit("-"), array(col("A"), col("B"), col("C")))).show 

结果如下


  1. +-------------------+ 
  2. |UDF(-,array(A,B,C))| 
  3. +-------------------+ 
  4. |           aa-bb-cc| 
  5. |           dd-ee-ff| 
  6. +-------------------+ 

由此可以看出,使用变长参数构造的UDF方法,可以通过构造Array的方式传参,来达到多列合并的目的。

使用Seq类型参数的UDF

上面提到,变长参数最后被转为ArrayType,那不禁要想我们为嘛不使用Array或List类型呢?

实际上在UDF里,类型并不是我们可以随意定义的,比如使用List和Array就是不行的,我们自己定义的类型也是不行的,因为这涉及到数据的序列化和反序列化。

以Array/List为示例的错误

下面以Array类型为示例

定义函数


  1. val myConcatArray = (cols: Array[String], sep: String) => cols.filter(_ != null).mkString(sep) 

注册UDF


  1. val myConcatArrayUDF = udf(myConcatArray) 

可以看到给出的UDF签名是


  1. UserDefinedFunction(<function2>,StringType,List()) 

应用UDF


  1. df.select(myConcatArrayUDF(array(col("A"), col("B"), col("C")), lit("-"))).show 

会发现报错


  1. scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String 

同样List作为参数类型也会报错,因为反序列化的时候无法构建对象,所以List和Array是无法直接作为UDF的参数类型的

以Seq做参数类型

定义调用如下


  1. val myConcatSeq = (cols: Seq[Any], sep: String) => cols.filter(_ != null).mkString(sep)  
  2. val myConcatSeqUDF = udf(myConcatSeq)  
  3. df.select(myConcatSeqUDF(array(col("A"), col("B"), col("C")), lit("-"))).show 

结果如下


  1. +-------------------+ 
  2. |UDF(array(A,B,C),-)| 
  3. +-------------------+ 
  4. |           aa-bb-cc| 
  5. |           dd-ee-ff| 
  6. +-------------------+ 

使用Row类型参数的UDF

我们可以使用Spark functions里struct方法构造结构体类型传参,然后用Row类型接UDF的参数,以达到多列传值的目的。


  1. def myConcatRow: ((Row, String) => String) = (row, sep) => row.toSeq.filter(_ != null).mkString(sep)  
  2. val myConcatRowUDF = udf(myConcatRow)  
  3. df.select(myConcatRowUDF(struct(col("A"), col("B"), col("C")), lit("-"))).show 

可以看到UDF的签名如下


  1. UserDefinedFunction(<function2>,StringType,List()) 

结果如下


  1. +--------------------+ 
  2. |UDF(struct(A,B,C),-)| 
  3. +--------------------+ 
  4. |            aa-bb-cc| 
  5. |            dd-ee-ff| 
  6. +--------------------+ 

使用Row类型还可以使用模式提取,用起来会更方便


  1. row match { 
  2.   case Row(aa:String, bb:Int) => 

最后

对于上面三种方法,变长参数和Seq类型参数都需要array的函数包装为ArrayType,而使用Row类型的话,则需要struct函数构建结构体类型,其实都是为了数据的序列化和反序列化。三种方法中,Row的方式更灵活可靠,而且支持不同类型并且可以明确使用模式提取,用起来相当方便。

而由此我们也可以看出,UDF不支持List和Array类型的参数,同时 自定义参数类型 如果没有混合Spark的特质实现序列化和反序列化,那么在UDF里也是 无法用作参数类型 的。当然,Seq类型是可以 的,可以接多列的数组传值。

此外,我们也可以使用柯里化来达到多列传参的目的,只是不同参数个数需要定义不同的UDF了。  

本文作者:佚名

来源:51CTO

时间: 2024-09-08 01:24:48

Spark UDF变长参数的二三事儿的相关文章

J2SE5.0 实例---变长参数

j2se 变长参数(Varargs)与泛型一样,变长参数是C++中有而Java中没有的一种语言特性,在过去如果我们想向一个函数传递可变数量的函数,就必须首先将这些参数放入一个数组中,然后将数组传递给函数.就如同下面所作的一样: Object[] arguments = { 640, "kb", "anybody", "Bill Gates" }; String result = MessageFormat.format(      "{

Lua学习笔记之函数、变长参数、closure(闭包)、select等_Lua

1. Lua函数支持多返回值,但并不是每次调用函数返回的全部值都会被使用. 有一条规则是只有当函数调用是表达式最后一个元素时,才会使用它的全部返回值.看代码: 复制代码 代码如下: --string.find函数返回两个值,:被查找子串的开始索引和结束索引  s,e = string.find("Lua program language","Lua")  print(s,e)  --> 1    3    --如果找不到,则输出nil和nil  s,e = s

java-基础-变长参数

在Java5 中提供了变长参数(varargs),也就是在方法定义中可以使用个数不确定的参数,对于同一方法可以使用不同个数的参数调用. 可变长参数的定义 print(String... args){ ... } print(); print("hello"); print("hello","lisi"); print("hello","张三", "alexia") 在调用方法的时候,如果

C++中的变长参数深入理解_C 语言

前言 在吸进的一个项目中为了使用共享内存和自定义内存池,我们自己定义了MemNew函数,且在函数内部对于非pod类型自动执行构造函数.在需要的地方调用自定义的MemNew函数.这样就带来一个问题,使用stl的类都有默认构造函数,以及复制构造函数等.但使用共享内存和内存池的类可能没有默认构造函数,而是定义了多个参数的构造函数,于是如何将参数传入MemNew函数便成了问题. 一.变长参数函数 首先回顾一下较多使用的变长参数函数,最经典的便是printf. extern int printf(cons

反射,变长参数的构造函数

问题描述 import java.lang.reflect.Constructor;import java.lang.reflect.InvocationTargetException;public class Hello {private Integer h;public Hello(int... _t) {int _h = 0;for (int i : _t) {_h += i;}h = new Integer(_h);}public void say() {System.out.print

C/C++变长参数宏(Variadic Macros)

A macro can be declared to accept a variable number of arguments much as a function can. The syntax for defining the macro is similar to that of a function. Here is an example: #define eprintf(...) fprintf (stderr, __VA_ARGS__) This kind of macro is

代码-关于可变长参数列表的几个问题

问题描述 关于可变长参数列表的几个问题 比如 public static void printMax(double...numbers) 如果要运用到代码中是什么意思? 还有这几个方法声明错在哪里? public static void print(String...strings, double...numbers) public static void print(double...numbers, String name) public static double...print(doub

JDK5.0新特性系列---5.可变长参数Varargs

      /**  * 在J2SE5.0之前,当传入到方法的参数个数不固定时,经常采用数组的方式传递参数  * 在J2SE5.0之后,可以使用可变长参数的我给方法传递参数  */ /**  * 在参数类型和参数名之间使用"..."(三个英文的点),表示该参数为可变长的  * 通过新的for循环读取可变长参数中的值  * 一个方法里最多只能有一个变长参数,而且这个变长参数一定要放在参数表的最后一个参数 */ import static java.lang.System.*; publi

C++模板介绍:什么是变长模板函数

最近的C++语言标准有些更进一步的复杂特性,诸如加上了变长模板.我在尝试理解这个特性的过程中 的一个最大的问题是,没有足够简单的代码示例来说明到底变长模板是如何使用和起作用的. 以下是 我的一个基本样例来说明变长模板: template <class ...A> int func(A... arg) {    return sizeof...(arg); } int main(void) {    return func (1,2,3,4,5,6); } 首要介绍的是一些术语: 一个模板参数现