Spark-再接着上次的Lamda架构

日志分析

单机日志分析,适用于小数据量的。(最大10G),awk/grep/sort/join等都是日志分析的利器。
例子:
1、shell得到Nginx日志中访问量最高的前十个IP

cat access.log.10 | awk '(a[$1]++) END (for(b in a) print b"\t"a[b])' | sort -k2 -r | head -n 10

2、python 统计每个IP的地址点击数

 import re
 import sys
 contents=sys.argv[1]
 def NginxIpHit(logfile_path):
     ipadd = r'\.'.join([r'\d{1,3}']*4)
     re_ip = re.compile(ipadd)
     iphitlisting = {}
     for line in open(contents):
     match = re_ip.match(line)
     if match:
        ip = match.group()
        iphitlisting[ip]=iphitlisting.get(ip,0)+1
     print iphitlisting
 NginxIpHit(contents)

**大规模的日志处理,日志分析指标:
PV、UV、PUPV、漏斗模型和准化率、留存率、用户属性
最终用UI展示各个指标的信息。**

架构

  • 1、实时日志处理流线

数据采集:采用Flume NG进行数据采集
数据汇总和转发:用Flume 将数据转发和汇总到实时消息系统Kafka
数据处理:采用spark streming 进行实时的数据处理
结果显示:flask作为可视化工具进行结果显示

  • 2、离线日志处理流线

数据采集:通过Flume将数据转存到HDFS
数据处理:使用spark sql进行数据的预处理
结果呈现:结果汇总到mysql上,最后使用flask进行结果的展现
Lamda架构:低响应延迟的组合数据传输环境。
查询过程:一次流处理、一次批处理。对应着实时和离线处理。

项目流程

安装flume
Flume进行日志采集,web端的日志一般Nginx、IIS、Tomcat等。Tomcat的日志在var/data/log
安装jdk
安装Flume

wget http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
tar –zxvf  apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0 –bin  apache-flume-1.5.0
ln   -s  apache-flume-1.5.0   fiume 

环境变量配置

Vim  /etc/profile
Export JAVA_HOME=/usr/local/jdk
Export CLASS_PATH = .:$ JAVA_HOME/lib/dt.jar: $ JAVA_HOME/lib/tools.jar
Export PATH=$ PATH:$ JAVA_HOME/bin
Export FlUME_HOME=/usr/local/flume
Export FlUME_CONF_DIR=$ FlUME_HOME/conf
Export PATH=$ PATH:$ FlUME_HOME /bin
Souce  /etc/profile 

创建agent配置文件将数据输出到hdfs上,修改flume.conf:

a1.sources = r1
a1.sinks = k1
a1.channels =c1
描述和配置sources
第一步:配置数据源
a1.sources.r1.type =exec
a1.sources.r1.channels =c1
配置需要监控的日志输出目录
a1.sources.r1.command=tail  –f  /va/log/data
第二步:配置数据输出
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.writeFormat=Text
a1.sink.k1.hdfs.rollInterval =60
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
配置数据通道
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
第四步:将三者级联
a1.souces.r1.channels =c1
a1.sinks.k1.channel =c1

启动Flume Agent

cd  /usr/local/flume
nohup bin/flume-ng  agent  –n  conf  -f  conf/flume-conf.properties
&

已经将flume整合到了hdfs中

  • 整合Flume、kafka、hhdfs
#hdfs输出端
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.174:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc-%H
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.rollInterval =3600
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
#kafka输出端 为了提高性能使用内存通道
a1.sink.k2.type =com.cmcc.chiwei.Kafka.CmccKafkaSink
a1.sink.k2.channels =c2
a1.sink.k2.metadata.broker.List=192.168.11.174:9002;192.168.11.175:9092; 192.168.11.174:9092
a1.sink.k2.partion.key =0
a1.sink.k2.partioner.class= com.cmcc.chiwei.Kafka.Cmcc Partion
a1.sink.k2.serializer.class= kafka. Serializer.StringEncoder
a1.sink.k2.request.acks=0
a1.sink.k2.cmcc.encoding=UTF-8
a1.sink.k2.cmcc.topic.name=cmcc
a1.sink.k2.producer.type =async
a1.sink.k2.batchSize =100

