如果在SPARK函数中使用UDF或UDAF

Spark目前已经内置的函数参见:

Spark 1.5 DataFrame API Highlights: Date/Time/String Handling, Time Intervals, and UDAFs

如果在SPARK函数中使用UDF或UDAF, 详见示例

package cn.com.systex

import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.functions.callUDF
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import java.sql.Timestamp
import java.sql.Date
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructField

/**
 * DateTime: 2015年12月25日 上午10:41:42
 *
 */
//定义一个日期范围类
case class DateRange(startDate: Timestamp, endDate: Timestamp) {
  def in(targetDate: Date): Boolean = {
    targetDate.before(endDate) && targetDate.after(startDate)
  }
  override def toString(): String = {
    startDate.toLocaleString() + " " + endDate.toLocaleString();
  }
}

//定义UDAF函数,按年聚合后比较,需要实现UserDefinedAggregateFunction中定义的方法
class YearOnYearCompare(current: DateRange) extends UserDefinedAggregateFunction {
  val previous: DateRange = DateRange(subtractOneYear(current.startDate), subtractOneYear(current.endDate))
  println(current)
  println(previous)
  //UDAF与DataFrame列有关的输入样式,StructField的名字并没有特别要求,完全可以认为是两个内部结构的列名占位符。
  //至于UDAF具体要操作DataFrame的哪个列,取决于调用者,但前提是数据类型必须符合事先的设置,如这里的DoubleType与DateType类型
  def inputSchema: StructType = {
    StructType(StructField("metric", DoubleType) :: StructField("timeCategory", DateType) :: Nil)
  }
  //定义存储聚合运算时产生的中间数据结果的Schema
  def bufferSchema: StructType = {
    StructType(StructField("sumOfCurrent", DoubleType) :: StructField("sumOfPrevious", DoubleType) :: Nil)
  }
  //标明了UDAF函数的返回值类型
  def dataType: org.apache.spark.sql.types.DataType = DoubleType

  //用以标记针对给定的一组输入,UDAF是否总是生成相同的结果
  def deterministic: Boolean = true

  //对聚合运算中间结果的初始化
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0.0)
  }

  //第二个参数input: Row对应的并非DataFrame的行,而是被inputSchema投影了的行。以本例而言,每一个input就应该只有两个Field的值
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (current.in(input.getAs[Date](1))) {
      buffer(0) = buffer.getAs[Double](0) + input.getAs[Double](0)
    }
    if (previous.in(input.getAs[Date](1))) {
      buffer(1) = buffer.getAs[Double](0) + input.getAs[Double](0)
    }
  }

  //负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
    buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
  }

  //完成对聚合Buffer值的运算,得到最后的结果
  def evaluate(buffer: Row): Any = {
    if (buffer.getDouble(1) == 0.0) {
      0.0
    } else {
      (buffer.getDouble(0) - buffer.getDouble(1)) / buffer.getDouble(1) * 100
    }
  }

  private def subtractOneYear(date: Timestamp): Timestamp = {
    val prev = new Timestamp(date.getTime)
    prev.setYear(prev.getYear - 1)
    prev
  }
}
/**
 * Spark 1.5 DataFrame API Highlights: Date/Time/String Handling, Time Intervals, and UDAFs
 * https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
 */
object SimpleDemo {
  def main(args: Array[String]): Unit = {
    val dir = "D:/Program/spark/examples/src/main/resources/";
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sqltest"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    //用$符号来包裹一个字符串表示一个Column,定义在SQLContext对象implicits中的一个隐式转换
    //DataFrame的API可以接收Column对象,UDF的定义不能直接定义为Scala函数,而是要用定义在org.apache.spark.sql.functions中的udf方法来接收一个函数。
    //这种方式无需register
    //如果需要在函数中传递一个变量,则需要org.apache.spark.sql.functions中的lit函数来帮助

    //创建DataFrame
    val df = sqlContext.createDataFrame(Seq(
      (1, "张三峰", "广东 广州 天河", 24),
      (2, "李四", "广东 广州 东山", 36),
      (3, "王五", "广东 广州 越秀", 48),
      (4, "赵六", "广东 广州 海珠", 29))).toDF("id", "name", "addr", "age")

    //定义函数
    def splitAddrFunc: String => Seq[String] = {
      _.toLowerCase.split("\\s")
    }
    val longLength = udf((str: String, length: Int) => str.length > length)
    val len = udf((str: String) => str.length)

    //使用函数
    val df2 = df.withColumn("addr-ex", callUDF(splitAddrFunc, new ArrayType(StringType, true), df("addr")))
    val df3 = df2.withColumn("name-len", len($"name")).filter(longLength($"name", lit(2)))

    println("打印DF Schema及数据处理结果")
    df.printSchema()
    df3.printSchema()
    df3.foreach { println }

    //SQL模型
    //定义普通的scala函数,然后在SQLContext中进行注册,就可以在SQL中使用了。
    def slen(str: String): Int = str.length
    def slengthLongerThan(str: String, length: Int): Boolean = str.length > length
    sqlContext.udf.register("len", slen _)
    sqlContext.udf.register("longLength", slengthLongerThan _)
    df.registerTempTable("user")

    println("打印SQL语句执行结果")
    sqlContext.sql("select name,len(name) from user where longLength(name,2)").foreach(println)

    println("打印数据过滤结果")
    df.filter("longLength(name,2)").foreach(println)

    //如果定义UDAF(User Defined Aggregate Function)
    //Spark为所有的UDAF定义了一个父类UserDefinedAggregateFunction。要继承这个类,需要实现父类的几个抽象方法
    val salesDF = sqlContext.createDataFrame(Seq(
      (1, "Widget Co", 1000.00, 0.00, "AZ", "2014-01-02"),
      (2, "Acme Widgets", 2000.00, 500.00, "CA", "2014-02-01"),
      (3, "Widgetry", 1000.00, 200.00, "CA", "2015-01-11"),
      (4, "Widgets R Us", 5000.00, 0.0, "CA", "2015-02-19"),
      (5, "Ye Olde Widgete", 4200.00, 0.0, "MA", "2015-02-18"))).toDF("id", "name", "sales", "discount", "state", "saleDate")
    salesDF.registerTempTable("sales")

    val current = DateRange(Timestamp.valueOf("2015-01-01 00:00:00"), Timestamp.valueOf("2015-12-31 00:00:00"))

    //在使用上,除了需要对UDAF进行实例化之外,与普通的UDF使用没有任何区别
    val yearOnYear = new YearOnYearCompare(current)
    sqlContext.udf.register("yearOnYear", yearOnYear)

    val dataFrame = sqlContext.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales")
    salesDF.printSchema()
    dataFrame.show()
  }
}
时间: 2024-12-24 22:18:14

