SparkSQL实战笔记(1)---- 部署、项目准备
1. 关于MapReduce的问题
-
MapReduce的槽点一
-
需求:统计单词出现的个数(词频统计)
-
file中每个单词出现的次数
hello,hello,hello world,world pk
-
- 读取file中每一行的数据
- 按照分隔符把每一行的内容进行拆分
- 按照相同的key分发到同一个任务上去进行累加的操作
-
这是一个简单的不能再简单的一个需求,我们需要开发很多的代码
- 自定义Mapper
- 自定义Reducer
- 通过Driver把Mapper和Reducer串起来
- 打包,上传到集群上去
- 在集群上提交我们的wc程序
-
一句话:就是会花费非常多的时间在非业务逻辑改动的工作上
-
-
MapReduce吐槽点二
Input => MapReduce ==> Output ==> MapReduce ==> Output
-
回顾下MapReduce执行流程:
- MapTask或者ReduceTask都是进程级别
- 第一个MR的输出要先落地,然后第二个MR把第一个MR的输出当做输入
- 中间过程的数据是要落地
2. Spark
-
特性
-
Speed:
-
both batch and streaming data
-
批流一体 Spark Flink
-
-
Ease of Use
- high-level operators
-
Generality
- stack 栈 生态
-
Runs Everywhere
- It can access diverse data sources
- YARN/Local/Standalone Spark应用程序的代码需要改动吗?
- --master来指定你的Spark应用程序将要运行在什么模式下
-
3. 部署问题
3.1 JDK
部署
-
下载:https://www.oracle.com/index.html
-
服务器端:
-
下载linux版本的jdk
-
解压:
tar -zxvf jdk-8u91-linux-x64.tar.gz -C ~/app
-
配置环境变量:
~/.bash_profile
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91 export PATH=$JAVA_HOME/bin:$PATH
-
使环境变量生效:
source ~/.bash_profile
-
-
客户端:Win/Mac/Linux
- Mac/Linux:就和服务器端安装方法一致
3.2 Maven
和IDEA
部署
-
Maven:IDEA+Maven来管理应用程序
- 为什么你开发的时候不直接拷贝jar包呢?
- 在maven中的pom.xml中添加我们所需要的dependency就行
-
官网:maven.apache.org
-
wget http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz
-
解压:
tar -zxvf apache-maven-3.6.1-bin.tar.gz -C ~/app/
-
配置环境变量:
~/.bash_profile
export MAVEN_HOME=/home/hadoop/app/apache-maven-3.6.1 export PATH=$MAVEN_HOME/bin:$PATH
-
使环境变量生效:
source ~/.bash_profile
-
服务器端:你是需要进行使用maven来编译我们的spark
-
客户端:Win/Mac/Linux
-
我们开发应用程序是在本地/本机,IDEA+Maven,所以本地也是需要安装maven的
-
本地Win/Mac/Linux的maven安装方式和服务器端是一模一样的
-
如果你是win用户,一定要注意: $MAVEN_HOME/conf/setting.xml
<!-- localRepository | The path to the local repository maven will use to store artifacts. | | Default: ${user.home}/.m2/repository <localRepository>/path/to/local/repo</localRepository> -->
-
Win用户,默认是在C盘,所以建议大家更改Maven本地仓库的路径
-
-
IDEA官网:http://www.jetbrains.com/
3.3 Hadoop
部署
3.3.1 使用CDH
cdh5.15.1
-
下载地址:https://archive.cloudera.com/cdh5/cdh/5/
-
Hadoop:
wget https://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.15.1.tar.gz
-
解压:
tar -zxvf hadoop-2.6.0-cdh5.15.1.tar.gz -C ~/app/
-
修改
hadoop-env.sh
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
-
修改core-site.xml
<property> <name>fs.default.name</name> <value>hdfs://hadoop000:8020</value> </property>
-
修改hdfs-site.xml
<property> <name>dfs.datanode.data.dir</name> <value>/home/hadoop/tmp/dfs/data</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property>
-
修改yarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property>
-
修改mapred-site.xml
<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>
-
修改slaves(可选)
- hadoop000
-
配置系统环境变量
export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.15.1 export PATH=$HADOOP_HOME/bin:$PATH
-
配置SSH的免密码登录
-
在启动HDFS之前,一定要先对HDFS对格式化
- 切记:格式化只会一次,因为一旦格式化了,那么HDFS上的数据就没了
- 格式化命令:
hdfs namenode -format
-
启动HDFS
-
逐个进程启动/停止
$ hadoop-daemon.sh start/stop namenode $ hadoop-daemon.sh start/stop datanode
- jps验证
- 如果发现有缺失的进程,那么就找缺失进程的名称对应的日志(log而不是out)
-
一键式启动HDFS
$ start-dfs.sh $ stop-dfs.sh
-
3.4 Hive部署
-
Hadoop:wget https://archive.cloudera.com/cdh5/cdh/5/hive-1.1.0-cdh5.15.1.tar.gz
-
系统环境变量
export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.15.1 export PATH=$HIVE_HOME/bin:$PATH
-
需要安装
MySQL
与yum
-
需要拷贝MySQL的驱动$HIVE_HOME/lib 版本5.x
-
修改
$HIVE_HOME/conf/hive-site.xml
文件<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/pk?createDatabaseIfNotExist=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> </property> </configuration>
-
-
Hive: HDFS上的数据 + MySQL中元数据信息
4. Spark运行模式
- local:本地运行,在开发代码的时候,我们使用该模式进行测试是非常方便的
- standalone:Hadoop部署多个节点的,同理Spark可以部署多个节点 用的不多
- YARN:将Spark作业提交到Hadoop(YARN)集群中运行,Spark仅仅只是一个客户端而已 最多的用法
- Mesos:不常用
- K8S:2.3版本才正式稍微稳定 是未来比较好的一个方向
- 补充:运行模式和代码没有任何关系,同一份代码可以不做修改运行在不同的运行模式下
5. 构建应用
-
使用
IDEA
+Maven
来构建我们的Spark应用 -
在命令行中运行一下
MAVEN
命令mvn archetype:generate -DarchetypeGroupId=net.alchim31.maven \ -DarchetypeArtifactId=scala-archetype-simple \ -DremoteRepositories=http://scala-tools.org/repo-releases \ -DarchetypeVersion=1.5 \ -DgroupId=com.imooc.bigdata \ -DartifactId=sparksql-train \ -Dversion=1.0
-
打开IDEA,把这个项目中的pom.xml打开即可
-
同时,在
pom.xml
中添加一下相关配置pom.xml <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.tools.version>2.11</scala.tools.version> <scala.version>2.11.8</scala.version> <spark.version>2.4.3</spark.version> <hadoop.version>2.6.0-cdh5.15.1</hadoop.version> </properties> 添加CDH的仓库 <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> </repositories> 添加Spark SQL和Hadoop Client的依赖 <!--Spark SQL依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop相关依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency>
6. 实战:词频统计案例
-
输入:文件
- 需求:统计出文件中每个单词出现的次数
- 读每一行数据
- 按照分隔符把每一行的数据拆成单词
- 每个单词赋上次数为1
- 按照单词进行分发,然后统计单词出现的次数
- 把结果输出到文件中
- 需求:统计出文件中每个单词出现的次数
-
输出:文件
-
使用local模式运行spark-shell
./spark-shell --master local
-
打包我们的应用程序,让其运行在local模式下
-
如何运行jar包呢?
./spark-submit \ --class com.imooc.bigdata.chapter02.SparkWordCountAppV2 \ --master local \ /home/hadoop/lib/sparksql-train-1.0.jar \ file:///home/hadoop/data/wc.data file:///home/hadoop/data/out
- 使用local模式的话,你只需要把spark的安装包解压开,什么都不用动,就能使用
-
-
如何提交Spark应用程序到YARN上执行
./spark-submit \ --class com.imooc.bigdata.chapter02.SparkWordCountAppV2 \ --master yarn \ --name SparkWordCountAppV2 \ /home/hadoop/lib/sparksql-train-1.0.jar \ hdfs://hadoop000:8020/pk/wc.data hdfs://hadoop000:8020/pk/out
-
要将Spark应用程序运行在YARN上,一定要配置
HADOOP_CONF_DIR
或者YARN_CONF_DIR
指向
$HADOOP_HOME/etc/conf
-
local和YARN模式:重点掌握
-
Standalone:了解
-
多个机器,那么你每个机器都需要部署spark
-
相关配置:
$SPARK_HOME/conf/slaves hadoop000 SPARK_HOME/conf/spark-env.sh SPARK_MASTER_HOST=hadoop000
-
启动Spark集群
$SPARK_HOME/sbin/start-all.sh jps: Master Worker
-
spark提交作业
./spark-submit \ --class com.imooc.bigdata.chapter02.SparkWordCountAppV2 \ --master spark://hadoop000:7077 \ --name SparkWordCountAppV2 \ /home/hadoop/lib/sparksql-train-1.0.jar \ hdfs://hadoop000:8020/pk/wc.data hdfs://hadoop000:8020/pk/out2
-
不管什么运行模式,代码不用改变,只需要在
spark-submit
脚本提交时通过
--master xxx
来设置你的运行模式即可
-
7. 实战代码
Scala
版本
package com.zth.bigdata.examples
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: 3zZ.
* Date: 2020/1/13 3:14 下午
*/
object SparkWordCountApp {
def main(args: Array[String]): Unit = {
/**
* master: 运行模式 local
*/
val sparkConf = new SparkConf().setMaster("local").setAppName("SparkWordCountApp")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("/Users/3zz/Code/Spark/spark-sql-train/data/input.txt")
/**
* 按照单词个数进行降序排列
*/
rdd.flatMap(_.split(",")).map((_, 1))
.reduceByKey(_ + _).map(x => (x._2, x._1))
.sortByKey(false).map(x =>(x._2,x._1))
.collect().foreach(println)
// .saveAsTextFile("/Users/3zz/Code/Spark/spark-sql-train/data/out").
sc.stop()
}
}