pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

Flink流處理引擎零基礎速通之數據的抽取篇

今天不分享基礎概念知識了,來分享一個馬上工作需要的場景,要做數據的抽取,不用kettle,想用flink。實際就是flink的sql、table層級的api

一、CDC

CDC (Change Data Capture) ,在廣義的概念上,只要能捕獲數據變更的技術,都可以稱為 CDC 。但通常我們說的CDC 技術主要面向數據庫(包括常見的mysql,Oracle, MongoDB等)的變更,是一種用于捕獲數據庫中數據變更的技術。

二、常見CDC的比較

常見的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。

  • DataX,Sqoop和kettle的CDC實現技術主要是基于查詢的方式實現的,通過離線調度查詢作業,實現批處理請求。這種作業方式無法保證數據的一致性,實時性也較差。
  • Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技術。這種技術,利用流處理的方式,實時處理日志數據,保證了數據的一致性,為其他服務提供了實時數據。

三、Flink CDC

2020年 Flink cdc 首次在 Flink forward 大會上官宣, 由 Jark Wu & Qingsheng Ren 兩位大佬提出。

Flink CDC connector 可以捕獲在一個或多個表中發生的所有變更。該模式通常有一個前記錄和一個后記錄。Flink CDC connector 可以直接在Flink中以非約束模式(流)使用,而不需要使用類似 kafka 之類的中間件中轉數據。

四、Flink CDC支持的數據庫

PS:

Flink CDC 2.2才新增OceanBase,PolarDB-X,SqlServer,TiDB 四種數據源接入,均支持全量和增量一體化同步。

截止到目前FlinkCDC已經支持12+數據源。

五、阿里實現的FlinkCDC使用示例

依賴引入

    <!-- flink table支持 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- 阿里實現的flink mysql CDC -->
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.28</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.80</version>
    </dependency>
    <!-- jackson報錯解決 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>${jackson.version}</version>
    </dependency>

基于table

