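PostgreSQL Asynchronous Messaging in Practice - Real-Time Monitoring of a FEED System at Hundreds of Millions of Rows per Minute

Tags

PostgreSQL, asynchronous messaging, trigger, rule, insert on conflict, real-time analytics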


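Background

Many business systems add tracking points that record the logs generated by user actions, for troubleshooting, operations, analytics or other needs. These logs are also called FEED logs.

Take an order system as an example. Its stages are tightly linked, from shopping cart, ordering, payment and shipping to receipt (plus disputes, refunds and so on), so a single order usually produces several related records.

Each stage may produce different attributes: it may add new attributes or change the values of existing ones.

To simplify analysis, the records (and attributes) an order produces over its whole lifecycle usually need to be merged into a single record (a wide order table).

Business systems typically write the order FEED data they produce in real time to a message queue, which turns the data into flowing data:

《Insights into the importance of data flow from human river civilization》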

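RDS PG + OSS + HDB PG: minute-level cleansing and proactive detection

After the data is consumed from the message queue, it is written to RDS PG in real time; RDS PG merges the order FEEDs and writes them to an OSS external table. (Compressed formats are supported; measured as raw data, writes to OSS run at about 100MB/s per session.)

HDB PG reads from the OSS external table (compressed formats are supported; measured as raw data, reads from OSS run at about 100MB/s per data node) and merges the order FEED data into the full order table.

《Building a closed business-data loop for cloud stream computing, online business and data analytics - Alibaba Cloud RDS and HybridDB for PostgreSQL best practices》

Once the data is in HDB PG, rule SQL is run against the full order table to mine abnormal data (or to run analysis).

This solution achieves minute-level, near-real-time analysis of massive volumes of order FEED data.

It has already supported the Double 11 (Singles' Day) business with high throughput, low latency and silky-smooth operation.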

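Millisecond-level FEED monitoring and feedback

Technology always serves the business. Minute-level latency is already quite good, but some extreme cases may call for even lower latency.

RDS PostgreSQL has an even stronger trump card: it can detect abnormal FEED data and feed it back within milliseconds.

The approach is stream processing plus asynchronous messaging, in one of two ways:

1. Triggers combined with an asynchronous message channel.

2. pipeline (streaming SQL) combined with an asynchronous message channel.

The application listens on a message channel (listen channel), and the database writes abnormal data to that channel (notify channel, message). Abnormal data is thus pushed to the application proactively and asynchronously.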

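Architecture of millisecond-level FEED monitoring and feedback

RDS PG design

1. Split the load across instances to raise system-level throughput. (For example, if a single instance processes 150,000 rows/s, 100 instances can support real-time processing of 15 million rows/s.)

For example: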

DB0, DB1, DB2, DB3, ..., DB255

Mapping:

db0, host?, port?  

db1, host?, port?  

...
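A minimal sketch of the instance-level routing above, assuming the order id is hashed with a simple modulo over 256 instances (DB0..DB255). The class name `InstanceRouter` and the `pg-host-N:5432` addresses are hypothetical placeholders for the `host?, port?` entries; the capacity method only repeats the 150,000 × 100 arithmetic and assumes throughput scales linearly with the number of instances.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical instance router: order id -> one of N database instances.
// Modulo hashing and the placeholder host/port values are assumptions.
class InstanceRouter {
    static final class Endpoint {
        final String db;
        final String host;
        final int port;

        Endpoint(String db, String host, int port) {
            this.db = db;
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return db + "@" + host + ":" + port;
        }
    }

    private final List<Endpoint> endpoints = new ArrayList<>();

    InstanceRouter(int instanceCount) {
        for (int i = 0; i < instanceCount; i++) {
            // Placeholder addresses; a real deployment would load the mapping from config.
            endpoints.add(new Endpoint("db" + i, "pg-host-" + i, 5432));
        }
    }

    // floorMod keeps the index non-negative even for negative ids.
    int indexFor(long orderId) {
        return (int) Math.floorMod(orderId, (long) endpoints.size());
    }

    Endpoint endpointFor(long orderId) {
        return endpoints.get(indexFor(orderId));
    }

    // Aggregate rows/s, assuming throughput scales linearly with instance count.
    static long aggregateRowsPerSec(long perInstanceRowsPerSec, int instances) {
        return perInstanceRowsPerSec * instances;
    }

    public static void main(String[] args) {
        InstanceRouter router = new InstanceRouter(256);
        System.out.println(router.endpointFor(300));           // db44@pg-host-44:5432
        System.out.println(aggregateRowsPerSec(150_000, 100)); // 15000000
    }
}
```

In a real deployment the list would be built from the db → host/port mapping table rather than generated.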

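2. Use multiple tables inside each instance to raise its parallel processing throughput. When there are many rules, splitting tables increases the rule-processing throughput of a single instance.

For example: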

tbl0, tbl1, tbl2, ..., tbl127  

tbl128, tbl129, tbl130, ..., tbl255

Mapping:

tbl0, db?  

tbl1, db?  

...
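The table-level mapping can be sketched the same way. This assumes 256 tables (tbl0..tbl255), an order id assigned to a table by modulo, and tables placed on instances in contiguous, equal-sized ranges, which is one reading of the two tbl rows above (tbl0..tbl127 on one instance, tbl128..tbl255 on another). `TableRouter` is a hypothetical name.

```java
// Hypothetical two-level routing: order id -> table, table -> instance.
// Assumes tables are spread over instances in contiguous, equal-sized ranges.
class TableRouter {
    private final int tableCount;
    private final int instanceCount;

    TableRouter(int tableCount, int instanceCount) {
        if (instanceCount <= 0 || tableCount % instanceCount != 0) {
            throw new IllegalArgumentException("tables must divide evenly across instances");
        }
        this.tableCount = tableCount;
        this.instanceCount = instanceCount;
    }

    // Level 1: fixed id -> table mapping (modulo is an assumption).
    int tableFor(long orderId) {
        return (int) Math.floorMod(orderId, (long) tableCount);
    }

    // Level 2: table -> instance, e.g. tbl0..tbl127 -> db0 and tbl128..tbl255 -> db1 for 2 instances.
    int instanceForTable(int table) {
        return table / (tableCount / instanceCount);
    }

    String locate(long orderId) {
        int table = tableFor(orderId);
        return "tbl" + table + "@db" + instanceForTable(table);
    }

    public static void main(String[] args) {
        TableRouter router = new TableRouter(256, 2);
        System.out.println(router.locate(5));    // tbl5@db0
        System.out.println(router.locate(130));  // tbl130@db1
        System.out.println(router.locate(1000)); // tbl232@db1
    }
}
```

In this sketch the id → table level never changes, so rebalancing only touches the table → instance level; that is a design choice assumed here, not something the original specifies.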

HDB PG design

HDB PG is retained for real-time analysis of massive, PB-scale data volumes.

The data path still goes through OSS with batch loading.

DEMO

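1. Create the full-width order feed table. (Alternatively, all attributes can be stored in a jsonb column, since PostgreSQL supports the JSONB type; other multi-value types PostgreSQL supports include hstore and xml.)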

create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);

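2. Write order FEED data. For example, business system A writes columns c1 and c2 of an order, business system B writes columns c3 and c4, and so on.

Use the on conflict do update syntax to merge the order attributes.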

insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  

insert into feed (id, c3, c4) values (2,99,290001) on conflict (id) do update set c3=excluded.c3, c4=excluded.c4 ;
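To make the merge semantics concrete, here is an in-memory Java analogue of `insert ... on conflict (id) do update set col = excluded.col`: each system supplies only its own columns, and a later write overwrites only those columns. It is a simplification (no NULL overwrites, no concurrency control), and `FeedMerge` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory analogue of the ON CONFLICT (id) DO UPDATE merge used above.
class FeedMerge {
    // id -> (column name -> value); a missing column behaves like NULL.
    private final Map<Long, Map<String, Integer>> rows = new HashMap<>();

    // Mirrors: insert into feed (id, cols...) values (...)
    //          on conflict (id) do update set col = excluded.col, ...
    // Only the supplied columns are written; other columns keep their values.
    void upsert(long id, Map<String, Integer> columns) {
        rows.computeIfAbsent(id, k -> new HashMap<>()).putAll(columns);
    }

    Map<String, Integer> row(long id) {
        return rows.get(id);
    }

    public static void main(String[] args) {
        FeedMerge feed = new FeedMerge();
        feed.upsert(2, Map.of("c1", 2, "c2", 30001));   // business system A
        feed.upsert(2, Map.of("c3", 99, "c4", 290001)); // business system B
        System.out.println(feed.row(2).get("c2") + " " + feed.row(2).get("c4")); // 30001 290001
    }
}
```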

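3. Define real-time monitoring rules for order FEEDs: when a condition is met, a message is sent through PostgreSQL asynchronous messaging. An app that listens on the channel and fetches messages from it in a loop consumes them in real time.

Rules can be kept in a TABLE, in trigger code, or in UDF code.

3.1 If data is written in batches, use a statement-level trigger to reduce how often the trigger function is called and raise write throughput.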

create or replace function tg1() returns trigger as $$
declare
begin
  -- Rule definitions; in practice they can be joined with a rule-definition table
  -- Send an asynchronous message when c2 > 1000
  perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(inserted)) from inserted where c2>1000;    

  -- Multiple rules with a single notify:
  --   perform pg_notify(
  --                    'channel_1',
  --		       case
  --		        when c2>1000 then 'Resone:c2 overflow::'||row_to_json(inserted)
  --		        when c1>200 then 'Resone:c1 overflow::'||row_to_json(inserted)
  --		       end
  --		      )
  --   from inserted
  --   where
  --     c2 > 1000
  --	 or c1 > 200;    

  -- For multiple rules, write multiple notify calls or merge them into one NOTIFY.  

  return null;
end;
$$ language plpgsql strict;
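As an illustration of what tg1 computes, here is a Java analogue: the rule runs once per statement over the whole transition table `inserted` and produces one payload per row with c2 > 1000. `StatementLevelRule`, its `Row` class and the simplified JSON (only id, c1, c2) are hypothetical stand-ins for the feed table and row_to_json; the payload prefix is copied from the trigger as written.

```java
import java.util.ArrayList;
import java.util.List;

// Java analogue of the statement-level trigger tg1: one call per statement scans the
// whole transition table ("inserted") and emits one payload per row with c2 > 1000.
class StatementLevelRule {
    static final class Row {
        final long id;
        final Integer c1;
        final Integer c2;

        Row(long id, Integer c1, Integer c2) {
            this.id = id;
            this.c1 = c1;
            this.c2 = c2;
        }

        // Simplified stand-in for row_to_json; a null column prints as JSON null.
        String toJson() {
            return "{\"id\":" + id + ",\"c1\":" + c1 + ",\"c2\":" + c2 + "}";
        }
    }

    // Called once for the whole batch, which is where statement-level triggers save work.
    static List<String> evaluate(List<Row> inserted) {
        List<String> payloads = new ArrayList<>();
        for (Row r : inserted) {
            // Same predicate as "where c2>1000"; a NULL c2 never matches, as in SQL.
            if (r.c2 != null && r.c2 > 1000) {
                payloads.add("Resone:c2 overflow::" + r.toJson());
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Row> batch = List.of(new Row(1, 5, 999), new Row(2, 2, 30001), new Row(3, 7, null));
        System.out.println(evaluate(batch)); // [Resone:c2 overflow::{"id":2,"c1":2,"c2":30001}]
    }
}
```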

3.2 If data is written one row at a time, use a row-level trigger. (The benchmark later in this article uses this.)

create or replace function tg2() returns trigger as $$
declare
begin
  -- Rule definitions; in practice they can be joined with a rule-definition table  

  -- Send an asynchronous message when c2 > 9999
  perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;    

  -- Multiple rules, a single notify call, writing to one CHANNEL:
  --   perform pg_notify(
  --                    'channel_1',
  --		       case
  --		        when NEW.c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
  --		        when NEW.c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
  --		       end
  --		      )
  --   where
  --     NEW.c2 > 1000
  --	 or NEW.c1 > 200;    

  -- Multiple rules, a single notify call, writing to multiple CHANNELs:
  --   perform pg_notify(
  --		       case
  --		        when NEW.c2>1000 then 'channel_1'
  --		        when NEW.c1>200 then 'channel_2'
  --		       end,
  --		       case
  --		        when NEW.c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
  --		        when NEW.c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
  --		       end
  --		      )
  --   where
  --     NEW.c2 > 1000
  --	 or NEW.c1 > 200;    
  --   (With CASE, a row that matches several rules reports only the first matching one.)

  -- For multiple rules, write multiple notify calls or merge them into one NOTIFY.
  -- For example:
  -- perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2 > 1000;
  -- perform pg_notify('channel_2', 'Resone:c1 overflow::'||row_to_json(NEW)) where NEW.c1 > 200;  

  -- Rules can also be defined in a TABLE to make them dynamic.
  -- Keep rules short: they are evaluated serially, so lengthy rules reduce write throughput.
  -- The udf takes a feed value and a rule_table row and returns boolean; the matching logic lives in the UDF.
  -- perform pg_notify(channel_column, resone_column||'::'||row_to_json(NEW)) from rule_table where udf(NEW::feed, rule_table);  

  return null;
end;
$$ language plpgsql strict;
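The "rules in a TABLE" variant at the end of tg2 can be sketched in Java as follows. Each rule row carries a channel, a reason and a predicate standing in for the hypothetical `udf(NEW::feed, rule_table)`; `RowLevelRules`, `Feed`, `Rule` and `Message` are illustrative names, not part of the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the dynamic-rule variant of tg2: every matching rule row yields one
// (channel, payload) pair, like one pg_notify call per row of rule_table.
class RowLevelRules {
    static final class Feed {
        final long id;
        final int c1;
        final int c2;

        Feed(long id, int c1, int c2) {
            this.id = id;
            this.c1 = c1;
            this.c2 = c2;
        }

        String toJson() {
            return "{\"id\":" + id + ",\"c1\":" + c1 + ",\"c2\":" + c2 + "}";
        }
    }

    static final class Rule {
        final String channel;
        final String reason;
        final Predicate<Feed> matches; // stands in for the hypothetical udf(NEW::feed, rule_table)

        Rule(String channel, String reason, Predicate<Feed> matches) {
            this.channel = channel;
            this.reason = reason;
            this.matches = matches;
        }
    }

    static final class Message {
        final String channel;
        final String payload;

        Message(String channel, String payload) {
            this.channel = channel;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return channel + " <- " + payload;
        }
    }

    // Rules are checked one after another, so cost grows with the number of rules.
    static List<Message> fire(Feed newRow, List<Rule> ruleTable) {
        List<Message> out = new ArrayList<>();
        for (Rule rule : ruleTable) {
            if (rule.matches.test(newRow)) {
                out.add(new Message(rule.channel, rule.reason + "::" + newRow.toJson()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
            new Rule("channel_1", "Resone:c2 overflow", f -> f.c2 > 1000),
            new Rule("channel_2", "Resone:c1 overflow", f -> f.c1 > 200));
        // This row matches both rules, so two messages go to two channels.
        fire(new Feed(7, 300, 5000), rules).forEach(System.out::println);
    }
}
```

Unlike the single-notify CASE variant in the comments, which reports only the first matching WHEN branch, this emits one message per matching rule, like writing several notify calls.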

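3.3 As the code comments above show, rules can be defined in many places.

4. Create the triggers. Create either the 4.1 pair or the 4.2 pair, not both: both pairs use the trigger names tg1 and tg2 on feed, so creating the second pair would fail because those names already exist.

4.1 Statement-level triggers (recommended for batch writes)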

create trigger tg1 after insert on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();
create trigger tg2 after update on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();

4.2 Row-level triggers (recommended for single-row writes; the benchmark later in this article uses these)

create trigger tg1 after insert on feed for each row execute procedure tg2();
create trigger tg2 after update on feed for each row execute procedure tg2();

5. Agree on the channel name.

6. The application listens on the message channel.

listen channel_1;  

Receive messages:  

loop
  sleep ?;
  get messages;
end loop
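A runnable shape for the "sleep, fetch, loop" pseudocode above, with the notification source abstracted behind a hypothetical `NotificationSource` interface so it runs without a database. With pgjdbc, `fetch()` could wrap `PGConnection.getNotifications()` after issuing a query, as the JDBC example in this article does; that wrapper would also need to turn a null result into an empty list, as the example's null check suggests. The wiring is not shown here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Runnable shape of the "sleep; get messages; loop" pseudocode.
class PollingConsumer {
    // Hypothetical abstraction over the real notification channel.
    interface NotificationSource {
        // Returns all pending payloads, possibly empty, never null.
        List<String> fetch();
    }

    // Bounded number of rounds so the sketch terminates; a real listener loops until shutdown.
    static List<String> poll(NotificationSource source, int rounds, long sleepMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            received.addAll(source.fetch());
            Thread.sleep(sleepMillis);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake source that returns one pre-built batch per call.
        Deque<List<String>> batches = new ArrayDeque<>();
        batches.add(List.of("Resone:c2 overflow::{\"id\":2}"));
        batches.add(List.<String>of());
        batches.add(List.of("a", "b"));
        NotificationSource fake = () -> batches.isEmpty() ? List.<String>of() : batches.poll();
        System.out.println(poll(fake, 5, 10).size()); // 3
    }
}
```

The sleep interval bounds the extra delivery latency: with the 500 ms sleep used in the JDBC example, a message can wait up to half a second on top of the millisecond-level detection.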

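7. Write order data. Every row passes through the trigger in real time; the trigger holds the logic that sends a message to the agreed channel when certain rules are met.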

postgres=# insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
INSERT 0 1

8. A sample of a received message:

Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2,"c1":2,"c2":30001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
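Consumers typically need to split a payload like this back into its reason and its row data. This sketch assumes the `<reason>::<json>` format built by the triggers above and that the reason itself never contains `::`; parsing the JSON half is left to a JSON library of your choice. `PayloadParser` is a hypothetical name.

```java
// Splits a '<reason>::<row json>' payload as built by the triggers above.
// Assumes the reason text never contains "::".
class PayloadParser {
    static String[] split(String payload) {
        int sep = payload.indexOf("::");
        if (sep < 0) {
            throw new IllegalArgumentException("no '::' separator: " + payload);
        }
        return new String[] { payload.substring(0, sep), payload.substring(sep + 2) };
    }

    public static void main(String[] args) {
        String p = "Resone:c2 overflow::{\"id\":2,\"c1\":2,\"c2\":30001}";
        String[] parts = split(p);
        System.out.println(parts[0]); // Resone:c2 overflow
        System.out.println(parts[1]); // {"id":2,"c1":2,"c2":30001}
    }
}
```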

9. Batch insert

postgres=# insert into feed (id, c1, c2)  select id,random()*100, random()*1001 from generate_series(1,10000) t(id) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
INSERT 0 10000
Time: 59.528 ms

A sample of what was received in one run:

Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":362,"c1":92,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4061,"c1":90,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4396,"c1":89,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5485,"c1":72,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6027,"c1":56,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6052,"c1":91,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7893,"c1":84,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8158,"c1":73,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.

10. Update data

postgres=# update feed set c1=1;
UPDATE 10000
Time: 33.444 ms

A sample of the asynchronous messages received:

Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":1928,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2492,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2940,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2981,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4271,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4539,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7089,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7619,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8001,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8511,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8774,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":9394,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.

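Benchmark

1. Assume one record in every 10,000 is abnormal and needs to be pushed, which is a fairly realistic rate. In the script below, c2 is drawn from 1 to 10000 and the tg2 rule fires on c2 > 9999, so exactly one value in 10,000 (c2 = 10000) produces a message.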

vi test.sql  

\set id random(1,10000000)
\set c1 random(1,1001)
\set c2 random(1,10000)
insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;

2. Results: a processing throughput of 167,190 rows/s.

transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 20060111
latency average = 0.335 ms
latency stddev = 0.173 ms
tps = 167148.009836 (including connections establishing)
tps = 167190.475312 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
         0.002  \set id random(1,10000000)
         0.001  \set c1 random(1,1001)
         0.000  \set c2 random(1,10000)
         0.332  insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
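The report supports a few back-of-the-envelope figures. The sketch below only does arithmetic on numbers from this article and assumes throughput scales linearly across instances, which this single-instance benchmark does not measure: one instance at about 167,190 rows/s handles roughly 10 million rows per minute, so about 10 such instances would reach the hundreds-of-millions-per-minute scale in the title, and at 1 message per 10,000 rows one instance emits about 17 notifications per second. `Capacity` is a hypothetical name.

```java
// Back-of-the-envelope figures from the pgbench report above.
// Assumes throughput scales linearly across instances (not measured here).
class Capacity {
    // test.sql draws c2 uniformly from 1..maxValue and the rule fires on c2 > threshold,
    // so (maxValue - threshold) of the maxValue possible values trigger a message.
    static double anomalyRate(int maxValue, int threshold) {
        return (double) (maxValue - threshold) / maxValue;
    }

    // Instances needed to reach a rows-per-minute target at a given per-instance rows/s.
    static long instancesFor(double targetRowsPerMinute, double rowsPerSecondPerInstance) {
        return (long) Math.ceil(targetRowsPerMinute / (rowsPerSecondPerInstance * 60));
    }

    public static void main(String[] args) {
        double tps = 167190.475312; // "tps ... (excluding connections establishing)"
        System.out.printf("rows per minute per instance: %.0f%n", tps * 60);
        System.out.printf("notifications per second: %.1f%n", tps * anomalyRate(10000, 9999));
        System.out.println("instances for 1e8 rows/min: " + instancesFor(1e8, tps)); // 10
    }
}
```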

3. A sample of the asynchronous messages received by the listener

postgres=# listen channel_1;
LISTEN
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3027121,"c1":393,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 738.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5623104,"c1":177,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 758.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3850742,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5244809,"c1":55,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 716.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4062585,"c1":380,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 722.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8536437,"c1":560,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7327211,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 728.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":431739,"c1":824,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 731.

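Schemaless design with split tables in a single instance

For automatic table creation and automatic sharding, see the following usages and cases:

《PostgreSQL schemaless design and performance benchmark in the China Railway order system》

《On-demand sharding in PostgreSQL (a plpgsql schemaless implementation of TimescaleDB's automatic chunking)》

《Implementing schemaless in PostgreSQL》

《PostgreSQL time-series best practices - database design for a securities trading system - Alibaba Cloud RDS PostgreSQL best practices》

JDBC asynchronous message example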

https://jdbc.postgresql.org/documentation/81/listennotify.html

import java.sql.*;  

public class NotificationTest {  

	public static void main(String args[]) throws Exception {
		Class.forName("org.postgresql.Driver");
		String url = "jdbc:postgresql://localhost:5432/test";  

		// Create two distinct connections, one for the notifier
		// and another for the listener to show the communication
		// works across connections although this example would
		// work fine with just one connection.
		Connection lConn = DriverManager.getConnection(url,"test","");
		Connection nConn = DriverManager.getConnection(url,"test","");  

		// Create two threads, one to issue notifications and
		// the other to receive them.
		Listener listener = new Listener(lConn);
		Notifier notifier = new Notifier(nConn);
		listener.start();
		notifier.start();
	}  

}  

class Listener extends Thread {  

	private Connection conn;
	private org.postgresql.PGConnection pgconn;  

	Listener(Connection conn) throws SQLException {
		this.conn = conn;
		this.pgconn = (org.postgresql.PGConnection)conn;
		Statement stmt = conn.createStatement();
		stmt.execute("LISTEN mymessage");
		stmt.close();
	}  

	public void run() {
		while (true) {
			try {
				// issue a dummy query to contact the backend
				// and receive any pending notifications.
				Statement stmt = conn.createStatement();
				ResultSet rs = stmt.executeQuery("SELECT 1");
				rs.close();
				stmt.close();  

				org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
				if (notifications != null) {
					for (int i=0; i<notifications.length; i++) {
						System.out.println("Got notification: " + notifications[i].getName() + ", payload: " + notifications[i].getParameter());
					}
				}  

				// wait a while before checking again for new
				// notifications
				Thread.sleep(500);
			} catch (SQLException sqle) {
				sqle.printStackTrace();
			} catch (InterruptedException ie) {
				ie.printStackTrace();
			}
		}
	}  

}  

class Notifier extends Thread {  

	private Connection conn;  

	public Notifier(Connection conn) {
		this.conn = conn;
	}  

	public void run() {
		while (true) {
			try {
				Statement stmt = conn.createStatement();
				stmt.execute("NOTIFY mymessage");
				stmt.close();
				Thread.sleep(2000);
			} catch (SQLException sqle) {
				sqle.printStackTrace();
			} catch (InterruptedException ie) {
				ie.printStackTrace();
			}
		}
	}  

}

Using asynchronous messages with libpq

https://www.postgresql.org/docs/10/static/libpq-notify.html

Trigger usage

https://www.postgresql.org/docs/10/static/sql-createtrigger.html

《PostgreSQL triggers explained in detail 1》

《PostgreSQL triggers explained in detail 2》

Notes

1. Receive asynchronous messages promptly; otherwise pending messages occupy space in the instance's $PGDATA/pg_notify directory.

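2. Limits: the notification queue is not bounded by memory; it is stored in the $PGDATA/pg_notify directory, so its capacity depends on storage (the PostgreSQL documentation gives 8GB for a standard installation; once the queue is full, transactions calling NOTIFY fail at commit). Separately, each payload must be shorter than 8000 bytes in the default configuration, which matters when the payload is row_to_json(...) of a wide row.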

Buffer size:

/*
 * The number of SLRU page buffers we use for the notification queue.
 */
#define NUM_ASYNC_BUFFERS       8

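3. Reliability: PG tracks, for each session listening on a channel, the offset up to which that session has received messages.

A session that starts listening only receives messages from the offset at which it began listening; messages before that offset are not delivered to it.

Once a message has been received by every listening session that needs it, it is cleaned up.

The listening session must stay connected: if it disconnects, it misses both the messages it had not yet received and any messages produced before it listens again.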
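The delivery rules in item 3 can be illustrated with a toy model in Java: one shared queue, a read position per listening session, new listeners starting at the current tail, and entries trimmed once every listener has read them. `NotifyQueueModel` is hypothetical and only mirrors the semantics described here, not how PostgreSQL stores the queue; `unlisten` stands in for a disconnect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the delivery rules above (not PostgreSQL's real implementation).
class NotifyQueueModel {
    private final List<String> queue = new ArrayList<>();
    private long head = 0; // absolute position of queue.get(0)
    private final Map<String, Long> positions = new HashMap<>();

    // A new listener starts at the tail: older messages are never delivered to it.
    void listen(String session) {
        positions.put(session, head + queue.size());
    }

    // Models a disconnect: the session's position is forgotten.
    void unlisten(String session) {
        positions.remove(session);
        trim();
    }

    void send(String payload) {
        queue.add(payload);
        trim(); // with no listeners, the message is dropped immediately
    }

    List<String> receive(String session) {
        Long pos = positions.get(session);
        if (pos == null) {
            throw new IllegalStateException(session + " is not listening");
        }
        List<String> out = new ArrayList<>(queue.subList((int) (pos - head), queue.size()));
        positions.put(session, head + queue.size());
        trim();
        return out;
    }

    int pending() {
        return queue.size();
    }

    // Drop entries that every listening session has already read.
    private void trim() {
        long min = head + queue.size();
        for (long p : positions.values()) {
            min = Math.min(min, p);
        }
        queue.subList(0, (int) (min - head)).clear();
        head = min;
    }

    public static void main(String[] args) {
        NotifyQueueModel q = new NotifyQueueModel();
        q.listen("app1");
        q.send("m1");
        q.listen("app2");                      // joins after m1
        q.send("m2");
        System.out.println(q.receive("app1")); // [m1, m2]
        System.out.println(q.receive("app2")); // [m2]
        q.unlisten("app2");                    // app2 disconnects
        q.send("m3");
        q.listen("app2");                      // reconnects after m3 was sent
        System.out.println(q.receive("app2")); // []
        System.out.println(q.receive("app1")); // [m3]
    }
}
```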

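4. If strong reliability is required, replace asynchronous messages with a persistent mode.

Method: in the trigger, change pg_notify to insert into feedback_table ....;

Consume the persisted messages as follows (burn-after-reading mode):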

with t1 as (select ctid from feedback_table order by crt_time limit 100)
  delete from feedback_table where
    ctid = any (array(select ctid from t1))
    returning *;

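Persistent messages can still be consumed at more than 100,000 rows/s. (Abnormal messages are usually far fewer than that, so a single anomaly table can serve many order tables.)

The cost is more IOPS on RDS PG (WAL is generated both by the writes and by VACUUM).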
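The burn-after-reading query above takes the oldest 100 rows by crt_time, deletes them and returns them in a single statement. The in-memory analogue below shows only that ordering and batching behavior; `BurnAfterReading` and `FeedbackRow` are hypothetical, and unlike the SQL version it offers no durability or transactional guarantees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// In-memory analogue of "delete the oldest N rows by crt_time ... returning *".
class BurnAfterReading {
    static final class FeedbackRow {
        final long crtTime;
        final String payload;

        FeedbackRow(long crtTime, String payload) {
            this.crtTime = crtTime;
            this.payload = payload;
        }
    }

    // Ordered by crt_time, oldest first.
    private final PriorityQueue<FeedbackRow> table =
        new PriorityQueue<>((a, b) -> Long.compare(a.crtTime, b.crtTime));

    void insert(long crtTime, String payload) {
        table.add(new FeedbackRow(crtTime, payload));
    }

    // Removes and returns up to `limit` oldest rows in one step.
    List<String> consume(int limit) {
        List<String> out = new ArrayList<>();
        while (out.size() < limit && !table.isEmpty()) {
            out.add(table.poll().payload);
        }
        return out;
    }

    int remaining() {
        return table.size();
    }

    public static void main(String[] args) {
        BurnAfterReading t = new BurnAfterReading();
        t.insert(30, "c");
        t.insert(10, "a");
        t.insert(20, "b");
        System.out.println(t.consume(2));  // [a, b]
        System.out.println(t.remaining()); // 1
    }
}
```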

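Other

1. An anomaly that has already been pushed may be triggered again when the row is updated later. Comparing the OLD and NEW values in the trigger logic avoids this. This article does not cover it; in practice you can adapt the trigger code accordingly.

References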
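One way to implement the OLD/NEW comparison is to notify only when a row moves into violation, not when an already-reported row is updated again (as happened with `update feed set c1=1` above). In plpgsql this would typically be combined with a TG_OP check, since an INSERT has no OLD row; the Java sketch below models only the decision, with `TransitionFilter` and `shouldNotify` as hypothetical names and the c2 > 9999 threshold borrowed from tg2.

```java
// Decision logic for "notify only on entering violation".
class TransitionFilter {
    // Same threshold as tg2; a null value never violates, as with SQL NULL comparisons.
    static boolean violates(Integer c2) {
        return c2 != null && c2 > 9999;
    }

    // oldC2 is null for an INSERT (there is no OLD row).
    static boolean shouldNotify(Integer oldC2, Integer newC2) {
        return violates(newC2) && !violates(oldC2);
    }

    public static void main(String[] args) {
        System.out.println(shouldNotify(null, 10000));  // true  (insert of an abnormal row)
        System.out.println(shouldNotify(10000, 10000)); // false (already reported)
        System.out.println(shouldNotify(500, 10000));   // true  (newly abnormal)
    }
}
```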

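《Implementing update | delete limit in PostgreSQL - CTID scan in practice (efficient burn-after-reading)》

《Real-time processing shootout (streaming, lambda, triggers) - IoT/finance time-series processing best practices》

《PostgreSQL 10.0 preview feature enhancements - built-in transition tables in trigger functions》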

https://www.postgresql.org/docs/10/static/sql-createtrigger.html

https://jdbc.postgresql.org/documentation/81/listennotify.html

https://www.postgresql.org/docs/10/static/libpq-notify.html


Time: 2024-11-05 14:55:35
