sql文件(sql文件怎么导出)
一、文件加载
1. spark.read.load
是加载数据的通用方法, 默认 加载和保存的是 parquet 格式文件
read可读格式
2. spark.read.format("…")[.option("…")].load("…")
- format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
- option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用read API 先把文件加载到 DataFrame 然后再查询
也可以直接在文件上进行查询: 文件格式 .` 文件路径 `
spark.sql("select * from json.`文件路径`").show
二、保存数据
1.df.write.save 保存数据的通用方法
保存不同格式的数据,可以对不同的数据格式进行设定
2.df.write.format("…")[.option("…")].save("…")
- format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
- option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode() 方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
其中SaveMode 是一个枚举类,其中的常量包括:
Scala/Java | Any Language | Meaning |
SaveMode.ErrorIfExists(default) | "error"(default) | 如果文件已经存在则抛出异常 |
SaveMode.Append | "append" | 如果文件已经存在则追加 |
SaveMode.Overwrite | "overwrite" | 如果文件已经存在则覆盖 |
SaveMode.Ignore | "ignore" | 如果文件已经存在则忽略 |
例如:
df.write.mode("append").json("output")
三、特殊数据源的加载和保存
1.MySQL
1)加载数据(需要导入MySQL连接依赖)
导入依赖
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>
案例实操:
val conf: SparkConf = newSparkConf().setMaster("local[*]").setAppName("SparkSQL")//创建 SparkSession 对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._//方式 1:通用的 load 方法读取spark.read.format("jdbc").option("url", "jdbc:mysql://node1:3306/sparkSql").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "123123").option("dbtable", "user").load().show//方式 2:通用的 load 方法读取 参数另一种形式spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://node1:3306/sparkSql?user=root&password= 123123","dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show//方式 3:使用 jdbc 方法读取val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123")val df: DataFrame = spark.read.jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props)df.show//释放资源spark.stop()
2)保存数据
case class User2(name: String, age: Long)val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")//创建 SparkSession 对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))val ds: Dataset[User2] = rdd.toDS//方式 1:通用的方式 format 指定写出类型ds.write.format("jdbc").option("url", "jdbc:mysql://node1:3306/sparkSql").option("user", "root").option("password", "123123").option("dbtable", "user").mode(SaveMode.Append).save()//方式 2:通过 jdbc 方法val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123")ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props)//释放资源spark.stop()
2.hive
1)内嵌hive
如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可(使用 spark.sql() 方式).
Hive 的元数据存储在 derby 中, 默认仓库地址: $SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").showscala> spark.sql("create table aa(id int)")scala> spark.sql("show tables").show//向表加载本地数据scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")scala> spark.sql("select * from aa").show
2)外部的 HIVE
连接外部hive步骤:
- Spark 要接管 Hive 需要把hive-site.xml 拷贝到conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 重启 spark-shell
连接后和内嵌hive操作相同
3)运行 Spark SQL CLI
Spark SQL CLI 可以很方便的在本地运行Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一Hive 窗口
bin/spark-sql
4)运行Spark beeline
Spark Thrift Server 是Spark 社区基于HiveServer2 实现的一个Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用hive 的 beeline 访问Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代HiveServer2,因此它依旧可以和 Hive Metastore 进行交互,获取到hive 的元数据。
如果想连接Thrift Server,需要通过以下几个步骤:
- Spark 要接管 Hive 需要把hive-site.xml 拷贝到conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 启动Thrift Server sbin/start-thriftserver.sh
- 使用 beeline 连接 Thrift Server
bin/beeline -u jdbc:hive2:// node1 :10000 -n root
5)代码操作 Hive
1)导入依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>
2)将hive-site.xml 文件拷贝到项目的 resources 目录中,案列如下:
//创建 SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().master("local[*]").appName("sql").getOrCreate()
注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址 : config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
若出现以下情况
可以代码 最前面 增加如下代码解决:
System.setProperty("HADOOP_USER_NAME", "root")
此处的 root 改为自己的 hadoop 用户名称
来源:https://blog.csdn.net/m0_46782746/article/details/126769601
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 931614094@qq.com 举报,一经查实,本站将立刻删除。