使用Flink CDC将Mysql中的数据实时同步到ES

时间:2024-01-10 00:59:42 标签:  mysql  flink  elasticsearch  

前言

最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……

我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。

代码

直接上代码:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);
	    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = 'localhost',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '123456',\n" +
                "   'database-name' = 'mydb',\n" +
                "   'table-name' = 'orders'\n" +
                " );").await();

        tableEnv.executeSql("CREATE TABLE products (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'database-name' = 'mydb',\n" +
                "    'table-name' = 'products'\n" +
                "  );").await();

        tableEnv.executeSql("CREATE TABLE enriched_orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   product_name STRING,\n" +
                "   product_description STRING,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "     'connector' = 'elasticsearch-7',\n" +
                "     'hosts' = 'http://localhost:9200',\n" +
                "     'index' = 'enriched_orders_lhc'\n" +
                " );");

        tableEnv.executeSql("INSERT INTO enriched_orders\n" +
                " SELECT o.*, p.name, p.description\n" +
                " FROM orders AS o\n" +
                " LEFT JOIN products AS p ON o.product_id = p.id");

        env.execute("Mysql to ES");
    }
来源:https://blоg.сsdn.nеt/lhсniсhоlаs/аrtiсlе/dеtаils/129854091

智能推荐

前沿: flink cdc功能越发强大,支持的数据源也越多&#xff0c

标签:sqlserver  starrocks  flink  

本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保

标签:elasticsearch  SpringCloud  spring cloud  mysql  elasticsearch  

CDC 的全称是 Change Data Capture ,在广义的概念上&#xff0

标签:flink  大数据  

一、背景大家应该都在各种电商网站检索过商品,检索商品一般都是通过什么实现呢?搜索引擎Elasticsearch。那么问题来了,商品上架,数据一般写入到MySQL的数据库中,那么用于检索的数据又是怎么同步到Elasticsearch的呢?

标签:解决方案  数据同步  mysql  ES解决方案  数据同步  mysql  ES  

1 Maven依赖 <?xml version&#61;&

标签:大数据  Kafka  hbase  flink  

猜你喜欢

随着数据分析在业务决策中变得日益重要,数据实时同步和分析成为企业提升竞争力的关键。MySQL 作为广泛使用的关系型数据库,其数据存储丰富,但无法满足大规模数据分析和高并发查询的需求。而 SelectDB 作为一款专为大数据分析设计的分布式数据仓库,具有高性能、可扩展的特点,其优异的数据处理能力也在行业内广受关注。01 在什么情况下需要把 MySQL 同步到 SelectDB?大数据分析需求:当您的业务数据量不断增长,M

标签:实时  策略  mysql  SelectDB  

准备 你需要将这两个依赖添加到 pom.xml 中 mysql

标签:大数据  云原生  flink  mysql  linq  

背景 为了前端更快地进行数据检索&#xff0c;需要将数据存储到es中是一个很不

标签:大数据  elasticsearch  大数据  etl  数据库  ES  

在我的上一篇文章 “Elasticsearch&#xff1a;使用 Node.js 将实时数据提取到

标签:kibana  elasticsearch  Elastic  elasticsearch  大数据  kibana  全文检索  

软件环境 Flink1.13.3 Scala 2.12 doris

标签:flink  sql  java  

ClickHouse 在执行分析查询时的速度优势很好的弥补了 MySQL 的不足,但是对于很多开发者和DBA来说,如何将MySQL稳定、高效、简单的同步到 ClickHouse 却很困难。本文对比了 NineData、MaterializeMySQL(ClickHouse自带)、Bifrost 三款产品,看看他们在同步时的差异。对比结果概述整体上,NineData(官网:https://www.ninedata.cloud/&nbsp;)的数据复制功能在功能、性能表现最突出。其次是Bifrost和C

标签:产品对比  数据同步  时代  ClickHouse  mysql  

用 flink 插件chunjun实现全量&#43;增量同步&#xff0c;这里以达梦数据库同步到p

标签:flink  flink  数据库  大数据  

相关问题

相关文章

热门文章

推荐文章

相关标签