日志服务(SLS) - 通过 Flink 消费Loghub日志

Flink log connector

介绍

Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.
生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

<dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>flink-log-connector</artifactId>
            <version>0.1.2</version>
</dependency>
<dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
</dependency>
 <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.10</version>
 </dependency>
<dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>log-loghub-producer</artifactId>
            <version>0.1.8</version>
</dependency>

代码:Github

用法

  1. 请参考日志服务文档,正确创建Logstore。
  2. 如果使用子账号访问,请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。

1. Log Consumer

在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数
量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

1.1 配置启动参数

Properties configProps = new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
        new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

1.2 设置消费起始位置

Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

  • Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
  • Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
  • UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。

三种取值举例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 监控:消费进度(可选)

Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考
文档消费组-查看状态,[消费组-监控报警
](https://help.aliyun.com/document_detail/55912.html)。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

1.4 容灾和exactly once语义支持

当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并
从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints

1.5 补充材料:关联 API与权限设置

Flink log consumer 会用到的阿里云日志服务接口如下:

  • GetCursorOrData

    用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
    控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).
    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards
     用于获取logStore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。
    // 设置每30s调用一次ListShards
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup
    该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。
  • ConsumerGroupUpdateCheckPoint
    该接口用户将flink的snapshot同步到日志服务的consumerGroup中。
    

子用户使用Flink log consumer需要授权如下几个RAM Policy:

接口 资源
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

2. Log Producer