package spendreport.cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC {
  public static void main(String[] args) throws Exception {
    //1.創建執行環境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //2.Flink-CDC 將讀取 binlog 的位置信息以狀態的方式保存在 CK,如果想要做到斷點續傳, 需要從 Checkpoint 或者 Savepoint 啟動程序
    //2.1 開啟 Checkpoint,每隔 5 秒鐘做一次 CK
    env.enableCheckpointing(5000L);
    //2.2 指定 CK 的一致性語義
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //2.3 設置任務關閉的時候保留最后一次 CK 數據
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //2.4 指定從 CK 自動重啟策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("127.0.0.1")
        .serverTimeZone("GMT+8")  //時區報錯增加這個設置
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("wz")
        .tableList("wz.user_info")  //注意表一定要寫庫名.表名這種,多個,隔開
        .startupOptions(StartupOptions.initial())
        //自定義轉json格式化
        .deserializer(new MyJsonDebeziumDeserializationSchema())
        //自帶string格式序列化
        //.deserializer(new StringDebeziumDeserializationSchema())
        .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    //TODO 可以keyBy,比如根據table或type,然后開窗處理
    //3.打印數據
    streamSource.print();
    //streamSource.addSink(); 輸出
    //4.執行任務
    env.execute("flinkTableCDC");
  }
  private static class MyJsonDebeziumDeserializationSchema implements
      com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
        throws Exception {
      Struct value = (Struct) sourceRecord.value();
      Struct source = value.getStruct("source");
      //獲取數據庫名稱
      String db = source.getString("db");
      String table = source.getString("table");
      //獲取數據類型
      String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
      if (type.equals("create")) {
        type = "insert";
      }
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("database", db);
      jsonObject.put("table", table);
      jsonObject.put("type", type);
      //獲取數據data
      Struct after = value.getStruct("after");
      JSONObject dataJson = new JSONObject();
      List<Field> fields = after.schema().fields();
      for (Field field : fields) {
        String field_name = field.name();
        Object fieldValue = after.get(field);
        dataJson.put(field_name, fieldValue);
      }
      jsonObject.put("data", dataJson);
      collector.collect(JSONObject.toJSONString(jsonObject));
    }
    @Override
    public TypeInformation<String> getProducedType() {
      return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
}

運行效果

PS:

  • 操作數據庫的增刪改就會立馬觸發
  • 這里是自定義的序列化轉json格式字符串,自帶的字符串序列化也是可以的(可以自己試試打印的內容)

基于sql

package spendreport.cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC2 {
  public static void main(String[] args) throws Exception {
    //1.創建執行環境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //2.創建 Flink-MySQL-CDC 的 Source
    String connectorName = "mysql-cdc";
    String dbHostName = "127.0.0.1";
    String dbPort = "3306";
    String dbUsername = "root";
    String dbPassword = "123456";
    String dbDatabaseName = "wz";
    String dbTableName = "user_info";
    String tableSql = "CREATE TABLE t_user_info ("
        + "id int,mobile varchar(20),"
        + "user_name varchar(30),"
        + "real_name varchar(60),"
        + "id_card varchar(20),"
        + "org_name varchar(100),"
        + "user_stars int,"
        + "create_by int,"
        // + "create_time datetime,"
        + "update_by int,"
        // + "update_time datetime,"
        + "is_deleted int) "
        + " WITH ("
        + " 'connector' = '" + connectorName + "',"
        + " 'hostname' = '" + dbHostName + "',"
        + " 'port' = '" + dbPort + "',"
        + " 'username' = '" + dbUsername + "',"
        + " 'password' = '" + dbPassword + "',"
        + " 'database-name' = '" + dbDatabaseName + "',"
        + " 'table-name' = '" + dbTableName + "'"
        + ")";
    tableEnv.executeSql(tableSql);
    tableEnv.executeSql("select * from t_user_info").print();
    env.execute();
  }
}

運行效果:

總結

既然是基于日志,那么數據庫的配置文件肯定要開啟日志功能,這里mysql需要開啟內容

server-id=1
log_bin=mysql-bin
binlog_format=ROW  #目前還只能支持行
expire_logs_days=30
binlog_do_db=wz #這里binlog的庫如果有多個就再寫一行,千萬不要寫成用,隔開

  • 實時性確實高,比那些自動任務定時取體驗號百倍
  • 流示的確實絲滑

最后肯定證明這種方式同步數據可行,而且實時性特高,但是就是不知道我們的目標數據庫是否可以開啟這些日志配置。UP!

到此這篇關于Flink流處理引擎零基礎速通之數據的抽取篇的文章就介紹到這了,更多相關Flink數據的抽取內容請搜索html5模板網以前的文章希望大家以后多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

主站蜘蛛池模板: 六维力传感器_六分量力传感器_模腔压力传感器-南京数智微传感科技有限公司 | 远程会诊系统-手术示教系统【林之硕】医院远程医疗平台 | 陕西安玻璃自动感应门-自动重叠门-磁悬浮平开门厂家【捷申达门业】 | 日本东丽膜_反渗透膜_RO膜价格_超滤膜_纳滤膜-北京东丽阳光官网 日本细胞免疫疗法_肿瘤免疫治疗_NK细胞疗法 - 免疫密码 | AGV叉车|无人叉车|AGV智能叉车|AGV搬运车-江西丹巴赫机器人股份有限公司 | 安徽净化工程设计_无尘净化车间工程_合肥净化实验室_安徽创世环境科技有限公司 | 智能终端_RTU_dcm_北斗星空自动化科技 | 储气罐,真空罐,缓冲罐,隔膜气压罐厂家批发价格,空压机储气罐规格型号-上海申容压力容器集团有限公司 | 「银杏树」银杏树行情价格_银杏树种植_山东程锦园林 | 矿用履带式平板车|探水钻机|气动架柱式钻机|架柱式液压回转钻机|履带式钻机-启睿探水钻机厂家 | 手术室净化装修-手术室净化工程公司-华锐手术室净化厂家 | 双齿辊破碎机-大型狼牙破碎机视频-对辊破碎机价格/型号图片-金联机械设备生产厂家 | 珠海白蚁防治_珠海灭鼠_珠海杀虫灭鼠_珠海灭蟑螂_珠海酒店消杀_珠海工厂杀虫灭鼠_立净虫控防治服务有限公司 | 代做标书-代写标书-专业标书文件编辑-「深圳卓越创兴公司」 | 留学生辅导网-在线课程论文辅导-留学生挂科申诉机构 | pbt头梳丝_牙刷丝_尼龙毛刷丝_PP塑料纤维合成毛丝定制厂_广州明旺 | 广州冷却塔维修厂家_冷却塔修理_凉水塔风机电机填料抢修-广东康明节能空调有限公司 | 沧州友城管业有限公司-内外涂塑钢管-大口径螺旋钢管-涂塑螺旋管-保温钢管生产厂家 | 广州番禺搬家公司_天河黄埔搬家公司_企业工厂搬迁_日式搬家_广州搬家公司_厚道搬迁搬家公司 | LED投光灯-工矿灯-led路灯头-工业灯具 - 山东普瑞斯照明科技有限公司 | 仿真茅草_人造茅草瓦价格_仿真茅草厂家_仿真茅草供应-深圳市科佰工贸有限公司 | 空冷器|空气冷却器|空水冷却器-无锡赛迪森机械有限公司[官网] | 湖南教师资格网-湖南教师资格证考试网 | 采暖炉_取暖炉_生物质颗粒锅炉_颗粒壁炉_厂家加盟批发_烟台蓝澳采暖设备有限公司 | 安全,主动,被动,柔性,山体滑坡,sns,钢丝绳,边坡,防护网,护栏网,围栏,栏杆,栅栏,厂家 - 护栏网防护网生产厂家 | b2b网站大全,b2b网站排名,找b2b网站就上地球网 | 防水套管_柔性防水套管_刚性防水套管-巩义市润达管道设备制造有限公司 | 烟气换热器_GGH烟气换热器_空气预热器_高温气气换热器-青岛康景辉 | 体检车_移动CT车_CT检查车_CT车_深圳市艾克瑞电气有限公司移动CT体检车厂家-深圳市艾克瑞电气有限公司 | 2025第九届世界无人机大会| 富森高压水枪-柴油驱动-养殖场高压清洗机-山东龙腾环保科技有限公司 | ★济南领跃标识制作公司★济南标识制作,标牌制作,山东标识制作,济南标牌厂 | 济南网站策划设计_自适应网站制作_H5企业网站搭建_济南外贸网站制作公司_锐尚 | 智慧钢琴-电钢琴-便携钢琴-数码钢琴-深圳市特伦斯乐器有限公司 | 带式压滤机_污泥压滤机_污泥脱水机_带式过滤机_带式压滤机厂家-河南恒磊环保设备有限公司 | 光纤测温-荧光光纤测温系统-福州华光天锐光电科技有限公司 | 皮带机_移动皮带机_大倾角皮带机_皮带机厂家 - 新乡市国盛机械设备有限公司 | 桁架楼承板-钢筋桁架楼承板-江苏众力达钢筋楼承板厂 | 艺术涂料_进口艺术涂料_艺术涂料加盟_艺术涂料十大品牌 -英国蒙太奇艺术涂料 | 【甲方装饰】合肥工装公司-合肥装修设计公司,专业从事安徽办公室、店面、售楼部、餐饮店、厂房装修设计服务 | 空压机商城|空气压缩机|空压机配件-压缩机网旗下商城 |