Amazon Kinesis是个用于大规模数据实时处理的完全托管服务。不管你是建立一个从远端传感器中收集数据的系统,还是打造一个在多个不同服务器上做日志收集的应用程序,亦或是建立最新的物联网(IoT)解决方案,Amazon Kinesis都可以满足你每个小时从成千上万个不同数据源收集和处理TB级数据的需求。
对于许多这样的系统来说,数据产生的位置对用户来说非常重要。举个例子,从一个远端传感器中发射出的警报不会起到太大的作用,除非用户可以弄清楚事件发生的地点。对于用户来说,地理数据可视化上,在地图上绘制图形是最有效的方法。通过本文,我们将展示如何使用Amazon Kinesis建立一个支撑地理标记流数据的系统,并附上了两个简单的数据可视化方法,它们可以让用户快速读懂这些信息。第一个可视化绘制在一个地球仪上,对小规模事件显示非常有效:
第二个可视化可以应对更多规模数量的事件,它将绘制一段时间上事件的热点图。
下图是系统的架构概览。其中,数据生成者将数据推送给Amazon Kinesis。随后,Amazon Kinesis将处理这些信息,而相关的地理信息则存储在一个Amazon ElastiCache Redis Cluster上,运行在Elastic Beanstalk上的node.js 网络服务器将负责这些数据的可视化。
整个系统使用Java和JavaScript编码,但是不用担心开发环境不支持这些语言的情况;我们所有的代码都将使用Amazon Elastic Compute Cloud编译。
重要提示
当在AWS上建立本文所描述的系统,它将在Amazon Kinesis和其他AWS服务上产生费用。在可能的情况下,我们将使用AWS Free Tier标准资源。换句话说,即使产生了标准Amazon资费,这个费用也已经被最小化了。
地理标记数据源
鉴于大多数读者都无权访问一个连接了设备或传感器的网络,我们必须为本文用例中需要使用的地理标记数据寻找一个合适的替代源。每天,都会有大约5亿的Tweets被发布,Twitter通过他们的流API为开发者提供了一个小型样本。如果Tweet从一个移动设备中发出,那么它就有可能做地理位置标注,让你清楚这条Tweet发布者的位置。你可以注册一个Twitter开发者账户,随后建立一个应用程序。这里,请确保你的应用程序设置为只读访问,随后点击 Keys and Access Tokens面板底部的Create My Access Token按钮。到这个步骤后,你将拥有4个Twitter应用程序秘钥:Consumer Key(API Key)、Consumer Secret(API Secret)、Access Token和Access Token Secret。一旦你拥有这些秘钥,你已经做好了建立AWS解决方案的所有准备。
建立一个安全组(Security Group)
在建立我们系统之前,我们需要为多个服务器建立一个安全组。在AWS Console上,我们需要通过两个规则建立一个安全组。第一个就是允许SSH(port 22)传输,这个步骤允许我们连接到服务器。第二个规则是,当来源是所建立Security Group的Id时,允许所有端口上的任何协议传输。这允许从实例传入的数据会被指定到同一个安全组,同时也保证了系统中所有服务器都可以进行通信。
建立一个Amazon Kinesis Stream
下一个任务则是建立一个Amazon Kinesis Stream。在AWS Console上,进入Region对应的Amazon Kinesis页面,并点击Create Stream按钮。给你的数据流起一个名字,并选择shard的数量。一个Amazon Kinesis Stream由一个或一个以上的shard组成,每个shard都提供了1MB/sec数据输入和2MB/sec的数据输出。在AWS上,一个数据流的总容量可以通过shard的数量非常简单的计算;换个角度来看,系统也可以便捷的增加或减少shard的数量来调整数据流的容量。单独看每个shard,它们每秒可以处理1000个写入事务。而在本文的测试过程中,Twitter流API总是运行在每秒1000 Tweets以下,因此我们只需要1个shard。
建立一个Amazon ElastiCache 实例
下一步,使用AWS Console进入Amazon ElastiCache页面,并点击Launch Cache Cluster按钮。同时,我们需要在下个页面上建立一个单节点Redis集群。选择标准的缓存端口,同时设置Node Type,选择cache.m3.medium类型的节点。。在这里,请确保你使用了之前所建立的安全组。一旦Amazon ElastiCache服务器建立,你可以在左侧导航的Cache Clusters页面中发现终端的名称,并点击Nodes为集群连接。系统其他部分到集群的读写都会使用这个终端名和端口名。
数据生产者(Producer)
如果你有从远端传感器收集数据的条件,那么它们可以直接把数据传送给Amazon Kinesis。在你的架构中,我们需要一个“Producer(生产者)”从Twitter中抽取数据并将之传输给Amazon Kinesis。Twitter拥有一个名为Hosebird(HBC)的开源Java库,它可以做他们流API的数据抽取。用于配置客户端的代码如下:
[js] view plaincopy/**
* Set up your blocking queues: Be sure to size these properly based
* on expected TPS of your stream
*/
BlockingQueue msgQueue = new LinkedBlockingQueue(10000);
/**
* Declare the host you want to connect to, the endpoint, and
* authentication (basic auth or oauth)
*/
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
// Track anything that is geotagged
endpoint.addQueryParameter("locations", "-180,-90,180,90");
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey,
consumerSecret, token, secret);
// create a new basic client - by default gzip is enabled
Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
.endpoint(endpoint).authentication(hosebirdAuth)
.processor(new StringDelimitedProcessor(msgQueue)).build();
client.connect();
如代码所示,我们在终端上使用了“addQueryParameter”来过滤数据,因此只会发送标记地理位置的Tweets。HBC GitHub页面上的例子展示了如何进行检索项过滤,举个例子,你可以通过标签限制分析指定的Tweets。这里有一件事情需要注意:如果你使用多于一个的过滤器,那么它们将进行“OR”计算。如果你在上述代码中增加了“trackTerms”,那么返回的结果将包含你的检索项或者是地理标志Tweets,这显然不是我们所需要的。在这里,通常的做法是让HBC做最严格的检索(通常是搜索项)来限制从Twitter返回的数据大小,随后在代码中建立第二个过滤器。
根据上文的设置,即一个shard组成的Stream,它每秒最多可以处理1000个事务。因此在通过一个单节点发送数据时就会产生一个问题:在如此速率的传输下,每个Tweet的处理时间只有1毫秒。即使只考虑网络延时这一个因素,我们就很难让到Amazon Kinesis的调用如此快速。因此,数据生产者可以使用一个线程池来并发进行调用。下面的代码显示这个步骤如何进行:
[js] view plaincopy// create producer
ProducerClient producer = new ProducerBuilder()
.withName("Twitter")
.withStreamName(streamName)
.withRegion(regionName)
.withThreads(10)
.build();
producer.connect();
数据生产者的名称通常用于后续的de-bug,而Stream和Region的名称则设置为从配置文件中抽取的变量。在上面的代码中,我们设置了一个容量为10的线程池,它用于处理你想传输给Amazon Kinesis的消息。下面的代码显示了HBC中的Tweets如何连接到数据生产者:
[js] view plaincopy// get message from HBC queue
String msg = msgQueue.take();
// use 'random' partition key
String key = String.valueOf(System.currentTimeMillis());
// send to Kinesis
producer.post(key, msg);
每个Tweet都会被设置成JSON字符串。它们取自HBC队列(msgQueue),并传输给数据生产者。在内部,Producer会将这些字符串放到一个队列中,而一个工作者线程将把数据发送给Amazon Kinesis。因为我们无需在shard上对消息排序,这里将使用一个随机的分区键。
在Amazon EC2上运行Producer之前,你需要设置一个IAM role,这个步骤将允许EC2实例对Amazon Kinesis和Amazon S3进行访问。这里我们点开AWS控制台,点击IAM,并建立一个role。这个role是一个用于Amazon EC2 role的AWS Service Role,定制策略如下:
[js] view plaincopy{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1392290776000",
"Effect": "Allow",
"Action": [
"kinesis:*", "s3:Get*", "s3:List*"
],
"Resource": [
"*"
]
}
]
}
建立一个使用Amazon Linux AMI的m3.medium Amazon EC2 。指定上文所建立的IAM role和Security Group。修改以下脚本,加入Amazon Kinesis Stream名称、所使用的Region(比如,us-west-1),以及上文建立Twitter应用程序时所获得的4个秘钥。将这些粘贴到User Data用于修改实例。最后,审查和启动你的实例,记得提供一个秘钥对用于后续的访问。
[js] view plaincopy#!/bin/bash
# update the instance
yum update -y
# install jdk
yum install java-1.8.0-openjdk -y
yum install java-1.8.0-openjdk-devel -y
yum install git -y
update-alternatives --set java /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.25-0.b18.4.amzn1.x86_64/jre/bin/java
cd /home/ec2-user
# install Apache Maven
wget http://www.dsgnwrld.com/am/maven/maven-3/3.2.3/binaries/apache-maven-3.2.3-bin.tar.gz
tar -xzvf apache-maven-3.2.3-bin.tar.gz
# get the code
git clone https://github.com/awslabs/aws-big-data-blog.git
cp ./aws-big-data-blog/aws-blog-kinesis-data-visualization/TwitterProducer/* /home/ec2-user -r
# create the config file
echo "aws.streamName = Name of Your Amazon Kinesis Stream" > AwsUserData.properties
echo "aws.regionName = Name of your region" >> AwsUserData.properties
echo "twitter.consumerKey = Twitter Consumer Key" >> AwsUserData.properties
echo "twitter.consumerSecret = Twitter Consumer Secret Key" >> AwsUserData.properties
echo "twitter.token = Twitter Access Token" >> AwsUserData.properties
echo "twitter.secret = Twitter Access Token Secret" >> AwsUserData.properties
echo "twitter.hashtags = " >> AwsUserData.properties
# do the build
/home/ec2-user/apache-maven-3.2.3/bin/mvn package
这个脚本将运行在实例启动并安装了Open JDK和Apache Maven之后。随后,它会从Git上下载Producer所需的源代码,并建立配置文件。最终,Apache Maven将用于建立这些源代码。为了启动Producer,跟随说明SSH到实例和类型:
[js] view plaincopyjava -jar target/TwitterProducer-0.0.1-SNAPSHOT.jar AwsUserData.properties
在服务启动及Apache Maven和jar文件都就绪后,你可能还需要等待数分钟。在这里,你应该可以看到Tweets被发送到Amazon Kinesis的信息。Amazon Kinesis Application将检索从Amazon Kinesis获得的JSON Tweet,提取地理位置信息并将之推送到Amazon ElastiCache Redis Cluster。
应用程序来自Amazon Kinesis Application示例,它被托管在AWS GitHub页面上(它非常适合作为所有Amazon Kinesis Application的起点)。Amazon Kinesis Application使用Jedis来与Amazon ElastiCache Redis进行交互。当Record Processor构造和对象初始化时,Redis服务器的详情会被提取。为了处理来自Twitter的记录,下面的代码将被添加到Record Processor:
[js] view plaincopyCoordinate c = null;
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
// use the ObjectMapper to read the json string and create a tree
JsonNode node = mapper.readTree(data);
JsonNode geo = node.findValue("geo");
JsonNode coords = geo.findValue("coordinates");
Iterator elements = coords.elements();
double lat = elements.next().asDouble();
double lng = elements.next().asDouble();
c = new Coordinate(lat, lng);
} catch(Exception e) {
// if we get here, its bad data, ignore and move on to next record
}
if(c != null) {
String jsonCoords = mapper.writeValueAsString(c);
jedis.publish("loc", jsonCoords);
}
如代码所示,Tweet的地理位置提取于我们从Amazon Kinesis接收到的JSON消息。这些信息被包装成一个JSON对象,并被推送到一个Redis键。如果你期望在生产环境中运行一个Amazon Kinesis Application,你可以考虑通过之前博客(Hosting Amazon Kinesis Applications on AWS Elastic Beanstalk)将代码托管在Elastic Beanstalk 上。除此之外,你也可以将应用程序设置为JAR,并在服务器上直接运行。无论使用哪种方式,你必须为你的目标服务器建立一个IAM role。访问IAM控制台,并建立一个具备以下策略的AWS Service Role:
[js] view plaincopy{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1392290776000",
"Effect": "Allow",
"Action": [
"kinesis:*", "cloudwatch:*", "dynamodb:*", "elasticache:*", "s3:Get*", "s3:List*"
],
"Resource": [
"*"
]
}
]
}
如果你在Elastic Beanstalk之外运行你的Amazon Kinesis Application,建立一个Amazon EC2实例(m3.meduim and Amazon Linux AMI),并分配你之前建立的IAM role 。再次提醒,务必使用之前建立的安全组;它允许你的实例可以与Redis通信。通过键入Amazon Kinesis Stream名称、Region名称,以及所建立的Amazon ElastiCache Redis集群的名称和端口。随后,你可以将下面的脚本复制到服务配置界面的“User Data”部分。
[js] view plaincopy#!/bin/bash
# update instance
yum update -y
# install jdk
yum install java-1.8.0-openjdk -y
yum install java-1.8.0-openjdk-devel -y
yum install git -y
update-alternatives --set java /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.25-0.b18.4.amzn1.x86_64/jre/bin/java
cd /home/ec2-user
# install Apache Maven
wget http://www.dsgnwrld.com/am/maven/maven-3/3.2.3/binaries/apache-maven-3.2.3-bin.tar.gz
tar -xzvf apache-maven-3.2.3-bin.tar.gz
# get the code
git clone https://github.com/awslabs/aws-big-data-blog.git
cp ./aws-big-data-blog/aws-blog-kinesis-data-visualization/KinesisApplication/* /home/ec2-user -r
# create the config file
echo "appName = DataVizAnalyzer" > KinesisClient.properties
echo "kinesisEndpoint = Name of your region" >> KinesisClient.properties
echo "redisEndpoint = Name of the Redis node" >> KinesisClient.properties
echo "redisPort = The port number used by Redis" >> KinesisClient.properties
echo "kinesisStreamName = The name of your Amazon Kinesis Stream" >> KinesisClient.properties
# build
/home/ec2-user/apache-maven-3.2.3/bin/mvn package
下面脚本与前文的工作原理相同。Open JDK和Apache Maven会被安装,用于Amazon Kinesis Application的代码会从Git上下载并通过Apache Maven安装。SSH到实例。一旦jar被建立,键入以下的命令:
[js] view plaincopyjava -jar target/KinesisClient-0.0.1-SNAPSHOT.jar KinesisClient.properties
随后,你将看到一些数据从Amazon Kinesis抽取,以及坐标数据被发送到Redis的信息。
Node.js服务器
你可以在这里下载用于node.js服务器的代码。Node.js服务器使用了两个非常重要的库:node_redis来与Redis通信,socket.io在浏览器和服务器之间创建一个socket连接。Socket被用于推送新Tweet信息到任何连接用户。
当node.js服务器启动,它将被注册以监视Redis发布的变化:
[js] view plaincopy// subscribe to listen to events from redis redisClient.on("ready", function () { redisClient.subscribe("loc");
如果Redis发布产生变化,系统可以非常便捷的将这个变化推送到所有连接用户:
[js] view plaincopy// When we get a message from redis, we send the message down the socket to the client
redisClient.on('message', function(channel, message) {
var coord = JSON.parse(message);
io.emit('tweet', coord);
});
你下载的node.js代码同样包含了用于可视化地理位置数据的HTML和JavaScript代码。
这个代码包含了两个用于可视化的方法,它们都使用了Three.js来做3D渲染。第一个可视化方法的出发点是一个旋转的地球仪,它基于Bjørn Sandvik一篇非常著名的博客。这里同样需要感谢Tom Paterson,它开源了我们需要使用的textures。在这里,地球仪的主要代码是public/js/earth.js。环境建立通过调用createEarth、createClouds、createStars、createSun和createLensFlare完成。
设置最后一部分是建立50个lights,它们开始时都被关闭。当sockets检测到一条tweet被发送到浏览器,lights池将被搜索以获得一个空闲的light。随后这个light会被移动到tweet的发送位置,而在几秒内,你将看到亮度会逐渐增加然后减弱。从而实现了每条Tweet发送时的可视化。需要注意的是,在发光开始时,它所持续的时间和颜色就被确定。下面的代码将触发这个过程:
[js] view plaincopyvar socket = io();
ocket.on('tweet', function(coord) {
startLight(coord.lat, coord.lng, 2000, 0x6DAEE1);
);
旋转的地球仪组合灯光让信息来的非常直观,但是这个可视化方法的弱点是可显示信息的数量。建立这个图像和灯光是非常资源密集型的一个操作,如果你测试,你就会发现当显示事件每秒超过25个时,机器的速度就会变慢。
第二个可视化方法旨在展示更多的数据点,其主要部分代码在public/js/earth2.js中。代码的结构非常类似于上个方法,环境通过createEarth和createBackgroundLight两个方法建立。这里所使用的观点非常简单,整个地图被水平表示,从而节省了大量的计算资源。取代建立lights池,这个可视化方法将建立一个拥有5000个点的 PointCloud。每个点都建立在一个无穷大的位置,因此用户不会看到它。
当sockets播放一个Tweet,代码会搜索一个没有被使用的点,随后将之移动到Twitter发送的位置。每个点都使用了Additive Blending,这就意味着同一个位置的点越多,那个地方的颜色越亮。将每个点保存一个足够的时间(比如10秒),你可以获得Tweets产生位置的热点图。
为了让node.js运行在Elastic Beanstalk上,你首先需要为代码建立一个zip存档。Zip目录的内容(比如 globe.html、heatmap.html和server.js),而不是父文件夹。下一步,使用以下策略为应用程序建立一个 IAM role:
[js] view plaincopy{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1392290776000",
"Effect": "Allow",
"Action": [
"elasticache:*"
],
"Resource": [
"*"
]
}
]
}
到了这一步,你可以到AWS Console的Elastic Beanstalk,并建立一个新的应用程序。
1. 将Node.js设置为预定义环境,指定代码的zip文件作为应用程序源。
2. 在 Configuration Details页面,指定你刚建立的IAM role。
3. 在Elastic Beanstalk应用程序建立之后,选择环境左侧导航中的Configuration,随后选择Software Configuration面板(点击右上角的cog)。
4. 在Environment Properties下,在PARAM1参数中键入Redis ElastiCache名称。
5. 最后,选择实例面板,加入Security Group名称。
如果你回到主应用程序仪表盘,你将看到你可视化应用程序的地址,你可以在AWS Elastic Beanstalk Application建立完成后访问。如果遭遇了WebGL不支持警告,请确保你使用了兼容浏览器。
总结
本文向你展示了Amazon Kinesis如何实时捕获并保存地理位置数据,并在一个基于网络的客户端上可视化。当Amazon Kinesis Application处理Tweet时,它只是提取了JSON文件中的地理位置信息。因此,我们可以很简单的将之修改成提取其他数据,并用来提升可视化。
本文介绍的解决方案同样是可以扩展的:给Amazon Kinesis Stream 中添加Shard非常简单,Amazon Kinesis Application 同样可以跑在一群主机上以增加吞吐量。