TableStore发布Timeline Lib:轻松构建千万级IM和Feed流系统

场景

移动互联网时代,微信和微博已经成为这个时代的两大支柱类社交应用。
这两类应用,其中一个是IM产品,一个是Feed流产品,微信的朋友圈也属于Feed流。
如果再细心去发现,会发现基本所有移动App都有Feed流的功能:消息广场、个人关注、通知、新闻聚合和图片分享等等。各种各样的Feed流产品占据了我们生活的方方面面。

现状

IM和Feed流功能已经基本成为所有App标配,如何开发一个IM或者Feed流功能是很多架构师、工程师要面临的问题。虽然是一个常见功能,但仍然是一个巨大的挑战,要考虑的因素非常多,比如:

  1. 存储系统里面如何选型才能支持大量数据存储,而且价格便宜。
  2. 同步系统使用何种架构、推送模型和系统才能保证高并发的同时延迟稳定?

为了解决上述问题,我们之前推出了三篇文章来阐述:

  • 现代IM系统中消息推送和存储架构的实现
    介绍了IM系统的传统架构模型和现代架构模型选型,以及抽象出了一个Timeline模型。
  • 高并发IM系统架构优化实践
    介绍了使用Table Store主键列自增功能后,对传统IM架构带来的巨大冲击,在稳定性,性能,成本,架构复杂度等多个方面都产生了巨大收益。
  • 如何打造千万级Feed流系统
    介绍了如何设计一款支持千万级并发的Feed流系统,详细分析了存储系统、同步系统选型、推拉模式(读扩散、写扩散)的选择及改进等。

上述三篇文章推出后,用户反响很好,在各个平台的传播很广,为很多用户提供了设计一款IM和Feed流产品的架构思路,但是从这里到完全实现一个可靠的IM、Feed流系统平台还有很长的路,比如:

  1. 如何才能更加简单的弄懂消息存储和同步模型?
  2. 如何将架构模型映射到存储系统和推送系统中?
  3. 如何才能保证对存储系统和推送系统的使用是最佳方式?

Timeline模型

针对上述三个问题,在《现代IM系统中消息推送和存储架构的实现》中引入了一个逻辑模型概念:Timeline模型。

在《现代IM系统中消息推送和存储架构的实现》中基于IM系统提出了Timeline模型,进一步会发现Timeline模型适用场景可以更广泛:

  • 有一方是消息生产者。
  • 有一方是消息消费者。
  • 生产者产生的一条消息可能会被一个或多个消费者消费。
  • 消费者需要聚合来自多个生产者的消息在一个页面展现。

IM和Feed流产品完全匹配上述四个特征,所以Timeline模型可以完全适用于IM和Feed流场景中。

模型适配性

下面我们来看看如何在各个场景中使用Timeline:

IM

  • 单聊就是三个Timeline:

    • 会话Timeline(存储历史消息)
    • 用户A的收件箱(A的同步库Timeline)
    • 用户B的收件箱(B的同步库Timeline)
  • 群聊就是1 + N个Timeline:
    • 会话Timeline(存储历史消息)
    • 用户A的收件箱(A的同步库Timeline)
    • 。。。。。。
    • 用户N的收件箱(N的同步库Timeline)

每个用户只有一个同步库Timeline,就算用户A在10个群里面,那么这个10个群的同步消息都是发送给用户A的这一个同步库Timeline中。

朋友圈

  • 发一条朋友圈状态就是1 + N个Timeline:

    • 自己历史消息Timeline(自己的存储库Timeline)
    • 朋友A的收件箱(A的同步库Timeline)
    • 。。。。。。
    • 朋友N的收件箱(N的同步库Timeline)

微博

