×

flink java教程

flink java教程(Flink--对DataSource的理解)

admin admin 发表于2024-06-14 03:32:14 浏览16 评论0

抢沙发发表评论

各位老铁们,大家好,今天由我来为大家分享flink java教程,以及Flink--对DataSource的理解的相关问题知识,希望对大家有所帮助。如果可以帮助到大家,还望关注收藏下本站,您的支持是我们最大的动力,谢谢大家了哈,下面我们开始吧!

本文目录

Flink--对DataSource的理解

1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。 2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。 3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。 4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。 5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。 2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。 3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

实现:

重要注意:

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

addSource - 添加一个新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011《》(…)) 以从 Apache Kafka 读取数据。

1、基于集合:有界数据集,更偏向于本地测试用 2、基于文件:适合监听文件修改并读取其内容 3、基于 Socket:监听主机的 host port,从 Socket 中获取数据 4、自定义 addSource:大多数的场景数据都是无界的,会源源不断的过来。比如去消费 Kafka 某个 topic 上的数据,这时候就需要用到这个 addSource,可能因为用的比较多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等类可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类。 5、flink目前支持的source详细可以阅读官网connects部分;

flink中ProcessFunction的注册定时器功能

在flink的ProcessFunction中,我们可以注册定时器设定延迟多长时间后执行某类操作,例如像这种: context.timerService().registerEventTimeTimer(context.timestamp() + 10000); 很好奇这种定时器内部是如何进行工作的,带着这种疑问我们来看看源码。 首先,在源码中,所有的定时器管理都是通过InternalTimerServiceImpl.java这个类来实现的。

可以看到根据ProcessingTime和EventTime的不同,分别加入不同的queue队列中,其中如果是ProcessingTime的话,他还会判断当前queue中第一个元素的触发时间是否比当前加入的注册时间晚,如果晚于当前新加入的时间,则把下次触发时间改成当前的新加入的注册时间。 我们接下来看queue队列是如何实现的,processingTimeTimersQueue和eventTimeTimersQueue实现原理都是一样的,它的实现类是HeapPriorityQueueSet.java:

这里的逻辑简单概况就是先简单判断该注册时间是否有重复,如果没有重复就继续往里添加,再来看super.add(element)这个方法的实现:

重点看siftUp这个方法,这个方法实现的的就是堆排序并且还是小顶堆排序,先把新的定时器放到数组末尾,然后就进行小顶堆排序,永远把最小的元素(定时器)排到最前面,这样就最早触发。至此逻辑就很清楚了:添加定时器的时候,首先会判断是否有重复,然后进行小顶堆排序,把最小的定时器放到第一个。 接下来我们继续看定时器是如何触发的,先看InternalTimerServiceImpl.java:

在收到watermark之后开始执行advanceWatermark方法,这时候从eventTimeTimersQueue中获得第一个定时器(之前加入的时候已经保证了弹出的第一个永远是时间最早的定时器)与当前watermark时间比较,如果小于watermark则取出该定时器执行onEventTime也就是ProcessFunction中的onTimer方法。 取出第一个定时器之后,会触发eventTimeTimersQueue中的小顶堆再次排序:

这里简单概况来说就是弹出第一个定时器,同时触发小顶堆再次排序,把数组中剩余的时间最小的定时器再次放到第一个位置. eventTime触发的定时器逻辑这里就讲完了,我们再看看processTime的触发逻辑,在InternalTimerServiceImpl.java中:

processTime是依靠自身线程注册的定时器来触发的,processingTimeTimersQueue的逻辑与eventTimeTimersQueue一样,这里就不多讲了,当弹出第一个定时器执行的时候,会立即注册下一个定时器,保证下一个定时器顺利按时执行

总计一下,在ProcessFunction中,eventTime依靠watermark来触发,processTime依靠自身线程注册的定时器触发,两者都是在添加定时器的时候,把定时器放入队列里面进行小顶堆排序,把时间最小的定时器放到第一个位置,最早触发。

flink:第二种模式:多个yarn session模式