a1.sources.r1.selector.type=replicating

a1.sources = r1
a1.sinks = k1 k2
a1.channels =c1 c2

#c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/flume/flumeCheckpoint
a1.channels.c1.dataDir=/home/flume/flumeData, /home/flume/flumeDataExt
a1.channels.c1.capacity=2000000
a1.channels.c1.transactionCapacity=100
#c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=2000000
a1.channels.c2.transactionCapacity=100

用Kafka将日志汇总

1.4 Tar –zxvf  kafka_2.10-0.8.1.1.tgz
1.5 配置kafka和zookeeper文件
配置zookeeper.properties
dataDir=/tmp/zookeeper
client.Port=2181
maxClientCnxns = 0
initLimit = 5
syncLimit = 2
##
server.43 = 10.190.182.43:2888:3888
server.38 = 10.190.182.38:2888:3888

server.33 = 10.190.182.33:2888:3888

配置zookeeper myid

在每个服务器dataDir 创建 myid文件 写入本机id
//server.43   myid  本机编号43
echo “43” >  /tmp/ zookeeper/myid
配置kafka文件, config/server.properties
每个节点根据不同主机名配置
broker.id :43
host.name:10.190.172.43
zookeeper.connect=10.190.172.43:2181, 10.190.172.33:2181,10.190.172.38:2181

启动zookeeper
kafka通过zookeeper存储元数据,先启动它,提供kafka相应的连接地址
Kafka自带的zookeeper

在每个节点 bin/zookeeper-server-start.sh config/zookeeper. properties
启动Kafka

Bin/Kafka-server-start.sh

创建和查看topic
Topic和flume中的要一致,spark streming 也用的这个

Bin/ Kafka-topics.sh  --create  --zookeeper 10.190.172.43:2181
 --replication-factor  1  -- partions   1  --topic  KafkaTopic

查看下:

Bin/ Kafka-topics.sh   --describe   -- zookeeper  10.190.172.43:2181

整合kafka sparkstreming

Buid.sbt
Spark-core
Spark-streming
Spark-streamng-kafka
kafka
  • Spark streming 实时分析
    数据收集和中转已经好了,kafka给sparkstreming
  • Spark sql 离线分析
  • Flask可视化

代码

移步: github.com/jinhang

时间: 2024-12-03 08:57:27

Spark-再接着上次的Lamda架构的相关文章

一文读懂Hadoop、HBase、Hive、Spark分布式系统架构

机器学习.数据挖掘等各种大数据处理都离不开各种开源分布式系统,hadoop用户分布式存储和map-reduce计算,spark用于分布式机器学习,hive是分布式数据库,hbase是分布式kv系统,看似互不相关的他们却都是基于相同的hdfs存储和yarn资源管理,本文通过全套部署方法来让大家深入系统内部以充分理解分布式系统架构和他们之间的关系 本文结构 首先,我们来分别部署一套hadoop.hbase.hive.spark,在讲解部署方法过程中会特殊说明一些重要配置,以及一些架构图以帮我们理解,

《循序渐进学Spark 》Spark架构与集群环境

Spark架构与集群环境 本章首先介绍Spark大数据处理框架的基本概念,然后介绍Spark生态系统的主要组成部分,包括Spark SQL.Spark Streaming.MLlib和GraphX,接着简要描述了Spark的架构,便于读者认识和把握,最后描述了Spark集群环境搭建及Spark开发环境的构建方法. 1.1 Spark概述与架构 随着互联网规模的爆发式增长,不断增加的数据量要求应用程序能够延伸到更大的集群中去计算.与单台机器计算不同,集群计算引发了几个关键问题,如集群计算资源的共享

《Spark大数据处理:技术、应用与性能优化》——1.4 Spark分布式架构与单机多核架构的异同

