超时流式处理 - 没有消息流入的数据异常监控

标签

PostgreSQL , 流式处理 , 无流入数据超时异常


背景

流计算有个特点,数据流式写入,流式计算。

但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。

这些问题如何流式预警呢?

可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:

1、CTE,语法灵活。

2、partial index,不需要检索的数据不构建索引。

3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。

4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。

DEMO设计

1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。

create table tbl (
  id int8,
  --                               ..... 其他字段 (比如已完结状态)
  state int,                       -- 完结状态(1 表示已完结)
  deadts timestamp,                -- 超时时间
  nts interval,                    -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次)
  notify_times int default 0,      -- 通知次数
  deadts_next timestamp            -- 下一次通知时间
);

2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。

create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;  

create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;

3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。

with tmp1 as (
update tbl set
  deadts_next=now()+nts,
  notify_times=notify_times+1
where ctid = any (array(
  select ctid from tbl where
  ( deadts < now() and notify_times=0 and state<>1)
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)
  limit 10000     -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;

4、执行计划完美

 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)
   CTE tmp1
     ->  Update on tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54)
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6)
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6)
                         ->  Index Scan using idx_tbl_1 on tbl  (cost=0.13..169527.13 rows=369766 width=6)
                               Index Cond: (deadts < now())
                         ->  Index Scan using idx_tbl_2 on tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6)
                               Index Cond: (deadts_next < now())
           ->  Tid Scan on tbl tbl_2  (cost=0.01..12.21 rows=10 width=54)
                 TID Cond: (ctid = ANY ($0))
(12 rows)

5、调度

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。

性能指标

1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?

-- 1亿条完结
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);  

-- 100万条超时
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);

通知性能,比如每一批通知1万条:

(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)

