初学Scala(1): Scala实现Hash Join

最近看了些Scala相关的内容,写了个简单的hash join。

初步实现

jion过程

  1. 从数据源读取两个List[List[Any]](),我把所有的操作都放到List容器里进行
  2. 将两份数据集,hash到自己写的简单的SimpleHashTable里,每次put进去的时候会返回一个Int值,用于记录两份数据占据的bucket number集合
  3. 由于两份数据都是基于同一个hash方法进行hash的,join具体发生在两个hashTable对应的bucket之间
  4. 遍历需要进行join的buckets,每对bucket之间的join回归到最基本的二层遍历

几点说明

  1. 整个过程一共两个文件,SimpleHashTable.scala和HashJoin.scala
  2. 输入是两个二维的List,输出是一个二维List,支持的是单个键的inner join
  3. 测试速度:两个10000大小的20个字段的宽表进行hash join,大约0.4s
  4. HashTable的M值可以针对数据集大小自己定制,尽量让数据集在buckets里打散

可以改进的点有很多,这个hash join还是相当简单的,我比较依赖于foldLeft和map方法,体会到Scala编程代码量很少,用起来很舒服,很强大。

class SimpleHashTable {

  val M = 991

  val container = new Array[List[Any]](M)

  for (i <- 0 to M-1) {
    container(i) = List[Any]()
  }

  def hash(key: String): Int = (key.hashCode() & 0x7fffffff) % M

  def put(key: String, value: List[Any]): Int = { // return the hash number
    val indice = hash(key)
    container(indice) = value :: container(indice)
    indice
  }

  def get(indice: Int): List[Any] = container(indice) 

  def get(key: String): List[Any] = get(hash(key))

  def dataset() = container

}
class HashJoin(list1: List[List[Any]], list2: List[List[Any]]) {

  val _list1 = list1
  val _list2 = list2

  def innerHashJion(col: Int): List[Any] = {
    val start = System.currentTimeMillis()
    var keys1 = Set[Int]()
    var keys2 = Set[Int]()

    val sht1 = _list1.par.foldLeft(new SimpleHashTable) { (sht, list) =>
      val i = sht.put(list(col).toString, list)
      keys1 = keys1 + i
      sht
    }

    val sht2 = _list2.par.foldLeft(new SimpleHashTable) { (sht, list) =>
      val i = sht.put(list(col).toString, list)
      keys2 = keys2 + i
      sht
    }
    val end = System.currentTimeMillis()
    println("Hash took: " + (end-start) + "ms")
    getJointRecords((keys1&keys2).toArray, sht1, sht2, col)
  }

  def getJointRecords(inds: Array[Int], sht1: SimpleHashTable, sht2: SimpleHashTable, col: Int): List[Any] = {
    println("joint-keys: " + inds.size)
    var ret = scala.collection.immutable.List[Any]()
    inds.par.foreach(ind => {
      println(Thread.currentThread)
      sht1.get(ind).map(record1 => {
        sht2.get(ind).map(record2 => {
          val r1 = record1.asInstanceOf[List[Any]]
          val r2 = record2.asInstanceOf[List[Any]]
          if (r1(col) == r2(col)) ret = (r1 ::: r2) :: ret
        })
      })
    })
    ret
  }

}

测试可以使用下面单例:

object HashJoinTest {
  def main(args: Array[String]): Unit = {
    test()
  }

  def test(): Unit = {
    val c1 = List(111, "asfd", 23)
    val c11 = List(111, "asf", 231)
    val c2 = List(333, "e",    1)
    val c3 = List(222, "ewr",  80)

    val t1 = List(111, "e",    40)
    val t11 = List(111, "fge", 30)
    val t2 = List(333, "asfd", 80)
    val t3 = List(444, "e",    1)

    val list1 = List(c1, c11, c2, c3)
    val list2 = List(t1, t11, t2, t3)

    val hj = new HashJoin(list1, list2)
    val ret = hj.innerHashJion(2)
    for (i <- (0 to 1)) println(ret(i))
  }
}

优化

