博客
关于我
Spark Sql JDBC实现 聚合、union、同数据源Join等下推
阅读量:617 次
发布时间:2019-03-13

本文共 5765 字,大约阅读时间需要 19 分钟。

Spark Sql JDBC实现 聚合、union、同数据源Join等下推

简单熟悉下Spark Sql 处理JDBC数据源数据

spark Sql处理JDBC数据源的代码比较简单,大家可以自行阅读官网使用demo。

个人觉得比较鸡肋的地方

单元测试如下

说明: emp,dept是通过spark 读取mysql的同一个库的两张表

test("selectSubQuery"){    val sql =      """        |select *        |from   emp where id > (select min(a.id) from emp a join dept b on a.deptId =b.id)        |""".stripMargin    val frameResult = session.sql(sql)    frameResult.explain(true)    frameResult.show()  }

查看执行计划

== Parsed Logical Plan =='Project [*]+- 'Filter ('id > scalar-subquery#34 [])   :  +- 'Project [unresolvedalias('min('a.id), None)]   :     +- 'Join Inner, ('a.deptId = 'b.id)   :        :- 'SubqueryAlias a   :        :  +- 'UnresolvedRelation [emp], [], false   :        +- 'SubqueryAlias b   :           +- 'UnresolvedRelation [dept], [], false   +- 'UnresolvedRelation [emp], [], false== Analyzed Logical Plan ==ID: int, NAME: string, JOB: string, MGR: int, HIREDATE: date, SAL: decimal(20,2), COMM: decimal(20,2), DEPTID: intProject [ID#12, NAME#13, JOB#14, MGR#15, HIREDATE#16, SAL#17, COMM#18, DEPTID#19]+- Filter (id#12 > scalar-subquery#34 [])   :  +- Aggregate [min(id#12) AS min(id)#36]   :     +- Join Inner, (deptId#19 = id#6)   :        :- SubqueryAlias a   :        :  +- SubqueryAlias emp   :        :     +- Relation[ID#12,NAME#13,JOB#14,MGR#15,HIREDATE#16,SAL#17,COMM#18,DEPTID#19] JDBCRelation(emp) [numPartitions=1]   :        +- SubqueryAlias b   :           +- SubqueryAlias dept   :              +- Relation[ID#6,NAME#7,LOC#8] JDBCRelation(dept) [numPartitions=1]   +- SubqueryAlias emp      +- Relation[ID#12,NAME#13,JOB#14,MGR#15,HIREDATE#16,SAL#17,COMM#18,DEPTID#19] JDBCRelation(emp) [numPartitions=1]== Optimized Logical Plan ==Filter (isnotnull(id#12) AND (id#12 > scalar-subquery#34 [])):  +- Aggregate [min(id#12) AS min(id)#36]:     +- Project [ID#12]:        +- Join Inner, (deptId#19 = id#6):           :- Project [ID#12, DEPTID#19]:           :  +- Filter isnotnull(deptId#19):           :     +- Relation[ID#12,NAME#13,JOB#14,MGR#15,HIREDATE#16,SAL#17,COMM#18,DEPTID#19] JDBCRelation(emp) [numPartitions=1]:           +- Project [ID#6]:              +- Filter isnotnull(id#6):                 +- Relation[ID#6,NAME#7,LOC#8] JDBCRelation(dept) [numPartitions=1]+- Relation[ID#12,NAME#13,JOB#14,MGR#15,HIREDATE#16,SAL#17,COMM#18,DEPTID#19] JDBCRelation(emp) [numPartitions=1]== Physical Plan ==*(1) Filter (ID#12 > Subquery scalar-subquery#34, [id=#64]):  +- Subquery scalar-subquery#34, [id=#64]:     +- *(6) HashAggregate(keys=[], functions=[min(id#12)], output=[min(id)#36]):        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#60]:           +- *(5) HashAggregate(keys=[], functions=[partial_min(id#12)], output=[min#46]):              +- *(5) Project [ID#12]:                 +- *(5) SortMergeJoin [deptId#19], [id#6], Inner:                    :- *(2) Sort [deptId#19 ASC NULLS FIRST], false, 0:                    :  +- Exchange hashpartitioning(deptId#19, 200), ENSURE_REQUIREMENTS, [id=#45]:                    :     +- *(1) Scan JDBCRelation(emp) [numPartitions=1] [ID#12,DEPTID#19] PushedFilters: [*IsNotNull(DEPTID)], ReadSchema: struct
: +- *(4) Sort [id#6 ASC NULLS FIRST], false, 0: +- Exchange hashpartitioning(id#6, 200), ENSURE_REQUIREMENTS, [id=#51]: +- *(3) Scan JDBCRelation(dept) [numPartitions=1] [ID#6] PushedFilters: [*IsNotNull(ID)], ReadSchema: struct
+- *(1) Scan JDBCRelation(emp) [numPartitions=1] [ID#12,NAME#13,JOB#14,MGR#15,HIREDATE#16,SAL#17,COMM#18,DEPTID#19] PushedFilters: [*IsNotNull(ID)], ReadSchema: struct

执行结果如下

在这里插入图片描述

缺点分析

在这里插入图片描述

上图可以看出

1.spark 总共进行了三次JDBC 连接进行查询请求。
2.聚合,join没有进行下沉,增加了内存计算的复杂度。

再次说明下,单元测试中的查询请求,比较简单,

就是查询同一个库的两张表格,会进行三次访问数据库,并且没有做到聚合和join的下沉(spark 目前支持了 谓词下推,列裁剪,常量累加等优化),频繁的查库和数据的网络Io比较消耗时间。

优化操作

这里的优化主要使用的是Spark 提供的扩展接口Extensions,优化spark 的RBO和CBO规则。这里的原理和细节比较复杂,下篇文章描述。

优化后的执行计划

== Parsed Logical Plan =='Project [*]+- 'Filter ('id > scalar-subquery#34 [])   :  +- 'Project [unresolvedalias('min('a.id), None)]   :     +- 'Join Inner, ('a.deptId = 'b.id)   :        :- 'SubqueryAlias a   :        :  +- 'UnresolvedRelation [emp], [], false   :        +- 'SubqueryAlias b   :           +- 'UnresolvedRelation [dept], [], false   +- 'UnresolvedRelation [emp], [], false== Analyzed Logical Plan ==ID: int, NAME: string, JOB: string, MGR: int, HIREDATE: date, SAL: decimal(20,2), COMM: decimal(20,2), DEPTID: intProject [ID#12, NAME#13, JOB#14, MGR#15, HIREDATE#16, SAL#17, COMM#18, DEPTID#19]+- Filter (id#12 > scalar-subquery#34 [])   :  +- Aggregate [min(id#12) AS min(id)#36]   :     +- Join Inner, (deptId#19 = id#6)   :        :- SubqueryAlias a   :        :  +- SubqueryAlias emp   :        :     +- RelationV2[ID#12, NAME#13, JOB#14, MGR#15, HIREDATE#16, SAL#17, COMM#18, DEPTID#19] nameString   :        +- SubqueryAlias b   :           +- SubqueryAlias dept   :              +- RelationV2[ID#6, NAME#7, LOC#8] nameString   +- SubqueryAlias emp      +- RelationV2[ID#12, NAME#13, JOB#14, MGR#15, HIREDATE#16, SAL#17, COMM#18, DEPTID#19] nameString== Optimized Logical Plan ==RelationV2[ID#47, NAME#48, JOB#49, MGR#50, HIREDATE#51, SAL#52, COMM#53, DEPTID#54] nameString== Physical Plan ==*(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@237c8718 [ID#47,NAME#48,JOB#49,MGR#50,HIREDATE#51,SAL#52,COMM#53,DEPTID#54] PushedFilters: [], ReadSchema: struct

在这里插入图片描述

在这里插入图片描述

从优化后的物理执行计划可以看出,之前三次的JDBC 请求,已经合并为一次,并且之前内存计算的聚合,join 操作也一起下沉到了Sql 中了,两次的查询结果也完全一致。

可能小伙伴们会有疑问: 这不就是相当于把计算交给了数据源去做吗? 是的,就是这样的。

个人觉得好处如下

  1. 节约了多次建立JDBC请求的时间。

  2. 如果表的数据量很大,节约了数据传输的网络和Io的时间。

  3. 移动计算,而非移动数据,数据在数据库本地有 索引,缓存等等数据库的优化夹持,计算并不比spark慢。

简单的性能测试:

说明: 由于是本地的数据库测试,并且数据量不是很大,可能优势并不是很明显。在生产环境下优化肯定成倍数提升。
在这里插入图片描述

在这里插入图片描述

结论

上面两幅图可以看出来优化了将近60%的时间消耗,效果还是很明显的。

如果有什么错误的地方也欢迎大家指正。

转载地址:http://oinaz.baihongyu.com/

你可能感兴趣的文章
mudbox卸载/完美解决安装失败/如何彻底卸载清除干净mudbox各种残留注册表和文件的方法...
查看>>
mysql 1264_关于mysql 出现 1264 Out of range value for column 错误的解决办法
查看>>
mysql 1593_Linux高可用(HA)之MySQL主从复制中出现1593错误码的低级错误
查看>>
mysql 5.6 修改端口_mysql5.6.24怎么修改端口号
查看>>
MySQL 8.0 恢复孤立文件每表ibd文件
查看>>
MySQL 8.0开始Group by不再排序
查看>>
mysql ansi nulls_SET ANSI_NULLS ON SET QUOTED_IDENTIFIER ON 什么意思
查看>>
multi swiper bug solution
查看>>
MySQL Binlog 日志监听与 Spring 集成实战
查看>>
MySQL binlog三种模式
查看>>
multi-angle cosine and sines
查看>>
Mysql Can't connect to MySQL server
查看>>
mysql case when 乱码_Mysql CASE WHEN 用法
查看>>
Multicast1
查看>>
mysql client library_MySQL数据库之zabbix3.x安装出现“configure: error: Not found mysqlclient library”的解决办法...
查看>>
MySQL Cluster 7.0.36 发布
查看>>
Multimodal Unsupervised Image-to-Image Translation多通道无监督图像翻译
查看>>
MySQL Cluster与MGR集群实战
查看>>
multipart/form-data与application/octet-stream的区别、application/x-www-form-urlencoded
查看>>
mysql cmake 报错,MySQL云服务器应用及cmake报错解决办法
查看>>