如果在SPARK函数中使用UDF或UDAF的相关文章

Spark生态系统中的图数据分析知识

图结构可有效表示稀疏矩阵,因而图数据分析可用于实现大数据分析.对于Spark生态系统中的图处理系统GraphX,<Spark GraphX in Action>一书给出了详细的教程和典型用例,将教会读者如何使用GraphX和GraphFrames进行图分析.本文是Info对该书作者的访谈,内容包括图数据及分析技术.GraphX高效程序开发.图数据分析的趋势等. 如何定义图数据? Michael Malak:就事论事,图结构看上去并非像股价图那样,而是边和点的集合.但这只是一种模糊的数学抽象.更

Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&amp;Spark MLLib中拟牛顿法L-BFGS的源码实现

<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最

for-关于 += 在被调用函数中时的问题。

问题描述 关于 += 在被调用函数中时的问题. int add(int a){int b = 0: b += 1;}int main (void){int a =0;int i = 0;for(i = 0;i<= 5;i++){printf(""%dn""add(a));}return 0;}如果通过循环调用函数的话, b += 1;怎么进行赋值? 解决方案 使用指针或引用都可以达到要求,一般教科书上会使用指针. void add(int *a){ *a =

c语言数组与函数-如何在函数中给已经在主函数中定义好的数组赋值

问题描述 如何在函数中给已经在主函数中定义好的数组赋值 已经在main()中定义了一个长度为20的数组,想在定义的函数中给数组赋值,但一直报错,请问如何修改? void arrin(int *arr) { int i; arr[]={1,1,2,2,3,3,4,5,6,5,6,7,7,8,8,9,9,0,0}; for(i=0;i<20;i++) printf("%d",arr[i]); } main() { int testarr[20]; ............ } 解决方

个人 平均相差天数-excel函数中统计问题,统计函数

问题描述 excel函数中统计问题,统计函数 表1是多名电工的维修单登记表含接单日期及交单日期,如何在表2中统计表1中某一人的工作效率(平均相差天数)和平均服务满意度 解决方案 在表一中,把该人的信息每天的情况汇集成一张表,在利用统计方法中的函数

c++问题-C++的类中怎么在一个函数中引用上一层的函数

问题描述 C++的类中怎么在一个函数中引用上一层的函数 在同一个类中怎么引用先定义的函数来定义函数?比如在一个时期类的定义中,一个日期加1的函数中怎么调用先定义的一个求是否闰年的函数. 解决方案 C++定义隐式转换函数将类转换为内部的一个成员变量c++函数默认参数是一个好的设计吗? 解决方案二: 直接在函数内调用就可以了.不知道你说的上一层是什么意思.是基类的函数还是集合类所属的对象的函数,前者直接调用,或者用 基类类名::函数名后者用构造函数传对象指针 解决方案三: 同一个类中的函数都是通过t

qmap-QT Qmap 在一个函数中定义,怎么在另一个函数中遍历

问题描述 QT Qmap 在一个函数中定义,怎么在另一个函数中遍历 50C void address_pool::set_address_pool(QString get_IP){ QString ip; ip= get_IP; qDebug()<<""IP""<<ip; QStringList str=ip.split(""); QStringList strlist= str.at(0).split("&qu

c语言中怎么把一个大小不确定的二维数组当作参数传入函数中

问题描述 c语言中怎么把一个大小不确定的二维数组当作参数传入函数中 c语言中怎么把一个大小不确定的二维数组当作参数传入函数中,取大神,取大神,取大神 解决方案 用VC++新建一个程序,默认生成的main函数定义如下 int mian(int argc, char* args[]) 这就是一个例子. 解决方案二: 一个表示长度的参数,一个指向二维数组的指针 解决方案三: fun(args[][],int rows,int cols) 解决方案四: void Func(int array[][10]

javascript-在函数中写 this.xx=function 和 var xx =function() 的区别

问题描述 在函数中写 this.xx=function 和 var xx =function() 的区别 亲问各位一个问题 JS代码如下 function Test(){ this.add = function(){ alert(1); } this.modAdd = function(){ this.add(); add(); // 以上两种均无法调用到ADD方法 // Uncaught TypeError: this.add is not a function } } var test =