上面的这种实现,在join结果集并发往同一个List()容器里写的时候会出现性能瓶颈,写的速度会达到10W-100W行/s,而且需要在写的时候加上synchronized实现同步。虽然scala.collection.immutable.List类是不可变的,也是线程安全的,但是在1W join 1W的测试中,0.4s内写入10W行出现了数据丢失,加上synchronized字段可以简单避免这个问题,但同时带来了额外开销。

下面新的HashJoin.scala类,为每个需要join的bucket申请了一个数组空间,让每个线程返回的单个bucket join结果集保存在统一的数组中,最后对结果集进行merge,同时保留了并发求join的特性。

优化HashJoin.scala类之后,测试速度 1W join 1W 只要 0.1s,2W join 2W 时间是 0.2s-0.4s,(M=991的情况下,M可以调整)

class HashJoin(list1: List[List[Any]], list2: List[List[Any]]) {

  val _list1 = list1
  val _list2 = list2
  val M = 991
  val retContainer = new Array[List[Any]](M)
  for (i <- 0 to M-1) retContainer(i) = List[Any]()

  var ret = List[Any]()

  def innerHashJion(col: Int): Unit = {
    val start = System.currentTimeMillis()

    var keys1 = Set[Int]()
    var keys2 = Set[Int]()

    val sht1 = _list1.par.foldLeft(new SimpleHashTable) { (sht, list) =>
      val i = sht.put(list(col).toString, list)
      keys1 = keys1 + i
      sht
    }

    val sht2 = _list2.par.foldLeft(new SimpleHashTable) { (sht, list) =>
      val i = sht.put(list(col).toString, list)
      keys2 = keys2 + i
      sht
    }

    val end = System.currentTimeMillis()
    println("Hash took: " + (end-start) + "ms")

    val jointKeys = (keys1&keys2).toArray
    println("JointKeys Size: " + jointKeys.size)
    jointKeys.par.foreach(ind => retContainer(ind) = getBucketRecords(ind, sht1, sht2, col))

    def getBucketRecords(ind: Int, sht1: SimpleHashTable, sht2: SimpleHashTable, col: Int): List[Any] = {
      var bucketRet = List[Any]()
      sht1.get(ind).map(record1 => {
        sht2.get(ind).map(record2 => {
          val r1 = record1.asInstanceOf[List[Any]]
          val r2 = record2.asInstanceOf[List[Any]]
          if (r1(col) == r2(col)) bucketRet = (r1 ::: r2) :: bucketRet
        })
      })
      bucketRet
    }
  }

  def getRet: List[Any] = {
    mergeRets
    ret
  }

  def mergeRets = {
    val t1 = System.currentTimeMillis()
    retContainer.foreach({r =>
      ret = r ::: ret
    })
    val t2 = System.currentTimeMillis()
    println("Merge Rets took: " + (t2-t1) + " ms")
  }
}

我的测试单例如下,数据来自mongodb,进行了一次BSON to List的转换,可以替换掉传入的list1和list2,传入自己想要的测试数据:

object HashJoinTest {
  def main(args: Array[String]): Unit = {
    mongo()
  }

  def mongo(): Unit = {
    val loadS = System.currentTimeMillis()
    val list1 = BsonToList.getMongoList(0, 10000)
    val list2 = BsonToList.getMongoList(100000, 10000)
    val loadE = System.currentTimeMillis()
    println("Load Data took: " + (loadE-loadS) + "ms")

    val hj = new HashJoin(list1, list2)
    hj.innerHashJion(8)
    val ret = hj.getRet
    val joinE = System.currentTimeMillis()
    println("HashJoin Totally took: " + (joinE-loadE) + "ms")

    println("Result size: " + ret.size)
    for (i <- (0 to 1)) println(ret(i))
  }
}

后续如果有优化结果,还会更新在这里。

(全文完)

时间: 2024-10-21 20:10:13

初学Scala(1): Scala实现Hash Join的相关文章

写给Java老司机的Scala教程——Scala Fast Track

引子 如果说有什么编程语言让我觉得收获颇大的话,我想除了 Java 那么另一个就是 Scala,Java 教会了我工程和严谨,而 Scala 则进一步的给了我耳目一新的思维模式,并提高了我对OOP的认识,反过来,Scala的习得,也让我成为了一个更好的Java程序员. 背景 我写这个系列教程,除了分享我自己学习Scala的一些心得体会之外,并不是要特别的安利大家Scala.而我其实比较愚钝,所以我学习Scala 的时候走了不少弯路,记得当时还是 Scala 2.10,然后各种学习资料不是特别完善

