作者:周志湖
微信号:zhouzhihubeyond
本节主要内容
- Correlation 相关性分析
- 分层采样(Stratified sampling)
- 随机数据生成(Random data generation)
1. Correlation 相关性分析
相关性分析用于研究两个随机变量之间的依赖关系,它是统计学当中的一种十分重要的方法,在Spark中只实现了两种相关性分析方法,分别是皮尔逊(Pearson)与斯皮尔曼(Spearman)相关性分析方法,具体可参见。皮尔逊(Pearson)相关系数(具体参见:https://en.wikipedia.org/wiki/Correlation_coefficient)定义如下:
其中,协方差有如下定义形式:
方差具有如下定义形式:
标准差具有如下定义形式:
上述公式中的方差、标准差只能用来描述一维数据,协方差的意义在于其能够描述多维数据,如果结果为正值,则说明两者是正相关的,为负值则为负相关,值为0,则表示两者不相关,从上述几个公式的定义可以推出下列公式:
协方差可以将数据扩展到二维,对于n维数据,就需要计算
个协方差,此时自然而然地将其组织为协方差矩阵,例如一个三维变量x,y,z构成的协方差矩阵具有如下形式:
从上面的图可以看到:协方差矩阵是一个对称的矩阵,而且对角线是各个维度的方差。皮尔逊(Pearson)相关系数通过协方差矩阵便可得到。PearsonCorrelation在Spark中是私有成员,不能直接访问,使用时仍然是通过Statistics对象进行
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.stat._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Matrix, Vector}
object CorrelationDemo extends App {
val sparkConf = new SparkConf().setAppName("StatisticsDemo").setMaster("spark://sparkmaster:7077")
val sc = new SparkContext(sparkConf)
val rdd1:RDD[Double] = sc.parallelize(Array(11.0, 21.0, 13.0, 14.0))
val rdd2:RDD[Double] = sc.parallelize(Array(11.0, 20.0, 13.0, 16.0))
//两个rdd间的相关性
//返回值:correlation: Double = 0.959034501397483
//[-1, 1],值越接近于1,其相关度越高
val correlation:Double = Statistics.corr(rdd1, rdd2, "pearson")
val rdd3:RDD[Vector]= sc.parallelize(
Array(
Array(1.0,2.0,3.0,4.0),
Array(2.0,3.0,4.0,5.0),
Array(3.0,4.0,5.0,6.0)
)
).map(f => Vectors.dense(f))
//correlation3: org.apache.spark.mllib.linalg.Matrix =
//1.0 1.0 1.0 1.0
//1.0 1.0 1.0 1.0
//1.0 1.0 1.0 1.0
//1.0 1.0 1.0 1.0
val correlation3:Matrix = Statistics.corr(rdd3, "pearson")
}
假设某工厂通过随机抽样得到考试成绩与产量之间的关系数据如下:
直观地看,成绩越高产量越高,如果使用pearson相关系数,将得到如下结果:
val rdd4:RDD[Double] = sc.parallelize(Array(50.0, 60.0, 70.0, 80.0,90.0,95.0))
val rdd5:RDD[Double] = sc.parallelize(Array(500.0, 510.0, 530.0, 580.0,560,1000))
//执行结果为:
//correlation4: Double = 0.6915716600436548
val correlation4:Double = Statistics.corr(rdd4, rdd5, "pearson")
但其实从我们观察的数据来看,它们应该是高度相关的,虽然0.69也一定程度地反应了数据间的相关性,但表达力仍然不够,为此可以引入Spearman相关系数(参见http://www.360doc.com/content/08/1228/23/50235_2219531.shtml),如表中的第四、第五列数据,通过将成绩和产量替换成等级,那它们之间的相关度会明显提高,这样的话表达能力更强,如下列代码所示:
//采用spearman相关系数
//执行结果:
//correlation5: Double = 0.9428571428571412
val correlation5:Double = Statistics.corr(rdd4, rdd5, "spearman")
从上面的执行结果来看,相关性从pearson的值0.6915716600436548提高到了0.9428571428571412。由于利用的等级相关,因而spearman相关性分析也称为spearman等级相关分析或等级差数法,但需要注意的是spearman相关性分析方法涉及到等级的排序问题,在分布式环境下的排序可能会涉及到大量的网络IO操作,算法效率不是特别高。
2. 分层采样(Stratified sampling)
本小节使用spark自带的README.md文件进行相应的演示操作
package cn.ml.stat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.SparkConf
object StratifiedSampleDemo extends App {
val sparkConf = new SparkConf().setAppName("StatisticsDemo").setMaster("spark://sparkmaster:7077")
val sc = new SparkContext(sparkConf)
//读取HDFS上的README.md文件
val textFile = sc.textFile("/README.md")
//wordCount操作,返回(K,V)汇总结果
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
//定义key为spark,采样比率为0.5
val fractions: Map[String, Double] = Map("Spark"->0.5)
//使用sampleByKey方法进行采样
val approxSample = wordCounts.sampleByKey(false, fractions)
//使用sampleByKeyExact方法进行采样,该方法资源消耗较sampleByKey更大
//但采样后的大小与预期大小更接近,可信度达到99.99%
val exactSample = wordCounts.sampleByKeyExact(false, fractions)
}
3. 随机数据生成(Random data generation)
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.mllib.random.RandomRDDs._
//生成100个服从标准正态分面N(0,1)的随机RDD数据,10为指定的分区数
scala> val u = normalRDD(sc, 100L, 10)
u: org.apache.spark.rdd.RDD[Double] = RandomRDD[26] at RDD at RandomRDD.scala:38
//转换使其服从N(1,4)的正太分布
scala> val v = u.map(x => 1.0 + 2.0 * x)
v: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[27] at map at <console>:27