Flink 與Flink可視化平臺StreamPark教程(開篇)
本文分享自天翼云開發者社區《Flink 與Flink可視化平臺StreamPark教程(開篇)》,作者:l****n
Flink是一個大數據流處理引擎,可以為不同行業提供實時大數據處理解決方案。隨著Flink的快速發展和改進,世界各地的許多公司現在都能看到它的存在。目前,北美、歐洲和金磚國家都是全球Flink應用的熱門地區。當然,Flink在中國的知名度特別高,部分原因是一些互聯網大廠的貢獻和引領效應,也符合中國的反應與場景密切相關。想象一下,在中國,一個網站可能需要面對數以億計的日活躍用戶和每秒數億的計算峰值,這對許多外國公司來說是難以想象的。Flink為我們提供了高速準確處理海量流媒體數據的可能性。
在目前的云原生時代,容器化、K8S等技術已經在各個互聯網大廠中獨占鰲頭,大部分的應用已經實現了上云。對于大數據引擎家族中的一員,flink實現與K8S結合、實現云原生下的severless模式的需求日漸增加,。因此,在本文中,主要為實現面對云原生+flink進行講解,希望能夠給讀者帶來獲得新知識的喜悅。
在這里,將會提供flink的使用方法,和一個flink可視化平臺StreamPark中的使用方式。本文將實時更新,將依次介紹其中各個方式的使用方法。在這里將會涉及以下知識點:
DataStreamApi的使用
UDF的開發
FlinkSql的使用
Flink cdc功能
原生flink k8s application的使用
翼flink-StreamPark的使用要點
本文的目錄暫定如此,后續將會對其中的內容加以補充,請廣大讀者提出寶貴意見,如需添加或刪減某些知識點可留言或私信本文作者。
FLINK 與Flink可視化平臺StreamPark教程介紹基礎環境數據源搭建構鍵k8s集群下載flink客戶端提供flink運行任務的環境DataStreamApiMAP-REDUCE流程水位線功能水位線設置窗口設置窗口API窗口函數MapReduce窗口函數Aggregate窗口處理函數JOIN功能時間窗Inner JoinFlink的狀態算子狀態checkpoint和savepointFlinkSql功能FlinkSql與連接器(Connector)相結合sql與DataStream混合編碼SQL模式與原生Flink的關系與差異與適配FlinkSql的動態表FlinkSql的持續查詢FlinkSql時間與窗口FlinkSql UDF編寫方式標量函數表函數聚合函數FlinkSql JOIN功能Regular JoinFlinkCDC功能基本概念使用api進行操作使用flinksql進行操作斷點續傳K8s Application運行方式任務jar生成k8s Application運行flink任務Application模式架構啟動命令PodTemplate翼flink-StreamPark使用要點概述常規使用依賴導入
在本文中,將面向開發程序員、面向一線碼農,帶來最詳細的flink教程。從基礎環境搭建到最后的平臺應用均會涉及。
對于flink而言,少不了對流式數據的處理,一般而言面對kafka、rabbitmq、cdc等消息為數據源主流,在這里,為簡化基礎環境搭建流程,將提供mysql數據源并開啟binlog模式作為我們的數據源,實現流(CDC功能接入binlog)批(常規查詢)一體的輸入。
數據源搭建在本文中,我們使用mysql作為數據源,并開啟binlog作為流數據作為本實例中的數據源。在這里首先需要安裝一個docker運行mysql容器,已實現統一基礎環境。
# 移除掉舊的版本
sudo yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine
# 刪除所有舊的數據
sudo rm -rf /var/lib/docker
# 安裝依賴包
sudo yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
# 添加源,使用了阿里云鏡像
sudo yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 配置緩存
sudo yum makecache fast
# 安裝最新穩定版本的docker
sudo yum install -y docker-ce
# 配置鏡像加速器
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["http://hub-mirror.c.163.com"]
}
EOF
# 啟動docker引擎并設置開機啟動
sudo systemctl start docker
sudo systemctl enable docker
# 配置當前用戶對docker的執行權限
sudo groupadd docker
sudo gpasswd -a ${USER} docker
sudo systemctl restart docker
完成docker的安裝后,可以執行如下命令,實現mysql的安裝
docker run -p 3307:3306 --name myMysql -v /mydata/mysql/log:/var/log/mysql -v /mydata/mysql/data:/var/lib/mysql -v /mydata/mysql/conf:/etc/mysql -e MYSQL_ROOT_PASSWORD=***** -d mysql:5.7.25
注意這里我們建議開啟mysql的binlog功能,供我們后續的CDC功能的使用,因此在啟動后需修改mysql的配置文件,以使其支持binlog功能。開啟此功能后,關于mysql中數據的修改將會被記錄,在后續連接mysql后,將會以流
修改my.cnf文件
[mysqld]
log-bin=/var/lib/mysql/mysql-bin
server-id=123654
expire_logs_days = 30
之后重啟容器
docker restart myMysql構鍵k8s集群
在這里,我們需要搭建一個K8S環境用于提供flink任務的運行時環境。在這里推薦使用kubeadm或者一些腳本工具【鏈接】github中的腳本工具搭建。具體過程在這里省略,可以參考上述鏈接中的文檔進行操作。
需要注意的是,我們需要在相應用戶的目錄下提供一個kubeconfig文件,一般而言,該文件在安裝好k8s后將會在~/.kube/目錄下出現,如下圖所示,通過該文件,才能順利地調用K8S客戶端提交任務,該config的內容為與K8S的ApiServer進行連接時需要使用的信息。

flink客戶端是控制flink的核心,需要下載并部署
wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz提供flink運行任務的環境
tar -xf flink-1.14.3-bin-scala_2.12.tgz
將kubeconfig提供出來,供flink客戶端調用,在這里要保證我們使用的客戶端時,我們的用戶下擁有kubeconfig文件
在這里主要提供一個供flink使用的命名空間、和SA。在K8S Application模式下,service acount(SA)是flink的jobmanager使用的服務賬號,jobmanager以此來獲得啟動相應的taskamanager的權限。這一點在后續的K8S application模式下比較重要。
# 創建namespace
kubectl create ns flink-dev
# 創建serviceaccount
kubectl create serviceaccount flink-service-account -n flink-dev
# 用戶授權
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account
*博客內容為網友個人發布,僅代表博主個人觀點,如有侵權請聯系工作人員刪除。

