利用 MapReduce分析明星微博数据实战

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离。歌星、影星、体育明星、作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单。同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满。

正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目。

1、项目需求

自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中。

2、数据集

明星 明星微博名称 粉丝数 关注数 微博数

俞灏明 俞灏明 10591367 206 558

李敏镐 李敏镐 22898071 11 268

林心如 林心如 57488649 214 5940

黄晓明 黄晓明 22616497 506 2011

张靓颖 张靓颖 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定义InputFormat读取明星微博数据,通过自定义getSortedHashtableByValue方法分别对明星的fan、followers、microblogs数据进行排序,然后利用MultipleOutputs输出不同项到不同的文件中

4、实现

1)、定义WeiBo实体类,实现WritableComparable接口


  1. package com.buaa; 
  2.  
  3. import java.io.DataInput; 
  4. import java.io.DataOutput; 
  5. import java.io.IOException; 
  6.  
  7. import org.apache.hadoop.io.WritableComparable; 
  8.  
  9. /**  
  10. * @ProjectName MicroblogStar 
  11. * @PackageName com.buaa 
  12. * @ClassName WeiBo 
  13. * @Description TODO 
  14. * @Author 刘吉超 
  15. * @Date 2016-05-07 14:54:29 
  16. */ 
  17. public class WeiBo implements WritableComparable<Object> { 
  18.     // 粉丝 
  19.     private int fan; 
  20.     // 关注 
  21.     private int followers; 
  22.     // 微博数 
  23.     private int microblogs; 
  24.      
  25.     public WeiBo(){}; 
  26.      
  27.     public WeiBo(int fan,int followers,int microblogs){ 
  28.         this.fan = fan; 
  29.         this.followers = followers; 
  30.         this.microblogs = microblogs; 
  31.     } 
  32.      
  33.     public void set(int fan,int followers,int microblogs){ 
  34.         this.fan = fan; 
  35.         this.followers = followers; 
  36.         this.microblogs = microblogs; 
  37.     } 
  38.      
  39.     // 实现WritableComparable的readFields()方法,以便该数据能被序列化后完成网络传输或文件输入 
  40.     @Override 
  41.     public void readFields(DataInput in) throws IOException { 
  42.         fan  = in.readInt(); 
  43.         followers = in.readInt(); 
  44.         microblogs = in.readInt(); 
  45.     } 
  46.      
  47.     // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出  
  48.     @Override 
  49.     public void write(DataOutput out) throws IOException { 
  50.         out.writeInt(fan); 
  51.         out.writeInt(followers); 
  52.         out.writeInt(microblogs); 
  53.     } 
  54.      
  55.     @Override 
  56.     public int compareTo(Object o) { 
  57.         // TODO Auto-generated method stub 
  58.         return 0; 
  59.     } 
  60.  
  61.     public int getFan() { 
  62.         return fan; 
  63.     } 
  64.  
  65.     public void setFan(int fan) { 
  66.         this.fan = fan; 
  67.     } 
  68.  
  69.     public int getFollowers() { 
  70.         return followers; 
  71.     } 
  72.  
  73.     public void setFollowers(int followers) { 
  74.         this.followers = followers; 
  75.     } 
  76.  
  77.     public int getMicroblogs() { 
  78.         return microblogs; 
  79.     } 
  80.  
  81.     public void setMicroblogs(int microblogs) { 
  82.         this.microblogs = microblogs; 
  83.     } 

2)、自定义WeiboInputFormat,继承FileInputFormat抽象类


  1. package com.buaa; 
  2.  
  3. import java.io.IOException; 
  4.  
  5. import org.apache.hadoop.conf.Configuration; 
  6. import org.apache.hadoop.fs.FSDataInputStream; 
  7. import org.apache.hadoop.fs.FileSystem; 
  8. import org.apache.hadoop.fs.Path; 
  9. import org.apache.hadoop.io.Text; 
  10. import org.apache.hadoop.mapreduce.InputSplit; 
  11. import org.apache.hadoop.mapreduce.RecordReader; 
  12. import org.apache.hadoop.mapreduce.TaskAttemptContext; 
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  14. import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
  15. import org.apache.hadoop.util.LineReader; 
  16.  
  17. /**  
  18. * @ProjectName MicroblogStar 
  19. * @PackageName com.buaa 
  20. * @ClassName WeiboInputFormat 
  21. * @Description TODO 
  22. * @Author 刘吉超 
  23. * @Date 2016-05-07 10:23:28 
  24. */ 
  25. public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{ 
  26.  
  27.      @Override 
  28.      public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { 
  29.           // 自定义WeiboRecordReader类,按行读取 
  30.           return new WeiboRecordReader(); 
  31.      } 
  32.  
  33.      public class WeiboRecordReader extends RecordReader<Text, WeiBo>{ 
  34.             public LineReader in;  
  35.             // 声明key类型 
  36.             public Text lineKey = new Text(); 
  37.             // 声明 value类型 
  38.             public WeiBo lineValue = new WeiBo(); 
  39.              
  40.             @Override 
  41.             public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { 
  42.                 // 获取split 
  43.                 FileSplit split = (FileSplit)input; 
  44.                 // 获取配置  
  45.                 Configuration job = context.getConfiguration(); 
  46.                 // 分片路径  
  47.                 Path file = split.getPath(); 
  48.                  
  49.                 FileSystem fs = file.getFileSystem(job);  
  50.                 // 打开文件    
  51.                 FSDataInputStream filein = fs.open(file); 
  52.                  
  53.                 in = new LineReader(filein,job);  
  54.             } 
  55.  
  56.             @Override 
  57.             public boolean nextKeyValue() throws IOException, InterruptedException { 
  58.                 // 一行数据 
  59.                 Text line = new Text(); 
  60.                  
  61.                 int linesize = in.readLine(line); 
  62.                  
  63.                 if(linesize == 0)  
  64.                     return false;  
  65.                  
  66.                 // 通过分隔符'\t',将每行的数据解析成数组 
  67.                 String[] pieces = line.toString().split("\t"); 
  68.                  
  69.                 if(pieces.length != 5){   
  70.                     throw new IOException("Invalid record received");   
  71.                 }  
  72.                  
  73.                 int a,b,c; 
  74.                 try{   
  75.                     // 粉丝   
  76.                     a = Integer.parseInt(pieces[2].trim()); 
  77.                     // 关注 
  78.                     b = Integer.parseInt(pieces[3].trim()); 
  79.                     // 微博数 
  80.                     c = Integer.parseInt(pieces[4].trim()); 
  81.                 }catch(NumberFormatException nfe){   
  82.                     throw new IOException("Error parsing floating poing value in record");   
  83.                 } 
  84.                  
  85.                 //自定义key和value值 
  86.                 lineKey.set(pieces[0]);   
  87.                 lineValue.set(a, b, c); 
  88.                  
  89.                 return true; 
  90.             } 
  91.              
  92.             @Override 
  93.             public void close() throws IOException { 
  94.                 if(in != null){ 
  95.                     in.close(); 
  96.                 } 
  97.             } 
  98.  
  99.             @Override 
  100.             public Text getCurrentKey() throws IOException, InterruptedException { 
  101.                 return lineKey; 
  102.             } 
  103.  
  104.             @Override 
  105.             public WeiBo getCurrentValue() throws IOException, InterruptedException { 
  106.                 return lineValue; 
  107.             } 
  108.  
  109.             @Override 
  110.             public float getProgress() throws IOException, InterruptedException { 
  111.                 return 0; 
  112.             } 
  113.              
  114.         } 

3)、编写mr程序


  1. package com.buaa; 
  2.  
  3. import java.io.IOException; 
  4. import java.util.Arrays; 
  5. import java.util.Comparator; 
  6. import java.util.HashMap; 
  7. import java.util.Map; 
  8. import java.util.Map.Entry; 
  9.  
  10. import org.apache.hadoop.conf.Configuration; 
  11. import org.apache.hadoop.conf.Configured; 
  12. import org.apache.hadoop.fs.FileSystem; 
  13. import org.apache.hadoop.fs.Path; 
  14. import org.apache.hadoop.io.IntWritable; 
  15. import org.apache.hadoop.io.Text; 
  16. import org.apache.hadoop.mapreduce.Job; 
  17. import org.apache.hadoop.mapreduce.Mapper; 
  18. import org.apache.hadoop.mapreduce.Reducer; 
  19. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  20. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
  21. import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
  22. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
  23. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
  24. import org.apache.hadoop.util.Tool; 
  25. import org.apache.hadoop.util.ToolRunner; 
  26.  
  27. /**  
  28. * @ProjectName MicroblogStar 
  29. * @PackageName com.buaa 
  30. * @ClassName WeiboCount 
  31. * @Description TODO 
  32. * @Author 刘吉超 
  33. * @Date 2016-05-07 09:07:36 
  34. */ 
  35. public class WeiboCount extends Configured implements Tool { 
  36.     // tab分隔符 
  37.     private static String TAB_SEPARATOR = "\t"; 
  38.     // 粉丝 
  39.     private static String FAN = "fan"; 
  40.     // 关注 
  41.     private static String FOLLOWERS = "followers"; 
  42.     // 微博数 
  43.     private static String MICROBLOGS = "microblogs"; 
  44.      
  45.     public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> { 
  46.         @Override 
  47.         protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException { 
  48.             // 粉丝 
  49.             context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan())); 
  50.             // 关注 
  51.             context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers())); 
  52.             // 微博数 
  53.             context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs())); 
  54.         } 
  55.     } 
  56.      
  57.     public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> { 
  58.         private MultipleOutputs<Text, IntWritable> mos; 
  59.  
  60.         protected void setup(Context context) throws IOException, InterruptedException { 
  61.             mos = new MultipleOutputs<Text, IntWritable>(context); 
  62.         } 
  63.  
  64.         protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException { 
  65.             Map<String,Integer> map = new HashMap< String,Integer>(); 
  66.              
  67.             for(Text value : Values){ 
  68.                 // value = 名称 + (粉丝数 或 关注数 或 微博数) 
  69.                 String[] records = value.toString().split(TAB_SEPARATOR); 
  70.                 map.put(records[0], Integer.parseInt(records[1].toString())); 
  71.             } 
  72.              
  73.             // 对Map内的数据进行排序 
  74.             Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map); 
  75.              
  76.             for(int i = 0; i < entries.length;i++){ 
  77.                 mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue()); 
  78.             }                
  79.         } 
  80.  
  81.         protected void cleanup(Context context) throws IOException, InterruptedException { 
  82.             mos.close(); 
  83.         } 
  84.     } 
  85.      
  86.     @SuppressWarnings("deprecation") 
  87.     @Override 
  88.     public int run(String[] args) throws Exception { 
  89.         // 配置文件对象 
  90.         Configuration conf = new Configuration(); 
  91.          
  92.         // 判断路径是否存在,如果存在,则删除 
  93.         Path mypath = new Path(args[1]); 
  94.         FileSystem hdfs = mypath.getFileSystem(conf); 
  95.         if (hdfs.isDirectory(mypath)) { 
  96.             hdfs.delete(mypath, true); 
  97.         } 
  98.          
  99.         // 构造任务 
  100.         Job job = new Job(conf, "weibo"); 
  101.         // 主类 
  102.         job.setJarByClass(WeiboCount.class); 
  103.  
  104.         // Mapper 
  105.         job.setMapperClass(WeiBoMapper.class); 
  106.         // Mapper key输出类型 
  107.         job.setMapOutputKeyClass(Text.class); 
  108.         // Mapper value输出类型 
  109.         job.setMapOutputValueClass(Text.class); 
  110.          
  111.         // Reducer 
  112.         job.setReducerClass(WeiBoReducer.class); 
  113.         // Reducer key输出类型 
  114.         job.setOutputKeyClass(Text.class); 
  115.         // Reducer value输出类型 
  116.         job.setOutputValueClass(IntWritable.class); 
  117.          
  118.         // 输入路径 
  119.         FileInputFormat.addInputPath(job, new Path(args[0])); 
  120.         // 输出路径 
  121.         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
  122.          
  123.         // 自定义输入格式 
  124.         job.setInputFormatClass(WeiboInputFormat.class) ; 
  125.         //自定义文件输出类别 
  126.         MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class); 
  127.         MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class); 
  128.         MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class); 
  129.          
  130.         // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置   
  131.         LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
  132.          
  133.          //提交任务   
  134.         return job.waitForCompletion(true)?0:1; 
  135.     } 
  136.      
  137.     // 对Map内的数据进行排序(只适合小数据量) 
  138.     @SuppressWarnings("unchecked") 
  139.     public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {   
  140.         Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);   
  141.         // 排序 
  142.         Arrays.sort(entries, new Comparator<Entry<String, Integer>>() { 
  143.             public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) { 
  144.                 return entry2.getValue().compareTo(entry1.getValue()); 
  145.             }  
  146.         }); 
  147.         return entries;   
  148.     } 
  149.      
  150.     public static void main(String[] args) throws Exception { 
  151.         String[] args0 = { 
  152.                 "hdfs://ljc:9000/buaa/microblog/weibo.txt", 
  153.                 "hdfs://ljc:9000/buaa/microblog/out/"  
  154.         }; 
  155.         int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0); 
  156.         System.exit(ec); 
  157.     } 

