添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

1、问题描述

关于SS的8小时问题之前也写了2个文章,以为问题已经得到了解决,但是发现想的太简单,再次遇到这个问题,折腾了2天,现在把这个问题梳理一下。大数据平台使用的是CDH-6.2.0版本,安装在Centos7.6 64位服务器上,服务器时区为CST时区,SS使用的是3.13.0版本。

数据同步的需要首先是从各个业务系统MySQL同步到中间库MySQL上,再根据数据分析的需要,有选择性的从中间库MySQL同步到大数据平台的kudu中,MySQL版本统一为5.6的,MySQL时区和安装MySQL的服务器保持一致,即CST时区。中间库的表结构完全与业务系统保持一致。现在需要保证数据从业务库同步到中间库,中间库同步到kudu都是准确的。

业务库MySQL表时间字段既有date也有datetime和timestamp类型,中间库保持和业务系统一致,因此也是这三种类型都存在,kudu的时间类型只有timestamp。

先说一下业务库到中间库同步的时间问题,存量数据同步时三种MySQL时间类型同步到中间库MySQL都没有问题。增量数据通过binlog实时同步,业务系统MySQL的datetime类型同步到中间库会多出8小时,在JDBC Producer中配置serverTimezone为GMT+8无效,timestamp类型的没问题,date类型没问题。

再说一下中间库同步到kudu的时间问题(这里假设中间库时间都是正确的),存量数据同步时MySQL的date、datetime、timestamp到kudu的timestamp后时间都少了8小时,在JDBC Query Consumer中JDBC项的Additional JDBC Configuration Properties增加时区配置

serverTimezone UTC
useTimezone	true

配置后MySQL的datetime和timestamp类型到kudu的timestamp类型时间正确,但是MySQL的date类型同步到kudu的timestamp后少8小时。增量数据通过binlog实时同步,MySQL的date类型和datetime类型同步到kudu的timestamp正确,但是MySQL的timestamp同步到kudu的timestamp后时间少了8小时。

2、解决方案

2.1、测试数据

MySQL建表

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  `start_date` date DEFAULT NULL,
  `date_created` datetime DEFAULT CURRENT_TIMESTAMP,
  `last_updated` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `na` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (1, 'admin', 19, 100, '2020-06-02', '2020-06-02 14:16:54', '2020-06-02 14:17:00');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (2, 'test', 20, 120, '2020-06-02', '2020-06-02 14:17:10', '2020-06-02 14:17:12');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (3, 'cc', 21, 96, '2020-06-02', '2020-06-02 14:17:24', '2020-06-02 14:24:07');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (4, 'dd', 22, 90, '2020-06-02', '2020-06-02 14:26:57', '2020-06-02 14:42:25');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (5, 'rr', 23, 101, '2020-06-01', '2020-06-02 14:41:51', '2020-06-02 14:56:26');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (6, 'ee', 24, 86, '2020-06-02', '2020-06-02 14:56:52', '2020-06-02 14:56:52');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (7, 'tt', 26, 95, '2020-06-02', '2020-06-02 15:05:04', '2020-06-02 15:28:27');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (8, 'yy', 28, 98, '2020-06-02', '2020-06-02 15:29:01', '2020-06-02 15:32:09');
INSERT INTO `test`.`test`(`id`, `name`, `age`, `weight`, `start_date`, `date_created`, `last_updated`) VALUES (9, 'qq', 26, 96, '2020-06-02', '2020-06-02 15:32:32', '2020-06-02 15:34:00');

kudu建表