如果是微博,粉丝可能会达到上亿级别,这时候会比朋友圈稍微复杂些:

  • 大V发一条微博就是 1 + M个Timeline(M << N,N是粉丝数)。

    • 自己历史消息Timeline(自己的存储库Timeline)
    • 粉丝A的收件箱(A的同步库Timeline)
    • 。。。。。。
    • 粉丝M的收件箱(M的同步库Timeline)
      剩余N - M的粉丝直接读大V自己的存储库Timeline内容即可,那么怎么设置M和N - M,可以参考《如何打造千万级Feed流系统》这篇文章。

从上面分析可以看出来,不管是IM,还是Feed流产品都可以将底层的存储、同步逻辑抽象成一个对多个Timeline进行读写的模型。

问题

有了Timeline概念模型后,从IM/Feed流应用映射到Timeline就比较容易了,但是从Timeline映射到存储、同步系统仍然很复杂,主要体现在:

  • 如何实现Timeline概念模型?
  • 如何将Timeline模型转换成对存储系统、同步系统的读写接口?
  • 如何设计存储系统、同步系统的表结构?
  • 如何选择存储系统、同步系统的读写方式?
  • 如何评估存储系统、同步系统的最大承载能力?
  • 如何实现才能保证性能最佳?
  • 如何才能吸取大型同类型系统架构设计的经验教训、
  • 如何才能避免一些实现、使用上的隐患?

这些问题涉及的内容光,细节多,深度大,坑较多等,整体上很繁杂,这一部分在耗费了大量人力之后,结果可能并不理想。

Timeline LIB

针对上述问题,只要存储系统和推送系统确定后,剩余的工作都是类似的,可以完全将经验封装起来成为一个LIB,将表结构设计,读写方式,隐患等等都解决好,然后供后来者使用,后来者可以不用再关心Timeline到底层存储系统之间的事情了。

所以,我们基于JAVA语言实现了一个TableStore-Timeline LIB,简称Timeline LIB。

目前已经开源在了GitHub上:Timeline@GitHub

Timeline LIB的结构如下:

整个Timeline分为两层,上层的Timeline层和下层的Store层。

Timeline层,提供最终的读写接口,用户操作的也是Timeline的接口。

Store层,负责存储系统的交互,目前Timeline LIB中提供了DistributeTimelineStore,基于Table Store,同时实现了分布式的存储和同步。后续会继续实现GlobalTimelineStore等。如果有用户有其他系统需求,比如MySQL,Redis,可以通过实现IStore接口来新增MySQLStore和RedisStore。

也欢迎大家将自己实现的Store通过GitHub的PullRequest共享出来。

有了Timeline LIB之后,如果要实现一个IM或者Feed流,只需要创建两种类型Timeline(存储类,同步类),然后调用Timeline的读写接口即可。

接口

接下来,我们看下Timeline LIB的API。
Timeline LIB中面向最终用户的是Timeline类,用于对每个Timeline做读写操作。

Timeline的接口主要分为三类:

  • 写:

    • store:同步写入
    • storeAsync:异步写入
    • batch:批量写入
  • 读:
    • get:同步读
    • getAsync:异步读
  • 范围读:
    • scan:同步范围读取

