Flink WordCount实践

Flink WordCount实践

码农世界 2024-05-28 后端 66 次浏览 0个评论

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式

 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

Flink WordCount实践

Flink WordCount实践

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	
        1.17.1
    
    
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
    

Flink WordCount实践

刷新依赖

Flink WordCount实践

刷新依赖后,能看到相关依赖如下

Flink WordCount实践

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

Flink WordCount实践

在上面一行添加阿里云仓库镜像

	
      alimaven
      aliyun maven
      http://maven.aliyun.com/nexus/content/groups/public/
      central        
    

Flink WordCount实践

2.IDEA设置maven

Flink WordCount实践

Flink WordCount实践

数据准备

在工程的根目录下,新建一个data文件夹

Flink WordCount实践

并在data文件夹下创建文本文件words.txt

Flink WordCount实践

内容如下

hello world
hello java
hello flink

Flink WordCount实践

新建包

右键src/main下的java,新建Package

Flink WordCount实践

填写包名org.example,包名与groupId的内容一致。

Flink WordCount实践

批处理API实现WordCount

在org.exmaple下新建wc包及BatchWordCount类

Flink WordCount实践

填写wc.BatchWordCount

Flink WordCount实践

效果如下

Flink WordCount实践

BatchWordCount.java代码如下:

package org.example.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据 按行读取
        DataSource lineDS = env.readTextFile("data/words.txt");
        // 3. 转换数据格式
        FlatMapOperator> wordAndOne = lineDS.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word,1L));
                }
            }
        });
        // 4. 按照 word 进行分组
        UnsortedGrouping> wordAndOneUG = wordAndOne.groupBy(0);
        // 5. 分组内聚合统计
        AggregateOperator> sum = wordAndOneUG.sum(1);
        // 6. 打印结果
        sum.print();
    }
}

运行程序,查看结果

Flink WordCount实践

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

在org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource lineStream = env.readTextFile("input/words.txt");
        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator> sum = lineStream.flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String line, Collector> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);
        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}

运行结果

Flink WordCount实践

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

    数据源是socket文本流

    流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

    在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

    package org.example.wc;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    public class SocketStreamWordCount {
        public static void main(String[] args) throws Exception {
            // 1. 创建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
            DataStreamSource lineStream = env.socketTextStream("node2", 7777);
            // 3. 转换、分组、求和,得到统计结果
            SingleOutputStreamOperator> sum = lineStream.flatMap((String line, Collector> out) -> {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                    .keyBy(data -> data.f0)
                    .sum(1);
            // 4. 打印
            sum.print();
            // 5. 执行
            env.execute();
        }
    }

    进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

    [hadoop@node2 ~]$ sudo yum install nc -y

    开启nc监听

    [hadoop@node2 ~]$ nc -lk 7777

    IDEA中,运行SocketStreamWordCount程序。

    往7777端口发送数据,例如发送hello world

    Flink WordCount实践

    控制台输出

    Flink WordCount实践

    继续往7777端口发送数据,例如发送hello flink

    Flink WordCount实践

    控制台输出

    Flink WordCount实践

    停止SocketStreamWordCount程序。

    按Ctrl+c停止nc命令。

    打包

    这里的打包是将写好的程序打成jar包。

    点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

    Flink WordCount实践

    打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

    Flink WordCount实践

    提交到集群运行

    把jar包提交到flink集群运行有两种方式:

    1.通过命令行提交作业   

    2.通过Web UI提交作业

    命令行提交作业

    将jar包上传Linux

    Flink WordCount实践

    启动flink集群
    [hadoop@node2 ~]$ start-cluster.sh 
    Starting cluster.
    Starting standalonesession daemon on host node2.
    Starting taskexecutor daemon on host node2.
    Starting taskexecutor daemon on host node3.
    Starting taskexecutor daemon on host node4.
    ​
    
    开启nc监听
    [hadoop@node2 ~]$ nc -lk 7777
    ​
    
    命令提交作业

    开启另一个node2终端,使用flink run命令提交作业到flink集群

    [hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

    Flink WordCount实践

    -m指定提交到的JobManager,-c指定程序入口类。

    发送测试数据

    在nc监听终端,往7777端口发送数据

    Flink WordCount实践

    查看结果
    Web UI查看结果

    浏览器访问

    node2:8081

    看到正在运行的作业如下

    Flink WordCount实践

    查看结果

    Flink WordCount实践

    Flink WordCount实践

    继续发送测试数据

    在nc终端继续发送数据

    Flink WordCount实践

    Web UI刷新结果

    Flink WordCount实践

    命令行查看结果

    打开新的node2终端,查看结果

    [hadoop@node2 ~]$ cd $FLINK_HOME/log
    [hadoop@node2 log]$ ls
    flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
    flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
    flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
    flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
    flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
    flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
    flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
    [hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
    (hello,1)
    (flink,1)
    (hello,2)
    (world,1)
    ​

    Flink WordCount实践

    取消flink作业

    Flink WordCount实践

    点击Cancel Job取消作业 

    Flink WordCount实践

    Flink WordCount实践

    停止nc监听

    按Ctrl+c停止nc命令

    Web UI提交作业

    开启nc监听

    开启nc监听发送数据

    [hadoop@node2 ~]$ nc -lk 7777
    Web UI提交作业

    浏览器访问

    node2:8081

    点击Submit New Job

    Flink WordCount实践

    点击Add New

    Flink WordCount实践

    选择flink作业jar包所在路径

    Flink WordCount实践

    点击jar包名称

    Flink WordCount实践

    填写相关内容,点击Submit提交作业

    Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

    Parallesim填写作业的并行度,例如:1

    Flink WordCount实践

    提交后,在Running Jobs里看到运行的作业

    Flink WordCount实践

    发送测试数据

    往7777端口发送数据

    Flink WordCount实践

    查看结果

    Flink WordCount实践

    继续发送测试数据

    Flink WordCount实践

    刷新结果

    Flink WordCount实践

    取消作业

    Flink WordCount实践

    停止nc监听

    按住Ctrl+c停止nc命令

    关闭flink集群
    [hadoop@node2 ~]$ stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2283) on host node2.
    Stopping taskexecutor daemon (pid: 1827) on host node3.
    Stopping taskexecutor daemon (pid: 1829) on host node4.
    Stopping standalonesession daemon (pid: 1929) on host node2.

    上传代码到gitee

    登录gitee

    https://gitee.com/

    注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

    创建仓库

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    提交代码

    使用IDEA提交代码

    Flink WordCount实践

    Flink WordCount实践

    提示有警告,忽略警告,继续提交

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    Flink WordCount实践

    提交成功后,IDEA显示如下

    Flink WordCount实践

    刷新浏览器查看gitee界面,看到代码已上传成功

    Flink WordCount实践

    完成!enjoy it!

转载请注明来自码农世界,本文标题:《Flink WordCount实践》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,66人围观)参与讨论

还没有评论,来说两句吧...

Top