CREATE TABLE dw_dwd_sup.dwd_sup_test (
  id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  age INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  weight FLOAT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  start_date timestamp NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  date_created timestamp NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  last_updated timestamp NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  PRIMARY KEY (id)
PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
CREATE TABLE dw_dwd_sup.dwd_sup_test2 (
  id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  age INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  weight FLOAT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  start_date STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  date_created STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  last_updated STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  PRIMARY KEY (id)
PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;

2.2、业务库到中间库

参考github链接

https://github.com/shyiko/mysql-binlog-connector-java/issues/193

下载SS 0.13.0中使用的mysql-binlog-connector-java-0.13.0.jar源码,(具体版本在SS的/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录下),解压后修改AbstractRowsEventDataDeserializer类的from方法和fallbackToGC方法

public static long from(int year, int month, int day, int hour, int minute, int second, int millis) {
	return fallbackToGC(year, month, day, hour, minute, second, millis);
	//if (year < 1582 || (year == 1582 && (month < 10 || (month == 10 && day < 15)))) {
	//    return fallbackToGC(year, month, day, hour, minute, second, millis);
	//long timestamp = 0;
	//int numberOfLeapYears = leapYears(1970, year);
	//timestamp += 366L * 24 * 60 * 60 * numberOfLeapYears;
	//timestamp += 365L * 24 * 60 * 60 * (year - 1970 - numberOfLeapYears);
	//long daysUpToMonth = isLeapYear(year) ? LEAP_YEAR_DAYS_BY_MONTH[month - 1] : YEAR_DAYS_BY_MONTH[month - 1];
	//timestamp += ((daysUpToMonth + day - 1) * 24 * 60 * 60) +
	//    (hour * 60 * 60) + (minute * 60) + (second);
	//timestamp = timestamp * 1000 + millis;
	//return timestamp;
// checkstyle, please ignore ParameterNumber for the next line
private static long fallbackToGC(int year, int month, int dayOfMonth, int hourOfDay,
								 int minute, int second, int millis) {
	//Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
	Calendar c = Calendar.getInstance();
	c.set(Calendar.YEAR, year);
	c.set(Calendar.MONTH, month - 1);
	c.set(Calendar.DAY_OF_MONTH, dayOfMonth);
	c.set(Calendar.HOUR_OF_DAY, hourOfDay);
	c.set(Calendar.MINUTE, minute);
	c.set(Calendar.SECOND, second);
	c.set(Calendar.MILLISECOND, millis);
	return c.getTimeInMillis();
mvn package -DskipTests

替换服务器上的mysql-binlog-connector-java-0.13.0.jar,重启SS使其生效。重新验证业务系统存量同步到中间库MySQL,三种数据类型同步后都正确,验证业务系统增量同步到中间库MySQL,三种数据类型同步后都正确,至此,业务系统MySQL同步数据到中间库MySQL时间类型数据问题解决。后面在上生产环境的时候SS用的是3.12.0,发现替换jar包后binlog同步数据中间库比业务库时间少13小时,这个可能还是和服务器环境有关,在另一台服务器上安装3.12.0版本的没有问题,少13小时可以通过在jdbc url中配置serverTimezone=Asia/Shanghai来解决。

2.3、中间库到kudu

在2.2的中我修改了mysql-binlog-connector-java-0.13.0.jar并替换了自带的,保证了业务库MySQL到中间库MySQL的数据准确性,在此基础上我验证了一下中间库MySQL到kudu的数据同步,MySQL的date、datetime、timestamp类型在kudu中都是timestamp类型,存量数据同步到kudu后三种类型都少了8小时,在JDBC Query Consumer中JDBC项的Additional JDBC Configuration Properties增加时区配置

serverTimezone UTC
useTimezone	true

配置后MySQL的datetime和timestamp类型到kudu的timestamp类型时间正确,但是MySQL的date类型同步到kudu的timestamp后少8小时。增量数据通过binlog实时同步,同步到kudu后三种类型都少了8小时。我通过spark代码测试将MySQL的date、datetime和timestamp类型数据写入kudu,发现时间也是少了8小时,但是当我在运行代码是时候指定时区为UTC后数据写入到kudu就是正确的(在代码中指定TimeZone.setDefault(TimeZone.getTimeZone("UTC")),或者对JVM指定-Duser.timezone=UTC),说明其实已经和binlog组件无关,是kudu本身的问题,在处理这个问题时最终的数据出口一份是到kudu,另一份是写入本地方便我比对,通过查看写到本地的数据发现正确的,到这里问题已经看似无解了,因为已经知道是写kudu的问题,并且设置为UTC时区就可以得到正确的结果,我又做了一些尝试,但是最终都无解,还把2.2中业务库同步到中间库MySQL搞混乱了,所以2.2中的基础我决定不再改动,从kudu本身着手。

最终我采用的方式是修改kudu的字段类型,将MySQL的date、datetime、timestamp类型都映射为kudu的string类型,kudu不支持修改表字段类型,因此全部删除重建,重新验证。需要注意的是做了这种修改后SS的pipleline中不能直接将数据入到kudu了,因为SS中的数据都是CST时区,直接存到kudu中会变成这种格式

通过使用Field Type Converter组件将date、datetime和timestamp类型转为string,注意SS中是没有timestamp类型的,MySQL的datetime和timestamp在SS中都是datetime类型,因此将MySQL的datetime和timestamp类型格式为yyyy-MM-dd HH:mm:ss,将MySQL的date类型格式化为yyyy-MM-dd,JDBC Query Consumer中已经无需再指定时区为UTC了,存量同步的pipeline如下。

存量同步时MySQL的date、datetime和timestamp同步到kudu的String都正确,增量数据通过binlog实时同步到kudu后三种时间类型值也都正确,在kudu中这三种MySQL数据类型都是以string类型存储。增量同步的pipeline如下

至此,关于SS同步数据过程中时间类型的处理告一段落!

图片加载出现OOMsetBackgroundResource加载图片的额时候。会出现错误。private final int[] imageIds = { R.mipmap.a, R.mipmap.b, R.mipmap.c, R.mipmap.d, R.mipmap.e }; ImageView image = new ImageView(this); image.setBackgro
1、问题描述 SS的前端界面时间显示默认使用的是CST时区,和中国时区有6小时时差(少了6小时),实际时间是2020-04-07 10:20:00,显示为2020-04-07 04:20:00 2、解决方案 进行Settings,修改Timezone为UTC即可,无需重启。
1、问题描述 当前使用的是CDH-6.2.0和SS-3.13.0版本,在同步增量binlog数据到MySQL的时候datetime格式的字段时间会多出8小时,基于这个原因我们修改了SS里面的mysql-binlog-connector-java-0.13.0.jar源码 streamsets-3.13.0中使用的mysql-binlog-connector-java是0.13.0版本,找到0.13.0的源码, https://github.com/shyiko/mysql-binlog-connec
最近发现了一个很好用的工具streamsets工具。我将oracle数据库当中的数据增量的导入到hive当中。导入是按照唯一的主键ID将数据导入进来。 出现的问题如下: (1)数据精度的问题: 因为表是提前创建好的,我将id字段定义为int类型,但是在oracle数据库当中的数据类型是number类型。然后将number类型的数据转换为decimal类型的数据。decim...
streamsets问题总结 streamsets是一种纯开源的ETL工具,适用场景是不断递增的流式数据,基于原理识别业务逻辑上的增加来识别,实际上有个timer一直在循坏,但它并不是真正的主键,streamsets只是隐式地,不是真正地,最后还是要对于源表这种主键,删除与update不太行,通过另一个逻辑去处理,处理起来比较复杂,它里面配置有一个offset,记录了你最后一次抽取id的值。 streamsets 连接sqlserver2008R2 老版本上面遇到的问题 sqlserver2008版本上支持
1、问题描述 CDH-6.2.0,SS-3.13.0。在使用SS同步mysql数据到kudu时使用JDBC Query Consumer,因为mysql的datetime数据到kudu的timestamp后少了8小时,给JDBC Query Consumer中JDBC配置添加了额外配置项,在最后的Additional JDBC Configuration Properties中增加2个配置 serverTimezone UTC useTimezone true
最近参与一个项目的开发,leader和产品决定使用开源的streamsets做底层服务和监控页面。除开发产品模块页面外,我还负责streamsets前端的修改和与自己开发模块的集成。 经过调研,streamsets前端使用的技术栈和我们公司常用的技术有些出入,并不太方便直接从代码层面合成一个项目,决定在自己项目中使用iframe嵌入streamsets监控、日志等页面。 最终整个产品架构图如下...
1. 简介 Streamsets是一款大数据实时采集和ETL工具,可以实现不写一行代码完成数据的采集和流转。通过拖拽式的可视化界面,实现数据管道(Pipelines)的设计和定时任务调度。最大的特点有:  可视化界面操作,不写代码完成数据的采集和流转  内置监控,可是实时查看数据流传输的基...