定义

   /**
     * Timeline的构造函数。
     * @param timelineID    此Timeline对应的ID。唯一标识一个Timeline,需要全局唯一,如果业务场景中需要多个字段才能唯一标识一个TimelineID,此时可以将多个字段拼接成一个字段。
     * @param store         此Timeline关联的Store,一般为存储Store或同步Store。实现了IStore接口类的对象,目前LIB中默认实现了DistributeTimelineStore,可以使用此store。除此之外,用户还可以自己实现自己的Store类,用于适配其他系统。
     */
    public Timeline(String timelineID, IStore store);

    /**
     * 写入一个消息到此Timeline中。
     * @param message   消息对象,需实现IMessage接口。用户需要通过实现IMessage接口创造符合自己业务场景的消息类,LIB中默认实现了StringMessage类。
     * @return          完整的TimelineEntry,包括消息和顺序ID。
     */
    public TimelineEntry store(IMessage message);

    /**
     * 异步写入消息接口。
     * @param message     消息对象,需实现IMessage接口。
     * @param callback    回调函数。
     * @return            Future对象,异步模式下,Future和callback需要二选一。
     */
    public Future<TimelineEntry> storeAsync(IMessage message, TimelineCallback<IMessage> callback);    

   /**
     * 批量写入消息接口。
     * 此接口只是把消息加入到本地的一个buffer中,当buffer满或者超时(默认10s,可配置)才会统一写入。
     * 此接口返回时并不一定消息已经写入成功。
     * @param message 消息对象,需实现IMessage接口。
     */
    public void batch(IMessage message) {

   /**
     * 同步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
     * @param sequenceID    顺序ID。此顺序ID可由store或scan接口获取到。
     * @return              完整的TimelineEntry,包括消息和顺序ID。
     */
    public TimelineEntry get(Long sequenceID);

    /**
     * 异步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
     * @param sequenceID    顺序ID。
     * @param callback      读取结束后的回调函数。
     * @return              Future对象,异步模式下,Future和Callback需要二选一。
     */
    public Future<TimelineEntry> getAsync(Long sequenceID, TimelineCallback<Long> callback);

    /**
     * 顺序读取一段范围内或固定数目的消息,支持逆序,正序。
     * @param parameter     顺序读取的参数,包括方向、from、to和maxCount。
     * @return              TimelineEntry的迭代器,通过迭代器可以遍历到待读取的所有消息。
     */
    public Iterator<TimelineEntry> scan(ScanParameter parameter);

get和scan

  • get接口一般用于读取单条消息,在IM和Feed流中可使用的场景非常少,甚至可以不使用。
  • scan是读取消息或Feed流消息的主要途径,通过scan可以读取到已经产生,但还未消费的消息。

如何用

简介

安装

在 Maven 工程中使用 Timeline LIB 只需在 pom.xml 中加入相应依赖即可:

<dependency>
    <groupId>com.aliyun.openservices.tablestore</groupId>
    <artifactId>timeline</artifactId>
    <version>1.0.0</version>
</dependency>

实现自己的IMessage

使用之前,需要先实现一个满足自己业务特点的Message类,此Message类能表示业务中的一条完整消息。
需要实现IMessage的下列接口:

  • String getMessageID():

    • 生成一个消息ID。
    • 消息ID的作用主要是用于消息去重,最大使用场景是IM中的多人群。对于Timeline模型,消息ID只需要在一段时间内(比如1小时或1天内)当前会话和收件人的消息中唯一即可。比如在IM中,只需要在某个会话或者群里面唯一即可,这时候其实更好的方式是由客户端生成这个消息ID。最简单的方式是客户端循环使用0~10000之间的值和用户ID拼接后作为消息ID即可满足要求。
    • 如果不想自己生成消息ID,可以直接继承DistinctMessage,DistinctMessage类的消息ID = machineID + 进程ID + 循环递增ID[0, Integer.MAX_VALUE]。
  • void setMessageID(String messageID):
    • 在get和scan接口中设置消息ID。
  • IMessage newInstance():
    • 使用默认构造函数,生成一个同类型实例。用于在Store中自动生成同类型实例对象。
  • byte[] serialize():
    • 需要将消息内容完整序列号为字节数组。这个接口会被store接口在写入的时候调用。
  • void deserialize(byte[] input):
    • 反序列化接口,这个接口会被store在读取的时候调用。

生成Store

在一个IM或Feed流产品中,一般会有两个子系统,一个是存储系统,一个是同步系统。

需要为这两个系统各自生成一个Store对象。

  • 存储系统Store:存储数据时间长,数据量大,成本很重要。如果不使用读写扩散相结合的方式,那么存储系统的store可以使用成本更低的混合存储(SSD + SATA)。
  • 同步系统Store:存储受时间段,但是延时敏感,可以使用高性能存储(SSD),读取性能更稳定。
    如果首次使用,需要调用store的create接口创建相应存储表。

Timeline读写

Store生成好后就可以构造最终的Timeline对象了,Timeline对象分为两类,一类是存储库Timeline,一个是同步库Timeline。

当在IM中发布消息或者Feed流产品中发布状态时,就是对相应存储库Timeline和同步库Timeline的消息写入(store/storeAsync)。

当在IM或Feed流产品中读取最新消息时,就是对相应同步库Timeline的范围读取(scan)。

当在IM或Feed流产品中读取历史消息时,就是对相应存储库Timeline的范围读取(scan)。

如果是推拉结合的微博模式,则读取最新消息时,就是对相应存储库Timeline和同步库Timeline的同时范围读取(scan)。

错误处理

Timeline LIB中会抛出TimelineException,TimelineException提供了两种接口:getType()和getMessage(),getType()返回此TimelineException的类型,包括了TET_ABORT,TET_RETRY,TET_INVALID_USE,TET_UNKNOWN:

  • TET_ABORT:目前只有内部OutputStrem被非预期关闭后才会出现。
  • TET_RETRY:网络不稳定或者底层存储系统在负载均衡,可以继续做重试。
  • TET_INVALID_USE:使用方式不对,建议直接报错进程推出。
  • TET_UNKNOWN:未知错误,可以到Timeline@GitHub给我们提issue。

示例

这一节会演示下如何使用Timeline LIB实现IM的群组功能。

场景

  • 多人群组,号码是11789671。
  • 群里有用户:user_A,user_B,user_C。

初始化

构造两个store,一个用来存储,一个用例同步。

IStore store = new DistributeTimelineStore(storeConfig);
IStore sync = new DistributeTimelineStore(syncConfig);

// 构造群成员列表,群成员列表可以存储在Table Store。
List<String> groupMembers = Arrays.asList("user_A", "user_B", "user_C");

发消息

user_A发一条群消息:“有人吗”。

// 存储会话消息
Timeline timeline1 = new Timeline("11789671", store);
timeline1.store(new StringMessage("user_A:有人吗"));

// 发送同步消息
for (String user : groupMembers) {
    Timeline timeline = new Timeline(user, sync);
    timeline.store(new StringMessage("user_A:有人吗"));
}

读取同步消息

user_C读取自己最新的同步消息

Timeline timeline = new Timeline("user_C", sync);
ScanParameter scanParameter = ScanParameterBuilder
                .scanForward()
                .from(last_sequence_id)
                .to(Long.MAX_VALUE)
                .maxCount(100)
                .build();

Iterator<TimelineEntry> iterator = timeline.scan(scanParameter);
while(iterator.hasNext()) {
    TimelineEntry entry = iterator.next();
    // 处理消息
}

上面的示例演示了如何用Timeline LIB实现IM中的群组功能。其他的朋友圈,微博等也类似,这里就不赘述了。

我们目前在Timeline Samples@GitHub上实现两个场景的实例:

也欢迎大家共享其他场景的实现代码。

性能

我们使用阿里云ECS做了性能测试,效果较理想。

在阿里云共享型1核1G的ECS机器上,使用DistributeTimelineStore,Timeline LIB的storeAsync接口可以完成每秒1.2万消息的写入,如果使用batch批量接口,则可以完成每秒5.3万消息的写入。

如果使用一台8核的ECS,只需要3秒钟就可以完成100万条消息的写入。

由于DistributeTimelineStore使用了Table Store作为存储和同步系统,Table Store是阿里云的一款服务化NoSQL服务,支持的TPS在理论上无上限,实际中仅受限于集群大小,所以整个Timeline LIB的写入能力和压力器的CPU成正比。

下面的图展示了不同机型上完成1000万条消息写入的延迟:

在一台8核ECS上只需要27秒就可以完成1000万写入,由于写入能力和CPU成线线性关系,如果用两台16核的,则只需要7秒就可以完成1000万消息的写入。

我们再来看一下scan读取的性能,读取20条1KB长度消息,LIB端延迟一直稳定在3.4ms,Table Store服务端延迟稳定在2ms。

这个量级和能力,可以撑得住目前所有的IM和Feed流产品的压力。

和TableStore关系

Timeline LIB的想法来源于Table Store的真实场景需求,并且为了用户可以更加简单的使用,增加了主键列自增功能。

目前Timeline LIB的store层实现了DistributeTimelineStore类,DistributeTimelineStore可同时适用于存储store和同步store。

DistributeTimelineStore是基于Table Store的,但是为了便于用户使用其他系统,在Timeline LIB中将Store层独立了出来。

如果用户希望使用其他系统,比如MySQL作为存储系统,可以实现IStore接口构造自己的Store类。我们也欢迎大家提供自己的各种Store层实现,最终希望为社交场景的架构师和开发者提供一套完整的易用性开发框架。

接下来计划

  • Store的扩展:

    • 目前仅支持一种store:DistributeTimelineStore,虽然可以同时支持存储和同步,但是功能还是比较简单,接下来会支持更多的Store,比如全球多向同步的GlobalTimelineStore等。
  • 更易用性的接口
    • 目前Timeline接口比较偏向系统层,后续会提供更偏向业务层的接口,比如适用于IM的接口,适用于Feed流的接口等。

联系我们

如果在使用过程中有任何问题或者建议,可以通过下列途径联系我们:

时间: 2024-10-27 19:33:47

TableStore发布Timeline Lib:轻松构建千万级IM和Feed流系统的相关文章

如何在AWS云平台上构建千万级用户应用

如何在AWS云平台上构建千万级用户应用 方国伟 首席云技术顾问 --弹性的确是云计算的一个重要特性 --AWS云平台也供Auto Scaling功能来帮助用户实现弹性伸缩 --应用服务的弹性伸缩需要良好的设计 如何在AWS云平台上构建千万级用户应用

如何打造千万级Feed流系统

在互联网领域,尤其现在的移动互联网时代,Feed流产品是非常常见的,比如我们每天都会用到的朋友圈,微博,就是一种非常典型的Feed流产品,还有图片分享网站Pinterest,花瓣网等又是另一种形式的Feed流产品.除此之外,很多App的都会有一个模块,要么叫动态,要么叫消息广场,这些也是Feed流产品,可以说,Feed流产品是遍布天下所有的App中. 概念 我们在讲如何设计Feed流系统之前,先来看一下Feed流中的一些概念: Feed:Feed流中的每一条状态或者消息都是Feed,比如朋友圈中

如何打造千万级Feed流系统?阿里数据库技术解读

2017年的双十一又一次刷新了记录,交易创建峰值32.5万笔/秒.支付峰值25.6万笔/秒.而这样的交易和支付等记录,都会形成实时订单Feed数据流,汇入数据运营平台的主动服务系统中去.数据运营平台的主动服务,根据这些合并后的数据,实时的进行分析,进行实时的舆情展示,实时的找出需要主动服务的对象等,实现一个智能化的服务运营平台. 通过RDS PostgreSQL和HybridDB for PGSQL实时分析方案: 承受住了每秒几十万笔的写入吞吐并做数据清洗,是交易的数倍 实现分钟级延迟的实时分析

云上实践:轻松打造亿级用户的全球化高性能系统

导读 本文会介绍一个真实的全球性的中等规模的APP应用背后的技术选型与关键组件,涉及到高性能分布式系统.全球化网络布局.大数据平台与数据分析等关键技术. 厂家选择: 国内毫无疑问首选阿云,国外AWS当然是体量最大的,体验也不错,其实对ECS/EC2虚拟机.RDS数据库来说,基本功能.稳定性都相差无几,规模优势越来越明显的情况下,如果没有特殊考虑,基本木有考虑其他小厂的必要了. AWS/阿里云服务各有特色和短长,AWS发力早,国际技术社区/第三方市场更成熟,阿里云也有自己的特色的.很实用的功能如D

【技术干货】阿里云构建千万级别架构演变之路

本文作者:乔锐杰,现担任上海驻云信息科技有限公司运维总监/架构师.曾任职过黑客讲师.java软件工程师/网站架构师.高级运维.阿里云架构师等职位.维护过上千台服务器,主导过众安保险.新华社等千万级上云架构.在云端运维.分布式集群架构等方面有着丰富的经验. 前言     一个好的架构是靠演变而来,而不是单纯的靠设计.刚开始做架构设计,我们不可能全方位的考虑到架构的高性能.高扩展性.高安全等各方面的因素.随着业务需求越来越多.业务访问压力越来越大,架构不断的演变及进化,因而造就了一个成熟稳定的大型架

新浪微博千万级规模高性能、高并发的网络架构经验分享

[本文转载自新浪微博千万级规模高性能.高并发的网络架构经验分享] 架构以及我理解中架构的本质 在开始谈我对架构本质的理解之前,先谈谈对今天技术沙龙主题的个人见解,千万级规模的网站感觉数量级是非常大的,对这个数量级我们战略上要重视它,战术上又要藐视它. 先举个例子感受一下千万级到底是什么数量级?现在很流行的优步(Uber),从媒体公布的信息看,它每天接单量平均在百万左右, 假如每天有10个小时的服务时间,平均QPS只有30左右.对于一个后台服务器,单机的平均QPS可以到达800-1000,单独看写

千万级规模高性能、高并发的网络架构经验分享

千万级规模高性能.高并发的网络架构经验分享 主 题 :INTO100沙龙时间 :2015年11月21日下午地点 :梦想加联合办公空间分享人:卫向军(毕业于北京邮电大学,现任微博平台架构师,先后在微软.金山云.新浪微博从事技术研发工作,专注于系统架构设计.音视频通讯系统.分布式文件系统和数据挖掘等领域.) 架构以及我理解中架构的本质 在开始谈我对架构本质的理解之前,先谈谈对今天技术沙龙主题的个人见解,千万级规模的网站感觉数量级是非常大的,对这个数量级我们战略上 要重 视 它 , 战术上又 要 藐

千万级关键字奖杯排名百度前四的企业站优化方法

笔者下笔之前简介分析一下基本情况,企业站奖杯网是一个关于奖杯奖牌定做批发的网站,目前大词只有奖杯和奖牌两个,奖杯百度收录3100万,奖牌 2900万,GOOGLE奖杯收录1600万,奖牌收录1300万.奖杯网项目在去年国庆节过后正式启动,域名则选择了闲置二年的 www.jbjp.net,上线初期各在搜索引擎权重不高,页面只有个位数的收录,经过三个月的程序小修小改,目前企业站奖杯网百度收录1170 个,GOOGLE收录1680个. 奖杯这个千万级的关键字在百度排名前四,主要还是依靠很多SEOER推

探究千万级应用的推广与赢利之道

[CSDN综合整理]在本次2011中国移动开发者大会的"推广与盈利"的主题论坛上,一场由国内一线移动开发商代表参加的圆桌论坛吸引到了众多眼球,嘉宾们围绕着移动应用的营销手段.运营方式.效果的持续跟踪和分析反馈等话题展开了充分探讨. 主持人:友盟副总裁蒋桦 嘉宾:3G门户副总裁曹明(Go桌面),北京掌中浩阅科技有限公司创始人兼总裁张凌云(iReader),北京京鼎诚科技发展有限责任公司创始人兼CEO苏光升(OPDA社区/Android优化大师),网易移动互联网中心总经理徐诗,上海水渡石信