1、 早期关系型数据库之间的数据同步
1)、全量同步
比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页查询时,一定要按照主键id来排序分页,避免重复插入。
2)、基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步,如果是不同的数据库,这种方式可能会存在问题。
3)、基于触发器的增量同步
增量同步一般是做实时的同步,早期很多数据同步都是基于关系型数据库的触发器trigger来做的。
使用触发器实时同步数据的步骤:
A、 基于原表创触发器,触发器包含insert,modify,delete 三种类型的操作,数据库的触发器分Before和After两种情况,一种是在insert,modify,delete 三种类型的操作发生之前触发(比如记录日志操作,一般是Before),一种是在insert,modify,delete 三种类型的操作之后触发。
B、 创建增量表,增量表中的字段和原表中的字段完全一样,但是需要多一个操作类型字段(分表代表insert,modify,delete 三种类型的操作),并且需要一个唯一自增ID,代表数据原表中数据操作的顺序,这个自增id非常重要,不然数据同步就会错乱。
C、 原表中出现insert,modify,delete 三种类型的操作时,通过触发器自动产生增量数据,插入增量表中。
D、处理增量表中的数据,处理时,一定是按照自增id的顺序来处理,这种效率会非常低,没办法做批量操作,不然数据会错乱。 有人可能会说,是不是可以把insert操作合并在一起,modify合并在一起,delete操作合并在一起,然后批量处理,我给的答案是不行,因为数据的增删改是有顺序的,合并后,就没有顺序了,同一条数据的增删改顺序一旦错了,那数据同步就肯定错了。
市面上很多数据etl数据交换产品都是基于这种思想来做的。
E、 这种思想使用kettle 很容易就可以实现,笔者曾经在自己的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html
4)、基于时间戳的增量同步
A、首先我们需要一张临时temp表,用来存取每次读取的待同步的数据,也就是把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据
B、我们还需要创建一个时间戳配置表,用于存放每次读取的处理完的数据的最后的时间戳。
C、每次从原表中读取数据时,先查询时间戳配置表,然后就知道了查询原表时的开始时间戳。
D、根据时间戳读取到原表的数据,插入到临时表中,然后再将临时表中的数据插入到目标表中。
E、从缓存表中读取出数据的最大时间戳,并且更新到时间戳配置表中。缓存表的作用就是使用sql获取每次读取到的数据的最大的时间戳,当然这些都是完全基于sql语句在kettle中来配置,才需要这样的一张临时表。
2、 大数据时代下的数据同步
1)、基于数据库日志(比如mysql的binlog)的同步
我们都知道很多数据库都支持了主从自动同步,尤其是mysql,可以支持多主多从的模式。那么我们是不是可以利用这种思想呢,答案当然是肯定的,mysql的主从同步的过程是这样的。
A、master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
B、slave将master的binary log events拷贝到它的中继日志(relay log);
C、slave重做中继日志中的事件,将改变反映它自己的数据。
阿里巴巴开源的canal就完美的使用这种方式,canal 伪装了一个Slave 去喝Master进行同步。
A、 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)
C、 canal解析binary log对象(原始为byte流)
另外canal 在设计时,特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。
canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
canal c# 客户端: https://github.com/dotnetcore/CanalSharp
canal go客户端: https://github.com/CanalClient/canal-go
canal php客户端: https://github.com/xingwenge/canal-php、
github的地址:https://github.com/alibaba/canal/
D、在使用canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的my.cnf文件中增加如下配置
log-bin=E:/mysql5.5/bin_log/mysql-bin.log
binlog-format=ROW
server-id=123、
E、 部署canal的服务端,配置canal.properties文件,然后 启动 bin/startup.sh 或bin/startup.bat
#设置要监听的mysql服务器的地址和端口
canal.instance.master.address = 127.0.0.1:3306
#设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#连接的数据库
canal.instance.defaultDatabaseName =test
#订阅实例中所有的数据库和表
canal.instance.filter.regex = .*\\..*
#连接canal的端口
canal.port= 11111
#监听到的数据变更发送的队列
canal.destinations= example
F、 客户端开发,在maven中引入canal的依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | package com.example; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; public class CanalClientExample { public static void main(String[] args) { while ( true ) { //连接canal CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(AddressUtils.getHostIp(), 11111 ), "example" , "canal" , "canal" ); connector.connect(); //订阅 监控的 数据库.表 connector.subscribe( "demo_db.user_tab" ); //一次取10条 Message msg = connector.getWithoutAck( 10 ); long batchId = msg.getId(); int size = msg.getEntries().size(); if (batchId < 0 || size == 0 ) { System.out.println( "没有消息,休眠5秒" ); try { Thread.sleep( 5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } else { // CanalEntry.RowChange row = null ; for (CanalEntry.Entry entry : msg.getEntries()) { try { row = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = row.getRowDatasList(); for (CanalEntry.RowData rowdata : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList(); Map<String, Object> dataMap = transforListToMap(afterColumnsList); if (row.getEventType() == CanalEntry.EventType.INSERT) { //具体业务操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.UPDATE) { //具体业务操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.DELETE) { List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if ( "id" .equals(column.getName())) { //具体业务操作 System.out.println( "删除的id:" + column.getValue()); } } } else { System.out.println( "其他操作类型不做处理" ); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } //确认消息 connector.ack(batchId); } } } public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) { Map map = new HashMap(); if (afterColumnsList != null && afterColumnsList.size() > 0 ) { for (CanalEntry.Column column : afterColumnsList) { map.put(column.getName(), column.getValue()); } } return map; } } |
2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase
我们有两种方式可以实现,
A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中。
但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。
在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:
/hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>
B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。
当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | … //将DataFrame转换bulkload需要的RDD格式 val rddnew = datahiveDF.rdd.map(row => { val rowKey = row.getAs[String](rowKeyField) fields.map(field => { val fieldValue = row.getAs[String](field) (Bytes.toBytes(rowKey), Array((Bytes.toBytes( "info" ), Bytes.toBytes(field), Bytes.toBytes(fieldValue)))) }) }).flatMap(array => { (array) }) … //使用HBaseContext的bulkload生成HFile文件 hbaseContext.bulkLoad[Put](rddnew.map(record => { val put = new Put(record._1) record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) put }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload" ) val conn = ConnectionFactory.createConnection(hBaseConf) val hbTableName = TableName.valueOf(hBaseTempTable.getBytes()) val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn)) val realTable = conn.getTable(hbTableName) HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator) // bulk load start val loader = new LoadIncrementalHFiles(hBaseConf) val admin = conn.getAdmin() loader.doBulkLoad( new Path( "/tmp/bulkload" ),admin,realTable,regionLocator) sc.stop() } … def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList() import scala.collection.JavaConversions._ for (cells <- put.getFamilyCellMap.entrySet().iterator()) { val family = cells.getKey for (value <- cells.getValue) { val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value)) ret.+=((kfq, CellUtil.cloneValue(value))) } } ret.iterator } } … |
C、pg_bulkload的使用
这是一个支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考:https://my.oschina.net/u/3317105/blog/852785 pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/
3)、基于sqoop的全量导入
Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。
Sqoop从外部导入数据的流程图如下:
Sqoop将hdfs中的数据导出的流程如下:
本质都是用了大数据的数据分布式处理来快速的导入和导出数据。
联系客服