这种方式的好处是一个任务会对应一个job,即每提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。 注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败 不需要在yarn当中启动任何集群,直接提交任务即可 第一步:直接执行命令提交任务 cd /kkb/install/flink-1.8.1/ bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/out_result/out_count.txt 第二步:查看输出结果 hdfs执行以下命令查看输出结果 hdfs dfs -text hdfs://node01:8020/out_result/out_count.txt 第三步:查看flink run帮助文档 我们可以使用--help 来查看帮助文档可以添加哪些参数 cd /kkb/install/flink-1.8.1/ bin/flink run --help 得到结果内容如下 Action "run" compiles and runs a program. Syntax: run 《jar-file》 《arguments》 "run" action options: -c,--class 《classname》 Class with the program entry point ("main" method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest. -C,--classpath 《url》 Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}. -d,--detached If present, runs the job in detached mode -n,--allowNonRestoredState Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered. -p,--parallelism 《parallelism》 The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. -q,--sysoutLogging If present, suppress logging output to standard out. -s,--fromSavepoint 《savepointPath》 Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537). -sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C. Options for yarn-cluster mode: -d,--detached If present, runs the job in detached mode -m,--jobmanager 《arg》 Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C. -yD 《property=value》 use value for given property -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -yh,--yarnhelp Help for the Yarn session CLI. -yid,--yarnapplicationId 《arg》 Attach to running YARN session -yj,--yarnjar 《arg》 Path to Flink jar file -yjm,--yarnjobManagerMemory 《arg》 Memory for JobManager Container with optional unit (default: MB) -yn,--yarncontainer 《arg》 Number of YARN container to allocate (=Number of Task Managers) -ynl,--yarnnodeLabel 《arg》 Specify YARN node label for the YARN application -ynm,--yarnname 《arg》 Set a custom name for the application on YARN -yq,--yarnquery Display available YARN resources (memory, cores) -yqu,--yarnqueue 《arg》 Specify YARN queue. -ys,--yarnslots 《arg》 Number of slots per TaskManager -yst,--yarnstreaming Start Flink in streaming mode -yt,--yarnship 《arg》 Ship files in the specified directory (t for transfer) -ytm,--yarntaskManagerMemory 《arg》 Memory per TaskManager Container with optional unit (default: MB) -yz,--yarnzookeeperNamespace 《arg》 Namespace to create the Zookeeper sub-paths for high availability mode -z,--zookeeperNamespace 《arg》 Namespace to create the Zookeeper sub-paths for high availability mode Options for default mode: -m,--jobmanager 《arg》 Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -z,--zookeeperNamespace 《arg》 Namespace to create the Zookeeper sub-paths for high availability mode 3、flink run脚本分析 我们提交flink任务的时候,可以加以下这些参数 1、默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 2、连接指定host和port的jobmanager: bin/flink run -m node01:8081 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 3、启动一个新的yarn-session: bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀 例如:bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

flink窗口的种类及详述

flink窗口的种类及详述: 滚动窗口(tumblingwindow)将事件分配到长度固定且互不重叠的桶中。 实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额 Java设置语句:window(TumblingProcessingTimeWindows.of(Time.seconds(5))) 该语句为设置滚动窗口的窗口时长为5秒钟 sql设置语句:FROM TABLE(TUMBLE(         TABLE source_table         , DESCRIPTOR(row_time)         , INTERVAL ’60’ SECOND)) Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ’60’ SECOND)),包含三部分参数。 第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL ’60’ SECOND 声明滚动窗口大小为 1 min滑动窗口:分配器将每个元素分配给固定窗口大小的窗口。与滚动窗口分配器类似,窗口的大小由 window size 参数配置。还有一个window slide参数用来控制滑动窗口的滑动大小。因此,如果滑动大小小于窗口大小,则滑动窗口会重叠。在这种情况下,一个元素会被分配到多个窗口中。 实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据 java设置语句:window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) window size :窗口大小为 10秒钟 window slide:窗口间隔为5秒钟 sql设置语句: hop(row_time, interval ’1’ minute, interval ’5’ minute)  第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。会话窗口:分配器通过活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时会关闭。会话窗口分配器需要配置一个会话间隙,定义了所需的不活动时长。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。 实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开 设置语句:基于事件时间的会话窗口window(EventTimeSessionWindows.withGap(Time.minutes(10))) 基于处理时间的会话窗口 Java设置:window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) 会话间隙,不活动时长为10秒钟 sql设置:session(row_time, interval ’5’ minute) Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中 Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。渐进式窗口:在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。  应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。 实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别 sql设置:FROM TABLE(CUMULATE(        TABLE source_table        , DESCRIPTOR(row_time)        , INTERVAL ’60’ SECOND        , INTERVAL ’1’ DAY)) Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ’60’ SECOND, INTERVAL ’1’ DAY)),其中包含四部分参数: 第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL ’60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL ’1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计全局窗口:分配器将具有相同 key 的所有元素分配给同一个全局窗口。仅当我们指定自定义触发器时,窗口才起作用。否则,不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束的点(译者注:即本身自己不知道窗口的大小,计算多长时间的元素) window(GlobalWindows.create())平时滑动窗口用得比较多,其次是滚动窗口

