Bitget下载

注册下载Bitget下载,邀请好友,即有机会赢取 3,000 USDT

APP下载   官网注册

一、文件加载

1. spark.read.load

是加载数据的通用方法, 默认 加载和保存的是 parquet 格式文件

read可读格式

2. spark.read.format("…")[.option("…")].load("…")

  • format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
  • load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
  • option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用read API 先把文件加载到 DataFrame 然后再查询

也可以直接在文件上进行查询: 文件格式 .` 文件路径 `

spark.sql("select * from json.`文件路径`").show

二、保存数据

1.df.write.save 保存数据的通用方法

保存不同格式的数据,可以对不同的数据格式进行设定

2.df.write.format("…")[.option("…")].save("…")

  • format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
  • save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
  • option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode() 方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

其中SaveMode 是一个枚举类,其中的常量包括:

Scala/Java

Any Language

Meaning

SaveMode.ErrorIfExists(default)

"error"(default)

如果文件已经存在则抛出异常

SaveMode.Append

"append"

如果文件已经存在则追加

SaveMode.Overwrite

"overwrite"

如果文件已经存在则覆盖

SaveMode.Ignore

"ignore"

如果文件已经存在则忽略

例如:

df.write.mode("append").json("output")

三、特殊数据源的加载和保存

1.MySQL

1)加载数据(需要导入MySQL连接依赖)

导入依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>

案例实操:

val conf: SparkConf = newSparkConf().setMaster("local[*]").setAppName("SparkSQL")//创建 SparkSession 对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._//方式 1:通用的 load 方法读取spark.read.format("jdbc").option("url", "jdbc:mysql://node1:3306/sparkSql").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "123123").option("dbtable", "user").load().show//方式 2:通用的 load 方法读取 参数另一种形式spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://node1:3306/sparkSql?user=root&password= 123123","dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show//方式 3:使用 jdbc 方法读取val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123")val df: DataFrame = spark.read.jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props)df.show//释放资源spark.stop()

2)保存数据

case class User2(name: String, age: Long)val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")//创建 SparkSession 对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))val ds: Dataset[User2] = rdd.toDS//方式 1:通用的方式 format 指定写出类型ds.write.format("jdbc").option("url", "jdbc:mysql://node1:3306/sparkSql").option("user", "root").option("password", "123123").option("dbtable", "user").mode(SaveMode.Append).save()//方式 2:通过 jdbc 方法val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123")ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props)//释放资源spark.stop()

2.hive

1)内嵌hive

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可(使用 spark.sql() 方式).

Hive 的元数据存储在 derby 中, 默认仓库地址: $SPARK_HOME/spark-warehouse

scala> spark.sql("show tables").showscala> spark.sql("create table aa(id int)")scala> spark.sql("show tables").show//向表加载本地数据scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")scala> spark.sql("select * from aa").show

2)外部的 HIVE

连接外部hive步骤:

  • Spark 要接管 Hive 需要把hive-site.xml 拷贝到conf/目录下
  • 把 Mysql 的驱动 copy 到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 重启 spark-shell

连接后和内嵌hive操作相同

3)运行 Spark SQL CLI

Spark SQL CLI 可以很方便的在本地运行Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一Hive 窗口

bin/spark-sql

4)运行Spark beeline

Spark Thrift Server 是Spark 社区基于HiveServer2 实现的一个Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用hive 的 beeline 访问Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代HiveServer2,因此它依旧可以和 Hive Metastore 进行交互,获取到hive 的元数据。

如果想连接Thrift Server,需要通过以下几个步骤:

  • Spark 要接管 Hive 需要把hive-site.xml 拷贝到conf/目录下
  • 把 Mysql 的驱动 copy 到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 启动Thrift Server sbin/start-thriftserver.sh
  • 使用 beeline 连接 Thrift Server

bin/beeline -u jdbc:hive2:// node1 :10000 -n root

5)代码操作 Hive

1)导入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>

2)将hive-site.xml 文件拷贝到项目的 resources 目录中,案列如下:

//创建 SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().master("local[*]").appName("sql").getOrCreate()

注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址 : config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")

若出现以下情况

可以代码 最前面 增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "root")

此处的 root 改为自己的 hadoop 用户名称

来源:https://blog.csdn.net/m0_46782746/article/details/126769601