with tmp1 as (
update tbl set
  deadts_next=now()+nts,
  notify_times=notify_times+1
where ctid = any (array(
  select ctid from tbl where
  ( deadts < now() and notify_times=0 and state<>1)
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)
  limit 10000     -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;  

-- 计划  

 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
   Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
   Buffers: shared hit=75094 read=49 dirtied=49
   CTE tmp1
     ->  Update on public.tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
           Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
           Buffers: shared hit=75094 read=49 dirtied=49
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
                   Output: tbl.ctid
                   Buffers: shared hit=11395
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)
                         Buffers: shared hit=11395
                         ->  Index Scan using idx_tbl_1 on public.tbl  (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
                               Output: tbl.ctid
                               Index Cond: (tbl.deadts < now())
                               Buffers: shared hit=1
                         ->  Index Scan using idx_tbl_2 on public.tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
                               Output: tbl_1.ctid
                               Index Cond: (tbl_1.deadts_next < now())
                               Buffers: shared hit=11394
           ->  Tid Scan on public.tbl tbl_2  (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
                 Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
                 TID Cond: (tbl_2.ctid = ANY ($0))
                 Buffers: shared hit=21395
 Planning time: 0.301 ms
 Execution time: 79.905 ms

丝般柔滑

Time: 79.905 ms

小结

使用以上方法,可以完美的解决超时数据的监控问题。性能好。

时间: 2024-11-29 23:01:33

超时流式处理 - 没有消息流入的数据异常监控的相关文章

PgSQL · 应用案例 · 流式计算与异步消息在阿里实时订单监测中的应用

背景 在很多业务系统中,为了定位问题.运营需要.分析需要或者其他需求,会在业务中设置埋点,记录用户的行为在业务系统中产生的日志,也叫FEED日志. 比如订单系统.在业务系统中环环相扣,从购物车.下单.付款.发货,收货(还有纠纷.退款等等),一笔订单通常会产生若干相关联的记录. 每个环节产生的属性可能是不一样的,有可能有新的属性产生,也有可能变更已有的属性值. 为了便于分析,通常有必要将订单在整个过程中产生的若干记录(若干属性),合并成一条记录(订单大宽表). 通常业务系统会将实时产生的订单FEE

In-Stream Big Data Processing译文:流式大数据处理

转自:http://blog.csdn.net/idontwantobe/article/details/25938511  @猪头饼 原文:http://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/ 作者:Ilya Katsov 相当长一段时间以来,大数据社区已经普遍认识到了批量数据处理的不足.很多应用都对实时查询和流式处理产生了迫切需求.最近几年,在这个理念的推动下,催生出了一系列解决方案,Twi

如何保障流式处理的数据一致性

背景 相对于传统的Hadoop这样的batch分析平台,流式分析的优点就是实时性, 即可以在秒级别延迟上得到分析结果 .  当然缺点是, 很难保证强一致性,即Exactly-Once语义 (在海量数据的前提下,为了保障吞吐量,无法使用类似事务的强一致性的方案).  一般流式分析平台都会promise较弱的一致性,即Least-Once语义,保证数据不丢但允许数据重复. 但这只是在正常的情况下,当流式分析的任一环节发生故障,整个流被堵塞时,会导致层层队列被打满,最终仍然是会丢数据的. 所以对于流式

数据天生就是流式的

题外话 好久没写文章,发现写长文太辛苦了,所以慢慢往短文开始靠.这次算是第一个实践. 完全由流式计算构建的体系 部门目前核心其实就是流式计算,从根部开始(一个超大的Kafka集群)开始,延伸出一个超级庞大的树形结构.整个过程都是数据自我驱动进行流转,没有使用类似Azkaban/Oozie 等外部工具去让数据从一个系统流转到另外一个系统. 而我之前提出 Transformer架构 本质就是一个流式数据架构.  这个架构的核心概念是: 你开发的任何一个应用,本质上都是将两个或者多个节点连接起来,从而

千亿特征流式学习在大规模推荐排序场景的应用

摘要:2017云栖大会机器学习平台PAI专场,阿里巴巴高级技术专家陈绪带来千亿特征流式学习在大规模推荐排序场景的应用的演讲.主要从电商个性化推荐开始谈起,进而描述了技术挑战和PAI解决方案,重点分享了鲲鹏框架和算法调优,最好作了简要总结. 以下是精彩内容整理: 电商个性化推荐 淘宝.天猫在无线.PC端各个场景的商品个性化推荐大家都很熟悉,这些展示都是由个性化推荐排序算法决定的.根据每个用户不同的兴趣,做到千人千面的个性化展示,比如手淘首页的猜你喜欢,它是阿里电商最大的推荐场景,还有人群导购.看了

从Storm和Spark 学习流式实时分布式计算的设计

0. 背景 最近我在做流式实时分布式计算系统的架构设计,而正好又要参加CSDN博文大赛的决赛.本来想就写Spark源码分析的文章吧.但是又想毕竟是决赛,要拿出一些自己的干货出来,仅仅是源码分析貌似分量不够.因此,我将最近一直在做的系统架构的思路整理出来,形成此文.为什么要参考Storm和Spark,因为没有参照效果可能不会太好,尤其是对于Storm和Spark由了解的同学来说,可能通过对比,更能体会到每个具体实现背后的意义. 本文对流式系统出现的背景,特点,数据HA,服务HA,节点间和计算逻辑间

【译】使用Apache Kafka构建流式数据平台(1)

前言:前段时间接触过一个流式计算的任务,使用了阿里巴巴集团的JStorm,发现这个领域值得探索,就发现了这篇文章--Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform(Part 1).在读的过程中半总结半翻译,形成本文,跟大家分享. 最近你可能听说很多技术名词,例如"流式处理"."事件数据"以及"实时"等,与之相关的技术有Kafka.S

Spark Streaming 流式计算实战

这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容.  业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 userName/year/month/day/hh/normal  userName/year/month/day/hh/delay 路径,存储到HDFS中.如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 no

流式计算的系统设计和实现

阿里云数据事业部强琦为大家带来题为"流式计算的系统设计与实现"的演讲,本文主要从增量计算和流式计算开始谈起,然后讲解了与批量计算的区别,重点对典型系统技术概要进行了分析,包括Storm.Kinesis.MillWheel,接着介绍了核心技术.消息机制以及StreamSQL等,一起来了解下吧.   增量计算和流式计算 流式计算 流计算对于时效性要求比较严格,实时计算就是对计算的时效性要求比较强.流计算是利用分布式的思想和方法,对海量"流"式数据进行实时处理的系统,它源