
[toc]

Resume Description

Building and maintaining business data processing pipelines

1. Parsed and processed raw Mediation / MMS business data, covering cleaning, structuring, and standardization, to ensure data consistency and accuracy for downstream consuming systems

2. Implemented large-scale joins, multi-dimensional aggregations, and window functions on Spark, supporting core scenarios such as daily reporting, billing, and operations

3. Designed and maintained efficient data processing pipelines with good scalability and fault tolerance

4. Cut the runtime of core jobs by 26% and their resource usage by 17% by optimizing processing logic and resource configuration

Main technologies: Spark, SCOPE, C#, PowerShell

Project overview:

How was the watermark delay determined?

A test job compared join accuracy for different watermark delays; a 5-minute delay already reached 99.8% accuracy. The actual join window is 2 hours.

How can records that failed to join keep waiting for a match without hurting throughput?

With a state file: records that join successfully are emitted immediately, while records that fail to join are kept in the state file and joined together with newly arrived data on the next run. A minimal sketch follows.
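A minimal sketch of this carry-over join in Spark. The paths, the single `id` join key, and the `mmsValue` payload column are hypothetical stand-ins for the real job's schema:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CarryOverJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carry-over-join").getOrCreate()

    // This run's fresh records, plus the rows that failed to join last run.
    val fresh = spark.read.parquet("/data/mediation/current")   // hypothetical path
    val state = spark.read.parquet("/data/state/unjoined")      // hypothetical path
    val left  = fresh.unionByName(state)
    val right = spark.read.parquet("/data/mms/current")         // hypothetical path

    // Left outer join, so rows without a match survive with null payloads.
    val joined = left.join(right, Seq("id"), "left_outer")

    // Matched rows are emitted downstream immediately.
    joined.filter(col("mmsValue").isNotNull)
      .write.mode("overwrite").parquet("/data/output/joined")

    // Unmatched rows become the state file for the next run. In practice,
    // rows older than the 2h join window are expired here rather than
    // carried forward forever.
    joined.filter(col("mmsValue").isNull)
      .select(left.columns.map(col): _*)
      .write.mode("overwrite").parquet("/data/state/unjoined_next")

    spark.stop()
  }
}
```

Writing the new state to a separate path is deliberate: Spark cannot overwrite a dataset it is still reading from, so each run publishes the state that the next run will pick up.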

Deep dive

Optimization directions

1. Pipelined processing

2. Merging processing logic


Flink 1.6 official documentation link

The examples below use the Flink Scala API:

```scala
import org.apache.flink.streaming.api.scala._
```

Keyed Windows

```
stream
    .keyBy(...)                <-  keyed versus non-keyed windows
    .window(...)               <-  required: "assigner"
   [.trigger(...)]             <-  optional: "trigger" (else default trigger)
   [.evictor(...)]             <-  optional: "evictor" (else no evictor)
   [.allowedLateness(...)]     <-  optional: "lateness" (else zero)
   [.sideOutputLateData(...)]  <-  optional: "output tag" (else no side output for late data)
    .reduce/aggregate/fold/apply()  <-  required: "function"
   [.getSideOutput(...)]       <-  optional: "output tag"
```

Non-Keyed Windows

```
stream
    .windowAll(...)            <-  required: "assigner"
   [.trigger(...)]             <-  optional: "trigger" (else default trigger)
   [.evictor(...)]             <-  optional: "evictor" (else no evictor)
   [.allowedLateness(...)]     <-  optional: "lateness" (else zero)
   [.sideOutputLateData(...)]  <-  optional: "output tag" (else no side output for late data)
    .reduce/aggregate/fold/apply()  <-  required: "function"
   [.getSideOutput(...)]       <-  optional: "output tag"
```

Window lifecycle

Window

Trigger

The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents any time between its creation and removal. Purging in this case only refers to the elements in the window, and not the window metadata. This means that new data can still be added to that window.

In short: the function holds the computation applied to the window's contents, while the trigger specifies the conditions under which that function is invoked.
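To make the quoted policy concrete (a sketch, not taken from the Flink docs): the built-in `CountTrigger` fires once a window holds the given number of elements, so `CountTrigger.of(5)` implements "more than 4". It assumes a hypothetical `stream: DataStream[(String, Int)]`:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fire the window function as soon as the window holds 5 elements,
// replacing the assigner's default EventTimeTrigger.
stream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .trigger(CountTrigger.of[TimeWindow](5))
  .reduce((a, b) => (a._1, a._2 + b._2))
```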

Evictor

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.
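Continuing the same hypothetical stream: `CountEvictor` keeps only the last 10 elements each time the trigger fires, and by default evicts before the window function runs:

```scala
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor

stream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .trigger(CountTrigger.of[TimeWindow](5))
  .evictor(CountEvictor.of[TimeWindow](10))  // drop all but the last 10 elements
  .reduce((a, b) => (a._1, a._2 + b._2))
```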

  • WindowAssigner
    • SlidingProcessingTimeWindows
    • BaseAlignedWindowAssigner
      • SlidingAlignedProcessingTimeWindows
    • TumblingEventTimeWindows
      • TumblingTimeWindows
    • MergingWindowAssigner
      • ProcessingTimeSessionWindows
      • DynamicProcessingTimeSessionWindows
      • DynamicEventTimeSessionWindows
      • EventTimeSessionWindows
    • TumblingProcessingTimeWindows
    • SlidingEventTimeWindows
      • SlidingTimeWindows
    • GlobalWindows

WindowAssigner

1. SlidingProcessingTimeWindows

```java
/**
 * A {@link WindowAssigner} that windows elements into sliding windows based on the current
 * system time of the machine the operation is running on. Windows can possibly overlap.
 *
 * <p>For example, in order to window into windows of 1 minute, every 10 seconds:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *     keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)));
 * } </pre>
 */
```

```java
/**
 * Sliding windows based on Flink processing time (the machine's system clock).
 */
```

6. SlidingEventTimeWindows

```java
/**
 * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
 * elements. Windows can possibly overlap.
 *
 * <p>For example, in order to window into windows of 1 minute, every 10 seconds:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *     keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
 * } </pre>
 */
```

```java
/**
 * Sliding windows based on the elements' own timestamps (event time).
 */
```
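Event-time assigners only produce output if the stream carries timestamps and watermarks. A minimal sketch for Flink 1.6, assuming a hypothetical `case class Event(key: String, ts: Long)` with `ts` in epoch milliseconds, and using the 5-minute delay that the accuracy test above settled on:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: String, ts: Long)

// events: DataStream[Event] is assumed to exist.
val withWatermarks = events.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(5)) {
    override def extractTimestamp(e: Event): Long = e.ts
  })

withWatermarks
  .keyBy(_.key)
  .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
  .reduce((a, b) => if (a.ts >= b.ts) a else b)  // e.g. keep the latest event
```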

6.1 SlidingTimeWindows (deprecated)

2. BaseAlignedWindowAssigner

2.1 SlidingAlignedProcessingTimeWindows

5. TumblingProcessingTimeWindows

3. TumblingEventTimeWindows

```java
/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements. Windows cannot overlap.
 *
 * <p>For example, in order to window into windows of 1 minute:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *     keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 * } </pre>
 */
```

3.1 TumblingTimeWindows (deprecated)

```java
/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements. Windows cannot overlap.
 *
 * @deprecated Please use {@link TumblingEventTimeWindows}.
 */
```

4. MergingWindowAssigner

4.1 ProcessingTimeSessionWindows

4.2 DynamicProcessingTimeSessionWindows

4.3 DynamicEventTimeSessionWindows

4.4 EventTimeSessionWindows
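A minimal sketch of the most common of these, `EventTimeSessionWindows`, reusing the hypothetical watermarked `Event` stream from the sliding-window sketch above. Sessions close after an idle gap, and overlapping sessions are merged, which is why these assigners extend `MergingWindowAssigner`:

```scala
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows

// One window per burst of activity; a session ends after 10 minutes
// with no events for the key.
withWatermarks
  .keyBy(_.key)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .reduce((a, b) => if (a.ts >= b.ts) a else b)
```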

7. GlobalWindows
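`GlobalWindows` assigns every element of a key to one single, never-ending window. Its default trigger (`NeverTrigger`) never fires, so it is only useful together with a custom trigger; a sketch on the hypothetical `(String, Int)` stream, emitting the running sum every 100 elements per key:

```scala
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

stream
  .keyBy(_._1)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of[GlobalWindow](100))  // fire every 100 elements
  .reduce((a, b) => (a._1, a._2 + b._2))
```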

Summary

Use cases

  1. Sliding windows
  • Each element is assigned to several sliding windows, so a single element is counted multiple times in the final output
  • Suited to summarizing the data of the last N minutes/hours relative to the current time
  2. Tumbling windows
  • Each element belongs to exactly one tumbling window
  • Suited to lightly pre-aggregating data that is aggregated again downstream
  • Suited to emitting detail records without aggregation, e.g. outputting directly after a join (both shapes are sketched below)
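Side by side on the hypothetical `(String, Int)` stream (processing-time variants, to keep the sketch free of watermarks): with a 1-minute span and a 10-second slide, each element lands in six sliding windows but in exactly one tumbling window:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

// Every element is counted six times across overlapping windows.
val sliding = stream
  .keyBy(_._1)
  .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
  .sum(1)

// Every element is counted exactly once.
val tumbling = stream
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .sum(1)
```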

Memory footprint comparison

  1. Sliding windows
    • In principle an element is copied into every window it overlaps, so memory grows by the duplication factor; it is unclear whether Flink optimizes this, e.g. by copying only a reference to the element
  2. Tumbling windows
    • Each element is stored in exactly one window, so there is no duplication

About window times

Window start time

  1. First, the start time depends only on the timestamp being windowed (the system time, for processing-time windows) and the offset parameter; it does not depend on when the job started running
  2. For example, with a window size of 1h and no offset, windows always start on the hour (13:00, 14:00, ...), even if the job is launched at, say, 13:25 (see the formula below)
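This is what Flink's `TimeWindow.getWindowStartWithOffset` computes, transcribed here to Scala. The start is aligned to the epoch (plus the offset), never to the job's launch time:

```scala
// The start is the largest multiple of `size` (shifted by `offset`)
// that is not greater than the timestamp.
def windowStart(timestamp: Long, offset: Long, size: Long): Long =
  timestamp - (timestamp - offset + size) % size

// size = 1h, offset = 0: an element stamped 12:34:56 UTC falls into
// the window starting at 12:00:00 UTC.
// windowStart(45296000L, 0L, 3600000L) == 43200000L
```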

Window end time
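The end follows directly from the start: a `TimeWindow` covers the half-open interval `[start, start + size)`, so `getEnd` returns `start + size` and `maxTimestamp()` returns `getEnd - 1`, which is the timestamp the event-time trigger compares against the watermark:

```scala
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// The constructor takes (start, end); a 1h window starting at the epoch.
val w = new TimeWindow(0L, 3600000L)
w.getEnd          // 3600000  (exclusive)
w.maxTimestamp()  // 3599999  (last timestamp that still belongs to the window)
```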

In local tests, the locally built data source did not run single-threaded as expected, so I wanted to log the thread id to check whether a new thread was started each time.
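One way to check (a sketch on the hypothetical `(String, Int)` stream): log the current thread's name inside an operator; the `[%t]` conversion in the log4j2 pattern below prints it on every line as well. The logger is `@transient lazy` because Flink serializes the function and SLF4J loggers are not serializable:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}

stream.map(new RichMapFunction[(String, Int), (String, Int)] {
  @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  override def map(e: (String, Int)): (String, Int) = {
    log.debug(s"thread=${Thread.currentThread().getName} element=$e")
    e
  }
})
```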

For the log4j2 configuration, XML is recommended, since its structure is clear:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>

        <!-- File output (optional) -->
        <File name="File" fileName="logs/flink-job.log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </File>
    </Appenders>

    <Loggers>
        <!-- Log level for Flink itself -->
        <Logger name="org.apache.flink" level="INFO"/>

        <!-- Your project's package -->
        <Logger name="com.jingqicao.code" level="DEBUG"/>

        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="File"/>
        </Root>
    </Loggers>
</Configuration>
```

Explanation

If a Logger does not set additivity="false", it defaults to true: the log event is then handled by both that Logger and Root, and if they reference the same Appender the message is printed twice (see the snippet below).

The level on Root is the level Root itself logs at; any logger without its own Logger entry is handled by Root, i.e. its effective level is decided by Root.
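For example (an illustrative snippet, not part of the configuration above): with `additivity="false"` the event stops at the named Logger instead of also bubbling up to Root, so it is printed once even though both reference the Console appender:

```xml
<Logger name="com.jingqicao.code" level="DEBUG" additivity="false">
    <AppenderRef ref="Console"/>
</Logger>
```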

The most complete Flink resource collection on the web (videos, blogs, PPTs, tutorials, hands-on guides, source-code walkthroughs, Q&A; continuously updated)

Flink streaming aggregation performance tuning guide

[Collection] Selected Apache Flink PDF downloads

Flink in action and performance optimization (a big-data real-time compute engine)

Introduction

Performance comparison of the stream processing frameworks Flink and Storm

Aggregation

  1. Flink message aggregation solutions

Real-time data warehouse

Meituan-Dianping's real-time data warehouse practice with Flink