问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

spark sql 列怎么转换数据类型

发布网友 发布时间:2022-05-03 02:16

我来回答

2个回答

懂视网 时间:2022-05-03 06:37

1.编写给ResultSet添加spark的schema成员及DF(DataFrame)成员

/*
 spark、sc对象因为是全局的,没有导入,需自行定义
 teradata的字段类型转换成spark的数据类型
*/

import java.sql.{ResultSet, ResultSetMetaData}

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

object addDataframeMember {

 trait ResultSetMetaDataToSchema {
 def columnCount: Int

 def schema: StructType
 }

 implicit def wrapResultSetMetaData(rsmd: ResultSetMetaData) = {
 new ResultSetMetaDataToSchema {
 def columnCount = rsmd.getColumnCount

 def schema = {
 def tdCovert(tdDpeStr: String, precision: Int = 0, scale: Int = 0, className: String = ""): DataType = {
  tdDpeStr match {
  case "BYTEINT" => IntegerType
  case "SMALLINT" => ShortType
  case "INTEGER" => IntegerType
  case "BIGINT" => LongType
  case "FLOAT" => DoubleType
  case "CHAR" => CharType(precision)
  case "DECIMAL" => DecimalType(precision, scale)
  case "VARCHAR" => StringType
  case "BYTE" => ByteType
  case "VARBYTE" => ByteType
  case "DATE" => DateType
  case "TIME" => TimestampType
  case "TIMESTAMP" => TimestampType
  case "CLOB" => StringType
  case "BLOB" => BinaryType
  case "Structured UDT" => ObjectType(Class.forName(className))
  }
 }

 def col2StructField(rsmd: ResultSetMetaData, i: Int): StructField = StructField(rsmd.getColumnName(i), tdCovert(rsmd.getColumnTypeName(i), rsmd.getPrecision(i), rsmd.getScale(i), rsmd.getColumnClassName(i)), rsmd.isNullable(i) match { case 1 => true case 0 => false }).withComment(rsmd.getColumnLabel(i))

 def rsmd2Schema(rsmd: ResultSetMetaData): StructType = (1 to columnCount).map(col2StructField(rsmd, _)).foldLeft(new StructType)((s: StructType, i: StructField) => s.add(i))

 rsmd2Schema(rsmd)
 }
 }
 }

 trait ResultSetToDF {
 def schema: StructType

 def DF: DataFrame
 }

 implicit def wrapResultSet(rs: ResultSet) = {
 def rsmd = rs.getMetaData

 def toList[T](retrieve: ResultSet => T): List[T] = Iterator.continually((rs.next(), rs)).takeWhile(_._1).map(r => r._2).map(retrieve).toList

 def rsContent2Row(rs: ResultSet): Row = Row.fromSeq(Array.tabulate[Object](rsmd.columnCount)(i => rs.getObject(i + 1)).toSeq)

 new ResultSetToDF {
 def schema = rsmd.schema

 def DF = spark.createDataFrame(sc.parallelize(toList(rsContent2Row)), schema)
 }

 }


}

  

2.正常基于JDBC连接并且获得数据集游标

import java.sql.{Connection, DriverManager}

/*
 获取TeraData的连接
*/

val (dialect, host, user, passwd, database, charset) = ("teradata", "ip", "user", "password", "database", "ASCII")
val tdConf = collection.immutable.Map(
 "driver" -> "com.ncr.teradata.TeraDriver",
 "uri" -> s"jdbc:$dialect://$host/CLIENT_CHARSET=EUC_CN,TMODE=TERA,COLUMN_NAME=ON,CHARSET=ASCII,database=$database",
 "username" -> user,
 "password" -> passwd
)

def getTeraConn: Connection = {
 Class.forName(tdConf("driver"))
 DriverManager.getConnection(tdConf("uri"), tdConf("username"), tdConf("password"))
}
val sql = "SELECT TOP 10 * FROM xxx"
var conn = getTeraConn
val stmt = conn.createStatement()
val rs = stmt.executeQuery(sql)

 

3.导入隐式转换,调用成员

import addDataframeMember.wrapResultSet
rs.DF.show()

  

JDBC的ResultSet游标转spark的DataFrame,数据类型的映射以TeraData数据库为例

标签:har   lse   typename   编写   nec   scala   etc   sci   long   

热心网友 时间:2022-05-03 03:45

