利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库

本文试图实现的需求场景为:以学习Spark知识点为目的,编写Scala利用Spark解析800M的tomcat日志文件,打印一段时间内ERROR级别记录的前10行,统计每分钟的日志记录数,并将统计结果存入mysql数据库中。之前曾用JAVA写过一次同样的处理逻辑,但在学习了Scala之后,真的感觉在计算方面Scala要比JAVA方便的多。没有学习Scala语言的同学速度速度了啊……

技术要点

  • 将日志文件写入HDFS中,相对路径PATH为“nova.log”
  • 注意JAVA堆栈异常日志的处理
  • 将解析后的异常日志全部存到SparkSQL中或Hive数据仓库中
  • 通过编写SQL查询一段时间内ERROR级别记录的前10行
  • 统计每分钟的日志记录数,并将统计结果存入mysql数据库中,便于上层应用直接使用计算结果

解析前后对比

解析前:

解析后:

解析代码

LoggerApp.scala:

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType

/**
 * 日志解析
 */
object LoggerApp {
  def main(args: Array[String]): Unit = {
    println("<!--开始解析-->")
    val reg = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (\\[.*\\]) (.*) (.*) - ([\\s\\S]*)$"
    val path = "nova.log"
    val sc = new SparkContext(new SparkConf().setAppName("日志解析"))
    val textRDD = sc.textFile(path)

    /**
     * 处理一条日志包括多行的情况
     */
    var key = ""
    val formatRDD = textRDD.map { x =>
      if (x.matches(reg)) {
        key = x
        Pair.apply(key, "")
      } else {
        Pair.apply(key, x)
      }
    }.reduceByKey((a, b) => { a + "\n" + b }).map(x => x._1 + x._2)

    /**
     * 将字符串转换为Logger
     */
    val loggerRDD: RDD[Logger] = formatRDD.map { x =>
      {
        val reg.r(time, thread, level, logger, msg) = x //通过正则取值
        val log = new Logger(formatDate(time), thread, level, logger, msg)
        log
      }
    }.cache()

    /**
     * TODO 通过类的反射机制来定义数据库Scheme,但在scala语言中不知道为啥就是不成功,此处浪费了许久留着以后研究吧
     */
    /*val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(loggerRDD, classOf[Logger]).registerTempTable("logger")*/

    /**
     * 定义数据库Scheme
     */
    val schemaString = "time thread level logger msg"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName =>
          if ("time".equals(fieldName))
            StructField(fieldName, TimestampType, true)
          else
            StructField(fieldName, StringType, true)))
    /**
     * 将Logger转换为Row
     */
    val rowRDD = loggerRDD.map { log =>
      Row(
        formatDate(log.time),
        log.thread,
        log.level,
        log.logger,
        log.msg)
    }
    /**
     * 利用SQL进行查询过滤
     */
    //    val sqlc = bySQLContext(sc, rowRDD, schema);
    val sqlc = byHiveContext(sc, rowRDD, schema);
    val df = sqlc.sql("select * from logger where level='ERROR' and time between '2016-03-21 11:00:00' and '2016-03-21 12:00:00' order by time")
    val errLogRDD = df.map { x =>
      new Logger(
        formatDate(x.getTimestamp(0)),
        x.getString(1),
        x.getString(2),
        x.getString(3),
        x.getString(4))
    }
    for (log <- errLogRDD.take(10)) {
      println("time:" + formatDateToStr(log.time))
      println("thread:" + log.thread)
      println("level:" + log.level)
      println("logger:" + log.logger)
      println("msg:" + log.msg)
    }
    println("<!--解析结束-->")
  }
  /**
   * 创建临时表
   */
  def bySQLContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(rowRDD, schema).registerTempTable("logger")
    sqlc
  }
  /**
   * 创建永久表,需要提前搭建好Spark与Hive的集成环境
   */
  def byHiveContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new HiveContext(sc)
    sqlc.sql("drop table if exists logger")
    sqlc.sql("CREATE TABLE IF NOT EXISTS logger (time TIMESTAMP, thread STRING, level STRING, logger STRING, msg STRING)")
    sqlc.createDataFrame(rowRDD, schema).write.mode("overwrite").saveAsTable("logger")
    sqlc
  }
  def formatDate(str: String): Date = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(str)
  }
  def formatDate(timestamp: java.sql.Timestamp): Date = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(timestamp.toString())
  }
  def formatDate(date: Date): java.sql.Timestamp = {
    new java.sql.Timestamp(date.getTime)
  }
  def formatDateToStr(date: Date): String = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date)
  }
}

Logger.scala:

import java.util.Date

class Logger extends Serializable {
  var time: Date = null
  var thread: String = ""
  var level: String = ""
  var logger: String = ""
  var msg: String = ""
  def this(time: Date, thread: String, level: String, logger: String, msg: String) {
    this()
    this.time = time;
    this.thread = thread;
    this.level = level;
    this.logger = logger;
    this.msg = msg;
  }
}

统计并写入Mysql

LoggerMysqlApp.scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import java.util.Date
import java.text.SimpleDateFormat
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import java.util.Properties

object LoggerMysqlApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("输出写入Mysql"))
    /**
     * 从hive中加载数据
     */
    val hivec = new HiveContext(sc)
    val df = hivec.sql("select * from logger")
    val loggerRDD = df.rdd.map { x =>
      new Logger(
        LoggerApp.formatDate(x.getTimestamp(0)),
        x.getString(1),
        x.getString(2),
        x.getString(3),
        x.getString(4))
    }
    val resultRDD = loggerRDD.map { logger =>
      Pair(formatDateToStr(logger.time), 1)
    }.reduceByKey((a, b) =>
      { a + b }).map(f =>
      Row(f._1, f._2)).sortBy(f => f.getInt(1), false, 2)
    for (r <- resultRDD.take(10)) {
      println(r.getString(0) + ":" + r.getInt(1))
    }
    /**
     * 定义数据库Scheme
     */
    val schemaString = "time count"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName =>
          if ("time".equals(fieldName))
            StructField(fieldName, StringType, true)
          else
            StructField(fieldName, IntegerType, true)))
    /**
     * TODO计算每分钟日志的个数
     */
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", "root")
    connectionProperties.setProperty("password", ".")
    new SQLContext(sc).createDataFrame(resultRDD, schema).write.jdbc(
      "jdbc:mysql://192.168.136.128:3306/logger",
      "logger",
      connectionProperties);
  }
  def formatDateToStr(date: Date): String = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm").format(date)
  }
}
时间: 2025-01-20 14:07:04

利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库的相关文章

tomcat-接收到的tcp数据怎么存入mysql数据库,并显示在Tomcat服务器上

问题描述 接收到的tcp数据怎么存入mysql数据库,并显示在Tomcat服务器上 怎么在TOMCAT服务器中编程接收tcp数据(通过GPRS模块发送的),并将接收的数据存入Mysql数据库中,最终显示在TOMCAT服务器的网站中

利用Advanced Installer将asp.netMVC连同IIS服务和mysql数据库一块打包成exe安装包

原文:利用Advanced Installer将asp.netMVC连同IIS服务和mysql数据库一块打包成exe安装包 因为业务需要,项目中需要把asp.netmvc项目打包成exe安装程序给客户,让客户直接可以点下一步下一步安装部署web程序,并且同时要将IIS服务和mysql一同安装到服务器上,因为客户的电脑可能是64位也可能是32位,所以在打包的时候就需要打包成两份安装包.研究了几天终于有所收获,下边就是打包的步骤. 打包步骤: 一.前期准备 1.将asp.netmvc发布到本地目录中

php利用ExcelParser 导入excel存入mysql 数据库

php教程 导入excel存入mysql教程 数据库教程, 利用ExcelParser class ExcelParser {     private $_data=array(0,'');     private $_excel_handle;     private $_excel=array();     /**      * 构造函数      * @param <string> $filename 上传文件临时文件名称      */     public function __co

天气预报之抓取、解析、存入MYSQL数据库模块实现,不规范(代码)

package com.zzk.cn; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.MalformedURLE

解析远程连接管理其他机器上的MYSQL数据库_Mysql

在开发过程中,有时候需要远程连接并管理别的机器上的MYSQL数据库,在实现的过程中会遇到一系列的问题,现在以远程访问我自己安装在Ubuntu上的MYSQL数据为例(端口为默认端口3306),说明一下配置步骤及每一步中遇到的问题及相应解决方法:远程连接管理MYSQL,总体上来说有三步:(A为主操作机器,B为远程机器(MYSQL安装在B上,由A访问B)1,在被连接的MYSQL中创建专门的远程连接用户wow: 2,修改被连接的MYSQL的配置文件my.cnf,使此MYSQL不仅仅支持本地IP127.0

php 导入excel存入mysql 数据库, 利用ExcelParser

class ExcelParser {     private $_data=array(0,'');     private $_excel_handle;     private $_excel=array();     /**      * 构造函数      * @param <string> $filename 上传文件临时文件名称      */     public function __construct($filename)     {         /**        

Python 分析Nginx访问日志并保存到MySQL数据库实例_python

使用Python 分析Nginx access 日志,根据Nginx日志格式进行分割并存入MySQL数据库.一.Nginx access日志格式如下: 复制代码 代码如下: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_f

解析Tomcat的启动脚本--startup.bat_Tomcat

概述 我们通常使用 Tomcat 中的 startup.bat 来启动 Tomcat. 但是这其中干了一些什么事呢? 大家都知道一个 Java 程序需要启动的话, 肯定需要 main 方法, 那么这个 main 方法在哪呢? Tomcat 脚本中又是配置了一些什么参数呢, 什么情况下 Tomcat 会启动失败呢? 带着一些列的疑问我们来分析 Tomcat 的三个最重要的启动脚本: startup.bat catalina.bat setclasspath.bat startup.bat 脚本 该

解析Tomcat的启动脚本--catalina.bat_Tomcat

概述 Tomcat 的三个最重要的启动脚本: startup.bat catalina.bat setclasspath.bat 上一篇咱们分析了 startup.bat 脚本 这一篇咱们来分析 catalina.bat 脚本. 至于 setclasspath.bat 这个脚本, 相信看完这一篇, 就可以自己看懂这个脚本了. 可以点击下载 [ setclasspath.bat 脚本 ]查看附注释的 setclasspath.bat 脚本 catalina.bat 这个脚本的代码有点多, 就单独弄