yarn-session模式提交flink任务步骤

yarn-session.sh(开辟资源) + flink run(提交任务) 1.在yarn上启动一个Flink会话,node1上执行以下命令 /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d 说明: 申请2个CPU、1600M内存 -n 表示申请2个容器,这里指的就是多少个taskmanager -tm 表示每个TaskManager的内存大小 -s 表示每个TaskManager的slots数量 -d 表示以后台程序方式运行 注意: 该警告不用管 WARN org.apache.hadoop.hdfs.DFSClient - Caught exception java.lang.InterruptedException ***隐藏网址*** /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar 运行完之后可以继续运行其他的小任务 /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar yarn application -kill application_1599402747874_0001

java flink

很多朋友都想知道java flink是什么?下面就一起来了解一下吧~ 1、Flink是什么 Java Apache Flink是一个开源的分布式,高性能,高可用,准确的流处理框架。支持实时流处理和批处理。2、Flink特性 (1)支持批处理和数据流程序处理 (2)优雅流畅的支持java和scala api (3)同时支持高吞吐量和低延迟 (4)支持事件处理和无序处理通过SataStream API,基于DataFlow数据流模型 (5)在不同的时间语义(时间时间,处理时间)下支持灵活的窗口(时间,技术,会话,自定义触发器) (6)仅处理一次的容错担保 (7)自动反压机制 (8)图处理(批) 机器学习(批) 复杂事件处理(流) (9)在dataSet(批处理)API中内置支持迭代程序(BSP) (10)高效的自定义内存管理,和健壮的切换能力在in-memory和out-of-core中 (11)兼容hadoop的mapreduce和storm (12)集成YARN,HDFS,Hbase 和其它hadoop生态系统的组件 3、Flink分布式执行 Flink分布式程序包含2个主要的进程:JobManager和TaskManager.当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClientFlink程序提交给JobClient,JobClient再提交到JobManager,JobManager负责资源的协调和Job的执行。一旦资源分配完成,task就会分配到不同的TaskManager,TaskManager会初始化线程去执行task,并根据程序的执行状态向JobManager反馈,执行的状态包括starting、in progress、finished以及canceled和failing等。当Job执行完成,结果会返回给客户端。

flink中可以实现每n秒执行一个方法的定时任务吗使用Java自己的定时操作是失效的

