问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

spark dataframe 字段可以有几种数据类型

发布网友 发布时间:2022-04-27 07:29

我来回答

1个回答

热心网友 时间:2022-05-02 15:53

import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {
val sc = new SparkContext("local[*]", "test")
val sqlc = new SQLContext(sc)
val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
val tableName = "tbaclusterresult"

//把数据转化为DataFrame,并注册为一个表
val df = sqlc.read.json("G:/data/json.txt")
df.registerTempTable("user")
val res = sqlc.sql("select * from user")
println(res.count() + "---------------------------")
res.collect().map { row =>
{
println(row.toString())
}
}

//从MYSQL读取数据
val jdbcDF = sqlc.read
.options(Map("url" -> driverUrl,
// "user" -> "root",
// "password" -> "root",
"dbtable" -> tableName))
.format("jdbc")
.load()
println(jdbcDF.count() + "---------------------------")
jdbcDF.collect().map { row =>
{
println(row.toString())
}
}

//插入数据至MYSQL
val schema = StructType(
StructField("name", StringType) ::
StructField("age", IntegerType)
:: Nil)

val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
("com", 40), ("bt", 33), ("www", 23))).
map(item => Row.apply(item._1, item._2))
import sqlc.implicits._
val df1 = sqlc.createDataFrame(data1, schema)
// df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

//DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:
//def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

//插入数据到MYSQL
val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
data.foreachPartition(myFun)

case class Blog(name: String, count: Int)

def myFun(iterator: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"
try {
conn = DriverManager.getConnection(driverUrl, "root", "root")
iterator.foreach(data => {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setInt(2, data._2)
ps.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
网上订酒店哪个网好 经常听了歌就使劲摇头,长期会对大脑有什么影响吗 携程有什么优点 吉娃娃幼犬多少钱一只吉娃娃犬多少钱一只黑龙江省那有卖 吉娃娃现在多少钱一只? 预防中风的小动作有哪些呢? 京巴,吉娃娃,这类小型宠物狗大概多少钱? 摇头时颈椎有响声 旅行社怎么和携程合作推广旅游线路 不是纯种的吉娃娃狗狗能卖多少钱呢 在舞台上唱歌,自己的声音听不到或者很小怎么办? cmd.exe是什么鬼 Spark RDD,DataFrame和DataSet的区别 我在全民k歌里唱歌,录制完作品以后怎么只能听到原唱唱歌而听不到自己的声音 电脑进去显示cmd.exe 怎么办 cmd.exe在电脑什么地方能找到? OPPO手机唱歌的时候耳机里听不到自己的声音是为什么? 为什么在K房里唱歌听不到自己声音? 手机唱歌听不到自己的声音。 唱歌时听不到自己的声音??? 为什么全民k歌,唱歌听不到自己的声音? ...录制完作品以后怎么只能听到原唱唱歌而听不到自己的声音?_百度... 为什么全民k歌唱完歌听不到自己的声音??? 陌陌ktv唱歌听不到自己的声音 为什么用耳机唱歌的时候听不到自己的声音? 怎样充分借助互联网自学? 用互联网学语文的方法 如何有效利用互联网学语文 袋式过滤器和板框压力式过滤器的选择 硬塑料筐在帆布袋里侧翻会坏吗? 电脑进程里老是出现好几十个cmd.exe进程 耳机唱歌听不到自己的声音 cmd.exe是什么 电脑开机出现cmd.exe cmd.exe是什么啊? python,爬虫,pandas的DataFrame处理后的数据,输出到文本后中间这些数据... cmd.exe有哪些用处??? 怎么取dataframe python 电脑一启动就显示管理员 cmd.exe python dataframe的换行符怎么处理 cmd.exe是什么文件,怎么打开 Dataframe里字段过长被截取怎么能显示完整的数据 cmd.exe是什么进程?可以删除吗? 电脑开机出现管理员:cmd.exe? python dataframe multiindex 有几层 cmd.exe文件的解决 开机自动运行cmd.exe Python怎么转换Dictionary为DataFrame 无法删除cmd.exe? 谁有背着吉他的男生背影动漫图片啊!求帮忙!