SQL优化中查询条件内移及减少HASH JOIN的代价

以下语句完全正确,但COST较高,原因是因为AEH.AEC.AC.ACSN几张表都有数百万条记录, 由于采用了HASH连接,尝试采用优化索引等多种方式,但是由于记录过多,表的查询条件相对较少, COST下降幅度始终很有限,COST提高到1000出头已经很尽力了. Select Count(Tmp.Id) From (Select Rownum As Id, t.Entry_Id, t.Container_Id, Con.Container_Num From AEC t Inner Join AE

Oracle如何查询访问同一表的两个以上索引(三)INDEX HASH JOIN执行计划

经常看到有人提出这样的疑问,我在同一张表上建立了多个索引,为什么Oracle每次都选择一个,而不能同时利用多个索引呢.一般来说,常见的访问同一张表的两个以上索引,存在三种情况,AND-EQUAL.INDEX HASH JOIN和BITMAP INDEX AND/OR. 此外,还有一个设计上的疑问,如果有A.B.C三个字段,都可能作为查询条件,是建立多个复合索引好,还是建立三个单列的索引.这个问题之所以不好回答是因为和业务或者说和查询的模式有很大的关系,不过如果理解了Oracle什么时候会选择一个

ORACLE的HASH JOIN连接

最近,查阅了部分关于HASH JOIN的资料,现整理总结如下,以备忘. HASH JOIN是oracle在7.3版本中引入的一种表连接方式,以补充NESTED LOOP 和sort merge.HASH JOIN具有以下特征: 1.只可以运行在CBO模式下 2.由于采用了hash函数的计算方式,因此只适用于等值操作 3.对hash_area_size的大小非常敏感,过大或者过小都会影响到执行效率,因此.建议采用ORACLE的自动内存管理机制: 4.hash join属于CPU密集型操作(用于ha

nested loop,merge join,hash join与子查询优化

-- 今天见到一条sql,大致意思为:A 表 left join B 表,要查出A表所有的数据,以及统计所有A表与B表相关行数 create table t1 (id int , name varchar(50),password varchar(50)); insert into t1 select id,concat(id,'rudy'),concat('password',id) from generate_series(1,100000) id; alter table t1 add p

SQL优化器原理 - Auto Hash Join

这是MaxCompute有关SQL优化器原理的系列文章之一.我们会陆续推出SQL优化器有关优化规则和框架的其他文章.添加钉钉群"关系代数优化技术"(群号11719083)可以获取最新文章发布动态(二维码在文章末尾). 本文主要描述MaxCompute优化器实现的Auto Hash Join的功能. 简介 在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 m

PostgreSQL 10.0 preview 多核并行增强 - 并行hash join支持shared hashdata, 节约哈希表内存提高效率

标签 PostgreSQL , 10.0 , 多核并行增强 , shared hash表 , hash join 背景 PostgreSQL 9.6支持哈希JOIN并行,但是每个worker进程都需要复制一份哈希表,所以会造成内存的浪费,小表无妨,但是大表的浪费是非常大的. 因此10.0做了一个改进,使用共享的哈希表. Hi hackers, In PostgreSQL 9.6, hash joins can be parallelised under certain conditions, b

对Hash Join的一次优化

转自http://www.itpub.net/thread-955209-1-1.html 对Hash Join的一次优化 前两天解决了一个优化SQL的case,SQL语句如下,big_table为150G大小,small_table很小,9000多条记录,不到1M大小hash_area_size, sort_area_size均设置足够大,可以进行optimal hash join和memory sort select /*+ leading(b) use_hash(a b) */ disti

一次HASH JOIN 临时表空间不足的分析和优化思路

(原创转载请注明出处)   最近遇到一个语句,  只要一执行这个语句就会出现报错临时表空间不足,回想一下在语句中用到临时表空间无非是大量的SORT和HASH,然后通过执行计划查看如下:  PLAN_TABLE_OUTPUT-------------------------------------------------------------------------------------------------------------------------------------------