是的,Flink中可以使用定时器(Timer)来实现每n秒执行一个方法的定时任务。Flink的定时器分为两种类型:EventTime Timer和ProcessingTime Timer。其中,EventTime Timer是基于事件时间的定时器,可以使用在基于事件时间处理的Flink应用中,而ProcessingTime Timer是基于处理时间的定时器,可以使用在基于处理时间处理的Flink应用中。根据需求选择对应的定时器类型即可。下面是一个使用ProcessingTime Timer实现每n秒执行一个方法的示例代码:import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class Main {public static void main(String args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.socketTextStream("localhost", 9999).keyBy(0).process(new MyKeyedProcessFunction()).print();env.execute("ProcessingTime Timer Demo");}public static class MyKeyedProcessFunction extends KeyedProcessFunction《String, String, String》 {private transient ValueState《Long》 lastTriggerTimeState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptor《Long》 lastTriggerTimeDescriptor = new ValueStateDescriptor《》("lastTriggerTime", Long.class);lastTriggerTimeState = getRuntimeContext().getState(lastTriggerTimeDescriptor);}@Overridepublic void processElement(String value, Context ctx, Collector《String》 out) throws Exception {// 每n秒触发一次定时器long currentTime = ctx.timerService().currentProcessingTime();long lastTriggerTime = lastTriggerTimeState.value() == null ? 0 : lastTriggerTimeState.value();long interval = 5000; // 每5秒执行一次if (currentTime - lastTriggerTime 》= interval) {lastTriggerTimeState.update(currentTime);out.collect("执行定时任务");}}}}在上述代码中,我们定义了一个KeyedProcessFunction,并在其中使用了ProcessingTime Timer来实现每5秒执行一次定时任务。每次处理元素时,首先获取当前时间,然后与上次触发定时器的时间进行比较,如果时间间隔超过了设定的值,则执行定时任务,并更新上次触发定时器的时间。需要注意的是,由于Flink是流式计算框架,定时器是基于时间的,因此需要使用TimeCharacteristic.ProcessingTime来指定使用ProcessingTime来计算定时器触发时间。另外,在使用定时器时需要考虑并发问题,例如使用ValueState来存储上次触发定时器的时间。

flink实战教程-集群的部署

这种模式我们一般是在用IDE调试程序的时候用到,当我们在本地用IDE开发程序的时候,执行main方法,flink会在本地启动一个包含jobmanager和taskmanager的进程的minicluster,程序运行完成之后,这个cluster进程退出。 这种模式就是直接在物理机上启动flink集群。我们可以通过 {FLINK_HOME}/conf/flink-conf.yaml. 此外,我们可以用 ${FLINK_HOME}/bin/taskmanager.sh start 再启动一个taskmanager。 这时我们通过jps命令查看一下启动的进程 我们看到这时候启动了两个taskmanager 这种部署模式对flink集群的资源管理是flink自己维护的,在生产环境下用的不多,所以我们也不做过多描述. 启动集群的命令如下: 这个命令有很多的参数,可以在后面加 -h 看下,我这里着重介绍一下 -d参数。 加上-d之后,指的是隔离模式,也就是启动之后和客户端就断了联系,如果要停止集群,需要通过yarn application -kill {applicationId} 来停止集群. 提交成功之后,我们会在yarn的管理页面看到一个类似的任务 这个启动命令也有很多的参数,我就不一一讲解了,我用大白话讲讲我认为最核心的几个参数。 第二,通过命令行来停止: 这个时候需要指定yarn applicationId和flink job id 第三,通过程序来停止 ***隐藏网址*** 如果我们做了一个实时平台这样的系统,就不能手工通过命令行来停止了,可以调用相应的api来停止任务. 这种模式是在flink 1.11 版本中提供的,flink的yarn per job模式启动的时候会把本地的flink的jar和用户的jar都上传到hdfs,这个过程非常的消耗网络的带宽,如果同时有多个人提交任务的话,那么对网络的影响就更大,此外,每次提交任务的时候flink的jar包是一样的,也不用每次都拷来拷去的,所以flink提供了一种新的application模式,可以把flink的jar和用户的jar都预先放到hdfs上,这样就能省去yarn per job模式提交任务的jar包拷贝工作,节省了带宽,加快了提交任务的速度. 具体的命令如下: -yD yarn.provided.lib.dirs :用来指定存放flink jar的目录 最后一个参数是用户的jar在hdfs上的路径. 说一下题外话,其实我们当时在做实时平台的时候,这个提交慢的问题我也发现了,当时我的想法是先启动一个flink集群,然后再把程序的JobGraph提交到这个yarn集群,不过后来嘛,由于 * %%$$# ^& 的原因,也没弄. 对于把服务容器化,也越来越成为一种趋势,所以k8s部署也越来越受大家的重视。 对于k8s部署flink这块说实话我研究的不是很深,也就不多说了。 我们还可以将程序部署到mesos或者使用docker,这个我没有去实际调研过,但是从flink的邮件列表大家沟通的问题或者是网上查到的资料看,这种模式部署应该不多,所以这里就不详细描述了。

OK,关于flink java教程和Flink--对DataSource的理解的内容到此结束了,希望对大家有所帮助。