久久ER99热精品一区二区-久久精品99国产精品日本-久久精品免费一区二区三区-久久综合九色综合欧美狠狠

博客專欄

EEPW首頁 > 博客 > Flink 與Flink可視化平臺StreamPark教程(時間相關 1)

Flink 與Flink可視化平臺StreamPark教程(時間相關 1)

發布人:天翼云開發者 時間:2025-09-12 來源:工程師 發布文章

本文分享自天翼云開發者社區《Flink 與Flink可視化平臺StreamPark教程(時間相關 1)》,作者:l****n

水位線與窗口

對于流式數據,時間是一個重要的標識。在flink的事件時間語義下,我們不依賴系統時間,而是基于數據自帶的時間戳去定義了一個時鐘,用來表示當前時間的進展。于是每個并行子任務都會有一個自己的邏輯時鐘,它的前進是靠數據的時間戳來驅動的。

但在分布式系統中,這種驅動方式又會有一些問題。因為數據本身在處理轉換的過程中會變化,如果遇到窗口聚合這樣的操作,其實是要攢一批數據才會輸出一個結果,那么下游的數據就會變少,時間進度的控制就不夠精細了。

所以我們應該把時鐘也以數據的形式傳遞出去,告訴下游任務當前時間的進展;而且這個時鐘的傳遞不會因為窗口聚合之類的運算而停滯。一種簡單的想法是,在數據流中加入一個時鐘標記,記錄當前的事件時間;這個標記可以直接廣播到下游,當下游任務收到這個標記,就可以更新自己的時鐘了。由于類似于水流中用來做標志的記號,在 Flink 中,這種用來衡量事件時間(Event Time)進展的標記,就被稱作“水位線”(Watermark)。

水位線設置

這里我們將通過mysql-cdc來生成一個水位線,我們在讀取數據源的一側進行設置。

package cn.ctyun.demo.api.watermark;import cn.ctyun.demo.api.utils.TransformUtil;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/**
 * @classname: ViewContentStreamWithWaterMark
 * @description: 擁有水位線
 * @author: Liu Xinyuan
 * @create: 2023-04-14 09:50
 **/public class ViewContentStreamWithWaterMark {

    public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
        // 1.創建Flink-MySQL-CDC的Source
        MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
                .hostname("***")
                .port(3306)
                .username("***")
                .password("***")
                .databaseList("test_cdc_source")
                .tableList("test_cdc_source.user_view")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();

        // 2.使用CDC Source從MySQL讀取數據
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                viewContentSouce,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long l) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "ViewContentStreamWithWatermark Source"
        );
        // 3.轉換為指定格式
        return mysqlDataStreamSource.map(TransformUtil::formatResult);

    }}

我們在cdc傳來的數據中獲取他的日志自帶更新時間戳字段ts_ms時間戳作為我們的事件時間,并生成水位線,此后此數據流將包含水位線進行后續地傳遞。

窗口設置

在窗口中,有著不同的設置,可以面對不同的場景。我們按照數據不同的分配規則,將窗口的具體實現分為了以下四種,如下所示:

  • 滾動窗口(Tumbling Windows):滾動窗口有固定的大小,是一種對數據進行“均勻切片”的劃分方式。窗口之間沒有重疊,也不會有間隔,是“首尾相接”的狀態。如果我們把多個窗口的創建,看作一個窗口的運動,就好像它在不停地向前“翻滾”一樣。這是最簡單的窗口形式,我們之前所舉的例子都是滾動窗口。也正是因為滾動窗口是“無縫銜接”,所以每個數據都會被分配到一個窗口,而且只會屬于一個窗口。滾動窗口也是在BI分析中最常用的窗口類型之一。

  • 滑動窗口(Sliding Windows ):與滾動窗口類似,滑動窗口的大小也是固定的。區別在于,窗口之間并不是首尾相接的,而是可以“錯開”一定的位置。如果看作一個窗口的運動,那么就像是向前小步“滑動”一樣。所以定義滑動窗口的參數有兩個:窗口大小(window size)定義了窗口的大小,還有一個“滑動步長”(window slide),代表了窗口計算的頻率。

  • 會話窗口(Session Windows):會話窗口顧名思義,是基于“會話”(session)來來對數據進行分組的。這里的會話類似Web 應用中 session 的概念,不過并不表示兩端的通訊過程,而是借用會話超時失效的機制來 描述窗口。簡單來說,就是數據來了之后就開啟一個會話窗口,如果接下來還有數據陸續到來,那么就一直保持會話;如果一段時間一直沒收到數據,那就認為會話超時失效,窗口自動關閉。一般而言將會給數據設置一個超時時間,如果兩個數據間間隔過長并大于超時時間。在這里所有能夠控制的就是超時時間(gap),其作為判定新窗口開啟的一個重要指標。

  • 全局窗口(Session Windows):這種窗口全局有效,會把相同 key 的所有數據都分配到同一個窗口中;無界流的數據永無止盡,所以這種窗口也沒有結束的時候,默認是不會做觸發計算的。如果希望它能對數據進行計算處理,還需要自定義“觸發器”(Trigger)。

窗口API

窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(Window Functions)

窗口函數MapReduce

在這里,我們首先定義一個MapReduce過程,用來統計目前十秒內的訪問統計數量,這里的水位線設定請參考代碼ViewContentStreamWithWaterMark(上文中提供的代碼),具體的MapReduce如下所示

package cn.ctyun.demo.api;import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;import com.alibaba.fastjson.JSONObject;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;/**
 * @classname: ApiTimeWindow
 * @description: 時間窗的使用
 * @author: Liu Xinyuan
 * @create: 2023-04-17 20:39
 **/public class ApiTimeWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);

        viewContentDataStream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                // 不將刪除的數據考慮在內
                return !value.getString("op").equals("d");
            }
        }).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(JSONObject value) throws Exception {
                return Tuple2.of(value.getString("user_name"), 1L);
            }
        }).keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 設定一個累加規則
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();
        env.execute();
    }}

這里設定了一個時間窗口為10秒,最終的結果為每十秒鐘將統計一個登錄統計,并輸出到控制臺。使用時間窗口后和不加的唯一區別是計算的范圍變為了時間窗內計算。


*博客內容為網友個人發布,僅代表博主個人觀點,如有侵權請聯系工作人員刪除。


關鍵詞: 大數據 flink 計算

相關推薦

技術專區

關閉