Java获取数据库的表中各字段的字段名,代码如下:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
public class TestDemo {
public static Connection getConnection() {
Connection conn = null;
try {
Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://数据库IP地址:3306/数据库名称";
String user = "数据库用户名";
String pass = "数据库用户密码";
conn = DriverManager.getConnection(url, user, pass);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
public static void main(String[] args) {
Connection conn = getConnection();
String sql = "select * from AccessType";
PreparedStatement stmt;
try {
stmt = conn.prepareStatement(sql);
ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData data = rs.getMetaData();
for (int i = 1; i <= data.getColumnCount(); i++) {
// 获得所有列的数目及实际列数
int columnCount = data.getColumnCount();
// 获得指定列的列名
String columnName = data.getColumnName(i);
// 获得指定列的列值
int columnType = data.getColumnType(i);
// 获得指定列的数据类型名
String columnTypeName = data.getColumnTypeName(i);
// 所在的Catalog名字
String catalogName = data.getCatalogName(i);
// 对应数据类型的类
String columnClassName = data.getColumnClassName(i);
// 在数据库中类型的最大字符个数
int columnDisplaySize = data.getColumnDisplaySize(i);
// 默认的列的标题
String columnLabel = data.getColumnLabel(i);
// 获得列的模式
String schemaName = data.getSchemaName(i);
// 某列类型的精确度(类型的长度)
int precision = data.getPrecision(i);
// 小数点后的位数
int scale = data.getScale(i);
// 获取某列对应的表名
String tableName = data.getTableName(i);
// 是否自动递增
boolean isAutoInctement = data.isAutoIncrement(i);
// 在数据库中是否为货币型
boolean isCurrency = data.isCurrency(i);
// 是否为空
int isNullable = data.isNullable(i);
// 是否为只读
boolean isReadOnly = data.isReadOnly(i);
// 能否出现在where中
boolean isSearchable = data.isSearchable(i);
System.out.println(columnCount);
System.out.println("获得列" + i + "的字段名称:" + columnName);
System.out.println("获得列" + i + "的类型,返回SqlType中的编号:"+ columnType);
System.out.println("获得列" + i + "的数据类型名:" + columnTypeName);
System.out.println("获得列" + i + "所在的Catalog名字:"+ catalogName);
System.out.println("获得列" + i + "对应数据类型的类:"+ columnClassName);
System.out.println("获得列" + i + "在数据库中类型的最大字符个数:"+ columnDisplaySize);
System.out.println("获得列" + i + "的默认的列的标题:" + columnLabel);
System.out.println("获得列" + i + "的模式:" + schemaName);
System.out.println("获得列" + i + "类型的精确度(类型的长度):" + precision);
System.out.println("获得列" + i + "小数点后的位数:" + scale);
System.out.println("获得列" + i + "对应的表名:" + tableName);
System.out.println("获得列" + i + "是否自动递增:" + isAutoInctement);
System.out.println("获得列" + i + "在数据库中是否为货币型:" + isCurrency);
System.out.println("获得列" + i + "是否为空:" + isNullable);
System.out.println("获得列" + i + "是否为只读:" + isReadOnly);
System.out.println("获得列" + i + "能否出现在where中:"+ isSearchable);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
ef英语哪个好 EF英孚英语培训怎么样? 英孚英语好不好 EF英孚教育到底好不好 大佬们,麦芒7和荣耀10那个值得入手?2500以下的机子还有啥好推荐的么... 介绍几款2500元以前的手机 像素一定要高 其他的不做要求 近期想入手一部安卓手机,价格2200到2500左右…买HTC desire Z还是 三星... 笔记本忘记开机密码怎么办急死了 笔记本电脑屏幕开机锁忘记密码 怎么办?急死了 华硕笔记本电脑开机密码忘记了怎样找回?系统是Windows 7旗舰版... 中国每年在哪里举办马拉松比赛 马拉松一般连续几届 举办马拉松比赛有何条件 请问举办马拉松运动要向什么部门申报? 泰安马拉松什么时候举行? 如何举办迷你马拉松 梦见自己给别人盖小花被子 中国一共举办了多少次马拉松 梦见自己喜欢的人给我盖被子,是怎么回事?求解,谢谢 郑州一年举办几次马拉松 苏州热衷于举办马拉松比赛的原因究竟是什么? 梦见我装睡 喜欢的人和我盖被子 马拉松的举办,对当地的经济会有哪些作用? 为什么苏州热衷于举办马拉松比赛呢? 梦见给暗恋的人缝被子是啥意思? 为什么蛋糕会粘在吸油纸上? 马拉松的举办,对当地的经济会都有哪些作用? 梦见自己喜欢的人说他冷,我给他盖了两个被子 梦见帮初恋情人盖被子怎么解 手机QQ群为什么我不能设置成员专属头衔???(つД`) 回老家了,梦见喜欢的人替我盖被子, 梦见我爱的人给我盖被子然后他走了 除了纸质报纸还会从哪些渠道了解新闻? 尼康z62的对焦速度如何? 获取新闻的主要途径有那些 尼康z6二代怎么样 大家都是通过什么渠道了解身边新闻的 健身房健身卡如何退款 入手尼康Z6拍小电影或纪录片怎么样? 今天的人们获取信息的渠道有那些??? 尼康z62拍视频卡没完,但不能拍什么原因 健身卡退卡法律依据 书面表达英语作文介绍当今人们获取新闻的方式 办了健身卡不到1天可以退吗 尼康z62写入速度 如何找到新闻发稿的渠道 尼康z62加1424f2.8拍星空用赤道仪吗 健身卡用了半年可以退吗 买了健身卡,还没开卡使用,可以退款吗? 健身房办卡到底可以退是吗?