FlinkLogProducer 用于将数据写到阿里云日志服务中。

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchema<String> {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int)(System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}
public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // 设置访问日志服务的域名
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // 设置访问日志服务的ak
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // 设置日志写入的日志服务project
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // 设置日志写入的日志服务logStore
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }
    // 模拟产生日志
    public static class EventsGenerator implements SourceFunction<String> {
        private boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 初始化

Producer初始化主要需要做两件事情:

  • 初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

    // 用于发送数据的io线程的数量,默认是8
    ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
    // 该值定义日志数据被缓存发送的时间,默认是3000
    ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
    // 缓存发送的包中日志的数量,默认是4096
    ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
    // 缓存发送的包的大小,默认是3Mb
    ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
    // 作业可以使用的内存总的大小,默认是100Mb
    ConfigConstants.LOG_MEM_POOL_BYTES
    上述参数不是必选参数,用户可以不设置,直接使用默认值。
  • 重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。
    RawLogGroup是log的集合,每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。
    

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // 生成32位hash值
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

2.2 权限设置:RAM Policy

Producer依赖日志服务的API写数据,如下:

  • log:PostLogStoreLogs
  • log:ListShards

当RAM子用户使用Producer时,需要对上述两个API进行授权:

接口 资源
log:PostLogStoreLogs acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
时间: 2024-10-22 12:33:16

日志服务(SLS) - 通过 Flink 消费Loghub日志的相关文章

阿里云日志服务(SLS)安装使用方法简介

使用前准备 开通阿里云账号 申请一台阿里云服务器(Elastic Compute Service,简称 ECS) 开通日志服务 登录阿里云后进入控制台,选择产品与服务-日志服务,根据提示开通日志服务 创建日志配置 日志服务开通后,跳转到控制台,点击创建project 填入project相关属性,注意所属区域要填成你申请的阿里云服务器(ECS)对应所在的区域 project创建成功后,会提示创建logstore(project和logstore属于包含关系,一个project下可创建多个logst

简单日志服务SLS产品发布公告

尊敬的阿里云用户: 阿里云简单日志服务SLS于2015年1月29日对外发布新版本,同时北京Region上线公测.详细信息如下:   一.行为变更 1.数据模型变更 数据模型变更:Category变更为Logstore.原API格式依然兼容,推荐用户使用新的API. SLS接口文档 2.离线归档行为变更 离线归档行为变更:由归档到ODPS公共表变更为直接导入用户指定表.在此之前,用户已经设定的归档到公共表配置依然在后台兼容,但不可再增加配置,推荐用户使用新的归档方式. 在ODPS中查看导入日志 3

自建ELK vs 日志服务(SLS)全方位对比

简介 提到日志实时分析,很多人都会想到很火的ELK Stack(Elastic/Logstash/Kibana)来搭建.ELK方案开源,在社区中有大量的内容和使用案例. 阿里云日志服务产品在新版中增强查询分析功能(LogSearch/Analytics),支持对日志数据实时索引与查询分析,并且对查询性能和计算数据量做了大量优化.在这里我们做一个全方位的比较,对于用户关心的点,我们依次展开分析: 易用:上手及使用过程中的代价 功能(重点):主要针对查询与分析两块 性能(重点):对于单位大小数据量查

简单日志服务SLS公测

  简单日志服务(Simple Log Service,简称SLS)是针对开发和运维提供的日志数据管理服务.用户只需简单地配置日志位置和格式等信息,就能够实时收集.存储和查询由一台或多台机器产生的各类日志数据(包括常见的访问日志.应用日志或由程序计数器产生的性能日志).   用户在开通服务后,可以在SLS管理控制台上配置日志收集路径.日志格式.日志产生机器组等信息,SLS日志收集客户端(Logtail)会根据配置进行日志实时收集.在控制台上,用户可以创建不同项目空间(Project)进行日志的管

日志服务(原SLS)新功能发布(5)--使用Logstash接入数据

日志服务结合Logstash 目前,阿里云用户可以通过API/SDK或Logtail将数据写入日志服务,参考. 今天要介绍一个新方法:使用著名开源软件Logstash采集机器日志数据,并结合日志服务插件完成数据上传日志服务功能. 用户可以在阿里云ECS,或者是IDC机房机器,又或者是其它云厂商的虚拟机上安装Logstash及插件,进行简单的配置,轻松地将本机日志数据搬到云上来. IIS日志场景 以Windows平台上最常见的IIS(Internet Information Services)日志

一分钟了解阿里云产品:日志服务

一.             概述   阿里云发布的产品种类齐全,今天让我们一起来了解下日志服务(Log Service,简称LOG)这款产品吧.   什么是日志服务呢?   日志服务是针对日志场景的平台化服务.无需开发就可以快速完成日志收集.分发.投递与查询, 适用于日志中转.监控.性能诊断.日志分析.审计等场景.     那么,日志服务有什么优势呢?   日志服务使用便捷.告别复杂配置,5分钟接入,通过API/Web管理上万设备,原生Agent支持,操作简单. 日志服务稳定可靠.三副本可靠性

日志服务+函数服务实战(1): 访问日志地域、运营商实时分析

概述 ETL(Extract-Transform-Load)用来描述将数据从来源端经过抽取(Extract).转换(Transform).加载(Load)至目的端的过程. 传统ETL是构建数据仓库的重要一环,用户从数据源抽取出所需的数据,经过数据清洗,最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中去. 在今天,随着业务需求的日益增加,不同系统的相互大批量数据交互也已成为常态.数据在不同系统中流动起来,有助于充分发掘日志大数据的价值. 在云上,AWS Glue是一个功能完备的ETL产品,

日志服务新功能发布(2)--弹性伸缩(Merge/Split)

在之前的文章<日志服务(原SLS)新功能发布(1)--支持保序写入和消费>中,我们提到了Shard支持Key映射的特性,通过这个特性能够支持对序有需求的应用场景.今天我们给大家介绍一个在削峰填谷或流量突增情况下的功能:弹性伸缩.在生产中我们往往会面临峰值和低值的情况,也会遇到因业务层映射不均衡,导致某一个分区(shard)有非常大流量的场景,弹性伸缩(Merge/Split)就是为此设计的利器. 使用弹性伸缩的应用场景 场景1(视频类):根据峰值.底值弹性扩容,控制成本 用户A是一个视频类网站

如何将日志服务的数据秒级同步到表格存储

最近在容器服务的官方镜像中,新增了loghub-shipper的镜像,使用该镜像,可以订阅日志服务中的日志库,以秒级的延时将日志数据从日志服务中读出并转换成结构化数据存储在表格存储中,以满足实时在线服务的精确查询需求. 什么是日志服务? 日志服务(Log Service,Log)是针对日志场景的一站式解决方案,解决海量日志数据采集/订阅.转储与查询功能,比如在海量游戏日志收集与分析场景上的应用. 什么是表格存储? 表格存储(TableStore)提供海量NoSQL数据的存储与实时访问服务,能够支