目录
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
命令行提交作业
Web UI提交作业
上传代码到gitee
前提条件
Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
基本准备
创建项目
使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo
添加依赖
在项目的pom.xml文件中添加Flink的依赖。
1.17.1 org.apache.flink flink-streaming-java${flink.version} org.apache.flink flink-clients${flink.version}
刷新依赖
刷新依赖后,能看到相关依赖如下
刷新依赖过程需要等待一些时间来下载相关依赖。
如果依赖下载慢,可以设置阿里云仓库镜像:
1.设置maven的settings.xml
在上面一行添加阿里云仓库镜像
alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ central
2.IDEA设置maven
数据准备
在工程的根目录下,新建一个data文件夹
并在data文件夹下创建文本文件words.txt
内容如下
hello world hello java hello flink
新建包
右键src/main下的java,新建Package
填写包名org.example,包名与groupId的内容一致。
批处理API实现WordCount
在org.exmaple下新建wc包及BatchWordCount类
填写wc.BatchWordCount
效果如下
BatchWordCount.java代码如下:
package org.example.wc; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCount { public static void main(String[] args) throws Exception { // 1. 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 从文件读取数据 按行读取 DataSourcelineDS = env.readTextFile("data/words.txt"); // 3. 转换数据格式 FlatMapOperator > wordAndOne = lineDS.flatMap(new FlatMapFunction >() { @Override public void flatMap(String line, Collector > out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word,1L)); } } }); // 4. 按照 word 进行分组 UnsortedGrouping > wordAndOneUG = wordAndOne.groupBy(0); // 5. 分组内聚合统计 AggregateOperator > sum = wordAndOneUG.sum(1); // 6. 打印结果 sum.print(); } }
运行程序,查看结果
注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
流处理API实现WordCount
数据源是文件
在org.example.wc包下新建Java类StreamWordCount,代码如下:
package org.example.wc; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文件 DataStreamSourcelineStream = env.readTextFile("input/words.txt"); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator > sum = lineStream.flatMap(new FlatMapFunction >() { @Override public void flatMap(String line, Collector > out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } }).keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }
运行结果
与批处理程序BatchWordCount的区别:
-
创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
-
转换处理之后,得到的数据对象类型不同。
-
分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
-
代码末尾需要调用env的execute方法,开始执行任务。
数据源是socket文本流
流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。
在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:
package org.example.wc; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号 DataStreamSource
lineStream = env.socketTextStream("node2", 7777); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator > sum = lineStream.flatMap((String line, Collector > out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } } 进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:
[hadoop@node2 ~]$ sudo yum install nc -y
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
IDEA中,运行SocketStreamWordCount程序。
往7777端口发送数据,例如发送hello world
控制台输出
继续往7777端口发送数据,例如发送hello flink
控制台输出
停止SocketStreamWordCount程序。
按Ctrl+c停止nc命令。
打包
这里的打包是将写好的程序打成jar包。
点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。
打包成功后,看到如下输出信息,生成的jar包在项目的target目录下
提交到集群运行
把jar包提交到flink集群运行有两种方式:
1.通过命令行提交作业
2.通过Web UI提交作业
命令行提交作业
将jar包上传Linux
启动flink集群
[hadoop@node2 ~]$ start-cluster.sh Starting cluster. Starting standalonesession daemon on host node2. Starting taskexecutor daemon on host node2. Starting taskexecutor daemon on host node3. Starting taskexecutor daemon on host node4.
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
命令提交作业
开启另一个node2终端,使用flink run命令提交作业到flink集群
[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar
-m指定提交到的JobManager,-c指定程序入口类。
发送测试数据
在nc监听终端,往7777端口发送数据
查看结果
Web UI查看结果
浏览器访问
node2:8081
看到正在运行的作业如下
查看结果
继续发送测试数据
在nc终端继续发送数据
Web UI刷新结果
命令行查看结果
打开新的node2终端,查看结果
[hadoop@node2 ~]$ cd $FLINK_HOME/log [hadoop@node2 log]$ ls flink-hadoop-client-node2.log flink-hadoop-standalonesession-0-node2.out flink-hadoop-standalonesession-0-node2.log flink-hadoop-taskexecutor-0-node2.log flink-hadoop-standalonesession-0-node2.log.1 flink-hadoop-taskexecutor-0-node2.log.1 flink-hadoop-standalonesession-0-node2.log.2 flink-hadoop-taskexecutor-0-node2.log.2 flink-hadoop-standalonesession-0-node2.log.3 flink-hadoop-taskexecutor-0-node2.log.3 flink-hadoop-standalonesession-0-node2.log.4 flink-hadoop-taskexecutor-0-node2.log.4 flink-hadoop-standalonesession-0-node2.log.5 flink-hadoop-taskexecutor-0-node2.out [hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out (hello,1) (flink,1) (hello,2) (world,1)
取消flink作业
点击Cancel Job取消作业
停止nc监听
按Ctrl+c停止nc命令
Web UI提交作业
开启nc监听
开启nc监听发送数据
[hadoop@node2 ~]$ nc -lk 7777
Web UI提交作业
浏览器访问
node2:8081
点击Submit New Job
点击Add New
选择flink作业jar包所在路径
点击jar包名称
填写相关内容,点击Submit提交作业
Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度,例如:1
提交后,在Running Jobs里看到运行的作业
发送测试数据
往7777端口发送数据
查看结果
继续发送测试数据
刷新结果
取消作业
停止nc监听
按住Ctrl+c停止nc命令
关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh Stopping taskexecutor daemon (pid: 2283) on host node2. Stopping taskexecutor daemon (pid: 1827) on host node3. Stopping taskexecutor daemon (pid: 1829) on host node4. Stopping standalonesession daemon (pid: 1929) on host node2.
上传代码到gitee
登录gitee
https://gitee.com/
注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。
创建仓库
提交代码
使用IDEA提交代码
提示有警告,忽略警告,继续提交
提交成功后,IDEA显示如下
刷新浏览器查看gitee界面,看到代码已上传成功
完成!enjoy it!
还没有评论,来说两句吧...