生而为人

程序员的自我修养

0%

因为本地测试中,本地构建的数据源并非预想中的单线程运行,所以想看下线程id,以确定是否每次都新起了线程。

推荐使用xml,结构清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<!-- 控制台输出 -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>

<!-- 文件输出(可选) -->
<File name="File" fileName="logs/flink-job.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</File>
</Appenders>

<Loggers>
<!-- Flink 相关日志级别 -->
<Logger name="org.apache.flink" level="INFO"/>

<!-- 你的项目包名 -->
<Logger name="com.jingqicao.code" level="DEBUG"/>

<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="File"/>
</Root>
</Loggers>
</Configuration>

解析

如果Logger不指定additivity=false,默认为true。此时当前Logger及Root都会处理日志,如果Logger与Root配了相同的Appender,就会重复输出。

Root的level,表示的是Root使用的日志级别,如果有一个Logger没有具体指定,就会交由Root处理,即日志级别由Root决定。

Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

Flink 流式聚合性能调优指南

【资料合集】Apache Flink 精选PDF下载

大数据实时计算引擎 Flink 实战与性能优化

大数据实时计算引擎 Flink 实战与性能优化

介绍

流计算框架 Flink 与 Storm 的性能对比

聚合处理

  1. Flink 消息聚合处理方案

实时数仓

美团点评基于 Flink 的实时数仓建设实践

待整理

  1. 本地自定义数据源,会启动多个线程,且不会停止,无法控制单线程输入

本地自定义数据源,运行起来不停止

日志记录

2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}

现象:

  1. 两个数据源,都不断输出loop:2的记录,不会执行到loop:1,如果删除掉 Thread.sleep() 可以执行到loop:1
  2. 不会停止,按理说只应该执行一次loop:2

原因:

  1. 估计是Thread.sleep()把线程挂起后,无法唤醒,所以执行不到loop:1
  2. 可以判断与数据源无关,是受后续的处理方式影响的,如果直接sink,不会有问题2

原理

Flink 原理与实现:理解 Flink 中的计算资源

优化

Flink 滑动窗口优化

超长滑动窗口优化及解决方案

数据源

flink系列-4、flink自定义source、sink

Flink 如何自定义Source

Flink-04-Flink自定义Source

Flink——自定义Source

5分钟Flink - 自定义Source源

join

Flink如何实现3个实时流同时join,leftjoin,rightjoin

Flink Operator之CoGroup、Join以及Connect

方法详解

richXXXFunction的open,clone方法执行

资源

在线训练地址

数据处理

反压

怎么理解flink的有状态

基础

初识 HBase

Hbase入门详解

一篇文章入门Hbase

原理

HBase原理(读写流程)

hbase为什么能够实现实时读写

hbase知识点及实时读写原理

Hbase原理详解

优化

hbase读写性能测试调优

优化hbase的查询提升读写速率优化案例及性能提升的几种方法

实时系统HBase读写优化–大量写入无障碍

hbase大规模数据写入的优化历程

HBase篇–HBase常用优化

hbase实时优化思路

深入学习

HBase 学习分享

面试

Hbase 基础面试题