TiSpark 3.0.0 新特性实践
作者: 数据小黑
TiSpark 3.0.0 于 6 月 15 号发布了,新的版本中提到了很多期望已久的功能,本文对几个新特性做了对比测试,验证新版本的特性是否符合线上要求。本文基础运行环境为 Spark On kubernetes,Spark 镜像打包时,已包含 TiSpark 必要的依赖。
下面的兼容性更改与新特性摘自官方:
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")数据读取:
.set("spark.tispark.pd.addresses", pd_addr);
spark.sql("use sbtest");注意 use sbtest 中的 sbtest 是 tidb 的数据库。 在 TiSpark 3.0.0 中需按照如下配置:
String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
spark.sql(source_sql).show();
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")数据读取:
.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
spark.sql("use tidb_catalog.sbtest");此案例中 use tidb_catalog.sbtest 中 tidb_catalog 是 spark.sql.catalog.tidb_catalog 配置中指定的,如果配置时使用.set("spark.sql.catalog.tidb_catalog2", "org.apache.spark.sql.catalyst.catalog.TiCatalog"),那么 use 时,需使用 use tidb_catalog2.sbtestTiSpark's jar has a new naming rule like tispark-assembly-{$spark_version}_{$scala_version}-{$tispark_verison}此特性把 scala_version 版本号体现在版本命名中,版本包命名由 tispark-assembly-3.0-2.5.1.jar 变化成了 tispark-assembly-3.0_2.12-3.0.0.jar。
String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
spark.sql(source_sql).show();
val spark = SparkSession.builder.config(sparkConf).getOrCreate();Support TLS with reload capability支持 TLS 并具备动态更新证书的能力
spark.conf().set("spark.tispark.stale_read", 1651766410000L); //"2022-05-06 00:00:10"
spark.sql("select * from test.t");
spark.conf().set("spark.tispark.stale_read", 1651766420000L); //"2022-05-06 00:00:20"
spark.sql("select * from test.t");
spark.conf().set("spark.tispark.stale_read", "");
spark.sql("select * from test.t");
我们有个 tidb 实验环境,是在 k8s 里面的,这个环境配套了一个 spark on k8s 环境。这次测试是在 spark on k8s+tidb on k8s 中测试的。
基于 spark 3.0.3 on k8s 测试非 catalog plugin 的配置运行情况。 因为以前的项目都是基于 maven 的,本次已经在 maven 项目下进行测试,首先修改项目依赖:
<properties>
<tispark.spark.version>3.0_2.12</tispark.spark.version>
<tispark.version>3.0.0</tispark.version>
</properties>
<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-assembly-${tispark.spark.version}</artifactId>
<version>${tispark.version}</version>
</dependency>
本次依赖包的命名有变化,artifactId 由 tispark-assembly 改变成 tispark-assembly-3.0_2.12 也就是说对应每个版本的 spark,artifactId 不同,类似下表所列:
spark 版本
artifactid 版本
spark 3.0.X
tispark-assembly-3.0_2.12
spark 3.1.X
tispark-assembly-3.1_2.12
spark 3.2.X
tispark-assembly-3.2_2.12
其次隐藏掉关于 catalog 的两行配置,修改后如下所示:
SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
//.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
//.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
运行 spark 程序,具体运行配置过程参考:/news/upload/ueditor/image/202208/iswimzcnbur 运行日志如下:
22/06/21 01:42:49 ERROR TiExtensions$: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
com.pingcap.tikv.exception.TiInternalException: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
at org.apache.spark.sql.TiExtensions$.validateCatalog(TiExtensions.scala:79)
代码中加了个检查,在没有配置 Catalog 的情况下,检查报错,提示 TiSpark must work with TiCatalog。
本来测试的是 Spark 3.2.1+TiSpark 3.0.0,结果有报错,详见:https://asktug.com/t/topic/694197 更换成 Spark 3.0.3+TiSpark 3.0.0 stale read + delete 特性测试 测试构建代码如下:
String pd_addr = "basic-pd.tidb-cluster:2379";
String tidb_addr = "basic-tidb.tidb-cluster";
SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
SparkSession spark = SparkSession
.builder().appName("RdbToRdbProcess")
.config(conf)
.getOrCreate();
// 通过 TiSpark 将 DataFrame 批量写入 TiDB
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);
spark.sql("use tidb_catalog.sbtest2");
// 获取当前时间戳
long ttl=System.currentTimeMillis();
System.out.println("删除前查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("删除");
spark.sql("delete from sbtest_t_t where id = 100").show();
System.out.println("删除后查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("stale read");
spark.conf().set("spark.tispark.stale_read", ttl);
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("置空时间戳之后查询");
spark.conf().set("spark.tispark.stale_read", "");
spark.sql("select * from sbtest_t_t where id = 100").show();
上述查询结果依次:
删除前查询:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
删除后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+
stale read:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
置空时间戳之后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+
由以上查询结果可知,数据能够执行删除,删除后正常查询是查询不到的,使用 stale read,利用删除之前的时间戳能够查询到数据,置空时间戳后,恢复普通查询,数据能够查询到。
本次版本中 stale read + delete 让 tispark 具有更灵活的应用场景,经过验证 spark 3.2.1 跟 tidb 6.1.0 on k8s 通讯还有些问题仍需解决,另外一方面,也盼望着能兼容分区表的一些操作能发布出来,比如说导入数据之前能够 truncate 分区一类的操作。
版权声明
本文仅代表作者观点,不代表博信信息网立场。