1.4 Spark分布式架构与单机多核架构的异同 我们通常所说的分布式系统主要指的是分布式软件系统,它是在通信网络互连的多处理机的架构上执行任务的软件系统,包括分布式操作系统.分布式程序设计语言.分布式文件系统和分布式数据库系统等.Spark是分布式软件系统中的分布式计算框架,基于Spark可以编写分布式计算程序和软件.为了整体宏观把握和理解分布式系统,可以将一个集群视为一台计算机.分布式计算框架的最终目的是方便用户编程,最后达到像原来编写单机程序一样编写分布式程序.但是分布式编程与编写单机程序

MLBase:Spark生态圈里的分布式机器学习系统

MLBase背景 MLBase是Spark生态圈里的一部分,专门负责机器学习这块(除它之外,还有负责图计算的GraphX.SQL ad-hoc查询的Shark.具备容错性查询能力的BlinkDB等).看了MLBase的论文后,我是迫不及待想要分享一下这个ML系统.虽然对具体ML算法了解不多,但是对比类似的系统,比如Weka,Mahout而言,我感到MLBase的构想有更进一步的创新和独到之处.而且更重要的是,Spark上支持python算法包这件事情,我现在考虑的是:能打通策略组同学写的算法程序

老曹眼中的全栈架构师

看一下工程师和架构师的区别,简单地,工程师关注的是功能和代码性能,而架构师关注的是业务和系统的性能等非功能性约束.全栈不是全能,只要覆盖了所使用的技术栈就是全栈,例如LNMP,Linux+Nginx+Mysql+PHP.全栈架构师关注的是业务所采纳的全部技术栈,以及技术栈所涉及的系统性能.安全,高可用等诸多因素. 全栈(full stack developer)好像起源于facebook中对工程师的一种称谓,全栈架构师估计是老曹的杜撰.全栈的出现大概有4个方面:系统的性能瓶颈定位,团队间的沟通障

架构漫谈(二):认识概念是理解架构的基础

原文:架构漫谈(二):认识概念是理解架构的基础 架 构漫谈是由资深架构师王概凯Kevin执笔的系列专栏,专栏将会以Kevin的架构经验为基础,逐步讨论什么是架构.怎样做好架构.软件架构如何落地.如 何写好程序等问题.专栏的目的是希望能抛出一些观点,并引发大家思考,如果你有感触或者新的感悟,欢迎联系专栏负责人Gary(微信 greenguolei)深聊. 本文是漫谈架构专栏的第二篇,作者通过几个例子,讨论了一下认识概念的误区,如何有效的去认识概念,明白概念背后的含义,以及如何利用对概念的理解,快速

《Spark大数据处理:技术、应用与性能优化》——第1章 Spark 简 介1.1 Spark是什么

第1章 Spark 简 介 本章主要介绍Spark大数据计算框架.架构.计算模型和数据管理策略及Spark在工业界的应用.围绕Spark的BDAS 项目及其子项目进行了简要介绍.目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL.Spark Streaming.GraphX.MLlib等子项目,本章只进行简要 1.1 Spark是什么 介绍,后续章节再详细阐述.Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的

ASP.NET安全问题--ASP.NET安全架构

在开发Web程序中,我们可以选择用自己的方法来实现安全的策略,或者可以购买第三方的安全代码和产品,不管怎么样,都是要很大的花费的,幸好在.NET Framework中已经内置了安全的解决方案. ASP.NET和 .NET Framework 联合IIS为Web应用程序安全提供了一个基础结构.它的一个很明显的优势在于我们不必再编写自己的安全架构,我们可以利用.NET安全架构的内置的特性,而且整个安全的架构是经过测试和时间的考验了的. .NET安全架构包含了很多的类,这些类用来处理身份验证,授权,基

基于Nginx和Consul构建高可用及自动发现的Docker服务架构

本文讲的是基于Nginx和Consul构建高可用及自动发现的Docker服务架构[编者的话]本文对于Docker和Consul Template以及Nginx如何结合使用做了较为详细的介绍. [上海站|3天烧脑式微服务架构训练营]培训内容包括:DevOps.微服务.Spring Cloud.Eureka.Ribbon.Feign.Hystrix.Zuul.Spring Cloud Config.Spring Cloud Sleuth等. 导读 如果你在大量接触或使用微服务的话,你可能会碰到一个问