5、运行结果

本文作者:刘超ljc

来源:51CTO

时间: 2024-09-27 16:51:25

利用 MapReduce分析明星微博数据实战的相关文章

如何利用mapreduce批量读写hbase数据

package com.mr.test; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; imp

物理学博士教你怎样分析微博数据,怎样涨粉

网上已经有太多关于怎么增加微博粉丝数,以及怎样让我们发的微博获得更多转发的建议了.我们并不知道这些建议是否有效,因为它们大都是建立在个人感觉上,而缺乏真正有说服力的证据.实际上微博是一个非常适合进行数据分析的东西,所以想谈微博心得,你得用数据说话. 一个普通用户的微博数据 从 2012 年 8 月 24 日开始,我像个自恋者一样,每天都看看自己的粉丝数涨了多少--不但看,而且还顺手把数字记录下来.这样坚持到写作本文的时候一共过去了 86 天.这个数据的可贵之处并不在于它是人工测量的,而在于它是独

新浪微博api 调用-如何调用新浪API对其微博数据进行挖掘分析

问题描述 如何调用新浪API对其微博数据进行挖掘分析 各位大神,菜鸟目前正在进行微博数据挖掘分析的相关项目,刚开始没多久,在此想请教各位,如何在SQL等数据库软件上调用新浪微博API,我已经在新浪微博平台,并下载了相关的SDK包,调用所必需的 app secret,回调地址等都已经有了,现在就是不知道如何在数据库软件上调用数据的具体操作,是通过平台给出的那些接口吗?具体问题如下: 1,如何获取微博内的数据: 2,新浪平台提供的那些接口怎么使用.(PS:关于数据挖掘的算法分析这些以后会逐步解决,现

大数据实战:用户流量分析系统

--------------------------------------------------------------------------------------------------------------- [版权申明:本文系作者原创,转载请注明出处] 文章出处:http://blog.csdn.net/sdksdk0/article/details/51628874 作者:朱培 --------------------------------------------------

《构建实时机器学习系统》一3.3 利用 Pandas 分析实时股票报价数据

3.3 利用 Pandas 分析实时股票报价数据 熟悉一项软件的最好方法就是通过示例来亲自使用它.这里将会通过分析苹果公司 2015 年 8 月 3 日秒级股票价格的数据来熟悉 Pandas 的用法.建议通过Python 笔记本或交互式窗口的方法来进行下面的操作. 首先,需要导入相关的模块,在导入Pandas模块的同时,我们还用到了Datetime模块.Datetime模块的主要功能是对时间.日期等数据进行处理,导入命令如下: import pandas as pd from datetime

微博运营:企业微博营销实战流程

本文来自@晏涛三寿 现为知名电商企业网络策划及微博运营负责人. 引言 研究微博营销已达半年之久,在实战中不断的摸索总结,零零散散的也写了不少关于微博方面的文章,每次都在派代上首发,得到了派友们的支持或者拍砖,最后被其他网站转载.之前都零零散散的,这一次花了点时间整理出了<企业微博运营实战流程>,希望将自己的一点经验与大家分享,没有华丽的语言,没有精美的图片,没有深奥的理论,都是自己在操作过程中的一点心得体会. 上次写完<微博营销基本兵法>之后大家一直追问我具体该如何操作,我知道大家

一种基于MapReduce架构的微博用户影响力评价算法的设计与实现

一种基于MapReduce架构的微博用户影响力评价算法的设计与实现 方超    周斌    李爱平 随着互联网的高速发展和Web2.0时代的到来,微博用户正以惊人的速度在增长.新浪微博现以粉丝数作为用户排名的依据,在僵尸粉和大量低使用率帐号的影响下,这种简单的排名依据难以表征用户的影响力.本文以海量新浪微博数据为分析对象,在分布式系统上构建微博用户的影响力评价模型.文章主要以微博用户的转发网络计算微博用户的微博影响力,再利用关注关系计算微博用户的潜在影响力,最后合成微博用户影响力的评价模型.实验

使用Amazon EMR和Tableau分析和可视化数据

引言 针对不同格式和大小的数据,Hadoop生态圈提供了丰富的工具进行分析并提取价值.最初,Hadoop生态圈专注于分析大批量数据,提供了类似MapReduce.Pig和Hive等组件.而当下,Hadoop已提供了大量用于交互式数据查询的工具,比如Impala.Drill和Presto.本篇文章将教会你如何使用Amazon Elastic MapReduce(Amazon EMR)来分析Amazon Simple Storage Service(Amazon S3)上存储的数据,并使用Table

深刻!阿里、宝洁大数据实战

虎嗅F&M;创新节的"如何洞察用户:阿里与宝洁的大数据实战"专场里,阿里巴巴数据委员会会长车品觉.宝洁中国市场研究部总经理李霈.英特尔中国研究院首席工程师吴甘沙和股票雷达创始人冯月聊到一个很有趣的话题:作为传统公司,宝洁很羡慕阿里能够轻易收集到真实.实时.全面的的数据,但车品觉却说,大数据公司对数据处理同样存在两个难点: 第一大数据太大,大数据公司跟传统公司一样需要做"采样"的工作. 第二是很难还原真实需求."用户研究上来讲,大数据给予的力量就是还