SparkSQL实战笔记(1)---- 部署、项目准备

1. 关于MapReduce的问题

  • MapReduce的槽点一

    • 需求:统计单词出现的个数(词频统计)

      • file中每个单词出现的次数

        hello,hello,hello
        world,world
        pk
        
    1. 读取file中每一行的数据
    2. 按照分隔符把每一行的内容进行拆分
    3. 按照相同的key分发到同一个任务上去进行累加的操作
    • 这是一个简单的不能再简单的一个需求,我们需要开发很多的代码

      1. 自定义Mapper
      2. 自定义Reducer
      3. 通过Driver把Mapper和Reducer串起来
      4. 打包,上传到集群上去
      5. 在集群上提交我们的wc程序
    • 一句话:就是会花费非常多的时间在非业务逻辑改动的工作上

  • MapReduce吐槽点二

    Input => MapReduce ==> Output ==> MapReduce ==> Output
    
  • 回顾下MapReduce执行流程:

    • MapTask或者ReduceTask都是进程级别
    • 第一个MR的输出要先落地,然后第二个MR把第一个MR的输出当做输入
    • 中间过程的数据是要落地

2. Spark

  1. 特性

    1. Speed:

      • both batch and streaming data

      • 批流一体 Spark Flink

    2. Ease of Use

      • high-level operators
    3. Generality

      • stack 栈 生态
    4. Runs Everywhere

      • It can access diverse data sources
      • YARN/Local/Standalone Spark应用程序的代码需要改动吗?
      • --master来指定你的Spark应用程序将要运行在什么模式下

3. 部署问题

3.1 JDK部署

  • 下载:https://www.oracle.com/index.html

  • 服务器端:

    • 下载linux版本的jdk

    • 解压:tar -zxvf jdk-8u91-linux-x64.tar.gz -C ~/app

    • 配置环境变量: ~/.bash_profile

      export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
      export PATH=$JAVA_HOME/bin:$PATH
      
    • 使环境变量生效:source ~/.bash_profile

  • 客户端:Win/Mac/Linux

    • Mac/Linux:就和服务器端安装方法一致

3.2 MavenIDEA部署

  1. Maven:IDEA+Maven来管理应用程序

    • 为什么你开发的时候不直接拷贝jar包呢?
    • 在maven中的pom.xml中添加我们所需要的dependency就行
  2. 官网:maven.apache.org

    • wget http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz

    • 解压:tar -zxvf apache-maven-3.6.1-bin.tar.gz -C ~/app/

    • 配置环境变量:~/.bash_profile

      export MAVEN_HOME=/home/hadoop/app/apache-maven-3.6.1
      export PATH=$MAVEN_HOME/bin:$PATH
      
    • 使环境变量生效:source ~/.bash_profile

    • 服务器端:你是需要进行使用maven来编译我们的spark

    • 客户端:Win/Mac/Linux

    • 我们开发应用程序是在本地/本机,IDEA+Maven,所以本地也是需要安装maven的

    • 本地Win/Mac/Linux的maven安装方式和服务器端是一模一样的

    • 如果你是win用户,一定要注意: $MAVEN_HOME/conf/setting.xml

      <!-- localRepository
      	   | The path to the local repository maven will use to store artifacts.
      	   |
      	   | Default: ${user.home}/.m2/repository
      	  <localRepository>/path/to/local/repo</localRepository>
      	  -->
      
    • Win用户,默认是在C盘,所以建议大家更改Maven本地仓库的路径

  3. IDEA官网:http://www.jetbrains.com/

3.3 Hadoop部署

3.3.1 使用CDH cdh5.15.1

  • 下载地址:https://archive.cloudera.com/cdh5/cdh/5/

  • Hadoop:wget https://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.15.1.tar.gz

  • 解压:tar -zxvf hadoop-2.6.0-cdh5.15.1.tar.gz -C ~/app/

  • 修改hadoop-env.sh

    export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
    
  • 修改core-site.xml

    <property>
    	<name>fs.default.name</name>
    	<value>hdfs://hadoop000:8020</value>
    </property>
    
  • 修改hdfs-site.xml

    <property>
    	<name>dfs.datanode.data.dir</name>
    	<value>/home/hadoop/tmp/dfs/data</value>
    </property>
    
    <property>
    	<name>dfs.replication</name>
    	<value>1</value>
    </property>
    
    <property>
    	<name>dfs.permissions</name>
    	<value>false</value>
    </property>
    
  • 修改yarn-site.xml

    <property>
    	<name>yarn.nodemanager.aux-services</name>
    	<value>mapreduce_shuffle</value>
    </property>
    
  • 修改mapred-site.xml

    <property>
    	<name>mapreduce.framework.name</name>
    	<value>yarn</value>
    </property>
    
  • 修改slaves(可选)

    • hadoop000
  • 配置系统环境变量

    export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.15.1
    export PATH=$HADOOP_HOME/bin:$PATH
    
  • 配置SSH的免密码登录

  • 在启动HDFS之前,一定要先对HDFS对格式化

    • 切记:格式化只会一次,因为一旦格式化了,那么HDFS上的数据就没了
    • 格式化命令:hdfs namenode -format
  • 启动HDFS

    1. 逐个进程启动/停止

      $ hadoop-daemon.sh start/stop namenode
      $ hadoop-daemon.sh start/stop datanode
      
      • jps验证
      • 如果发现有缺失的进程,那么就找缺失进程的名称对应的日志(log而不是out)
    2. 一键式启动HDFS

      $ start-dfs.sh
      $ stop-dfs.sh
      

3.4 Hive部署

  1. Hadoop:wget https://archive.cloudera.com/cdh5/cdh/5/hive-1.1.0-cdh5.15.1.tar.gz

  2. 系统环境变量

    export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.15.1
    export PATH=$HIVE_HOME/bin:$PATH
    
  3. 需要安装MySQLyum

    • 需要拷贝MySQL的驱动$HIVE_HOME/lib 版本5.x

    • 修改$HIVE_HOME/conf/hive-site.xml文件

      <?xml version="1.0"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      
      <configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/pk?createDatabaseIfNotExist=true</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
      </property>
      
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
      </property>
      
      </configuration>
      
  4. Hive: HDFS上的数据 + MySQL中元数据信息

4. Spark运行模式

  1. local:本地运行,在开发代码的时候,我们使用该模式进行测试是非常方便的
  2. standalone:Hadoop部署多个节点的,同理Spark可以部署多个节点 用的不多
  3. YARN:将Spark作业提交到Hadoop(YARN)集群中运行,Spark仅仅只是一个客户端而已 最多的用法
  4. Mesos:不常用
  5. K8S:2.3版本才正式稍微稳定 是未来比较好的一个方向
  6. 补充:运行模式和代码没有任何关系,同一份代码可以不做修改运行在不同的运行模式下

5. 构建应用

  1. 使用IDEA+Maven来构建我们的Spark应用

  2. 在命令行中运行一下MAVEN命令

    mvn archetype:generate -DarchetypeGroupId=net.alchim31.maven \
    -DarchetypeArtifactId=scala-archetype-simple \
    -DremoteRepositories=http://scala-tools.org/repo-releases \
    -DarchetypeVersion=1.5 \
    -DgroupId=com.imooc.bigdata \
    -DartifactId=sparksql-train \
    -Dversion=1.0
    
  3. 打开IDEA,把这个项目中的pom.xml打开即可

  4. 同时,在pom.xml中添加一下相关配置

    pom.xml
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.11</scala.tools.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
    </properties>	
    
    添加CDH的仓库
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
    </repositories>
    
    添加Spark SQL和Hadoop Client的依赖
    <!--Spark SQL依赖-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    
    <!-- Hadoop相关依赖-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    

6. 实战:词频统计案例

  1. 输入:文件

    • 需求:统计出文件中每个单词出现的次数
      1. 读每一行数据
      2. 按照分隔符把每一行的数据拆成单词
      3. 每个单词赋上次数为1
      4. 按照单词进行分发,然后统计单词出现的次数
      5. 把结果输出到文件中
  2. 输出:文件

  3. 使用local模式运行spark-shell

    ./spark-shell --master local
    
    • 打包我们的应用程序,让其运行在local模式下

    • 如何运行jar包呢?

    ./spark-submit \
    --class  com.imooc.bigdata.chapter02.SparkWordCountAppV2 \
    --master local \
    /home/hadoop/lib/sparksql-train-1.0.jar \
    file:///home/hadoop/data/wc.data file:///home/hadoop/data/out 
    
    • 使用local模式的话,你只需要把spark的安装包解压开,什么都不用动,就能使用
  4. 如何提交Spark应用程序到YARN上执行

    ./spark-submit \
    --class  com.imooc.bigdata.chapter02.SparkWordCountAppV2 \
    --master yarn \
    --name SparkWordCountAppV2 \
    /home/hadoop/lib/sparksql-train-1.0.jar \
    hdfs://hadoop000:8020/pk/wc.data hdfs://hadoop000:8020/pk/out
    
  5. 要将Spark应用程序运行在YARN上,一定要配置HADOOP_CONF_DIR或者YARN_CONF_DIR

    指向$HADOOP_HOME/etc/conf

  6. local和YARN模式:重点掌握

  7. Standalone:了解

    • 多个机器,那么你每个机器都需要部署spark

    • 相关配置:

      $SPARK_HOME/conf/slaves
      	hadoop000
      SPARK_HOME/conf/spark-env.sh
      	SPARK_MASTER_HOST=hadoop000
      
    • 启动Spark集群

      $SPARK_HOME/sbin/start-all.sh
      jps: Master  Worker
      
    • spark提交作业

      ./spark-submit \
      --class  com.imooc.bigdata.chapter02.SparkWordCountAppV2 \
      --master spark://hadoop000:7077 \
      --name SparkWordCountAppV2 \
      /home/hadoop/lib/sparksql-train-1.0.jar \
      hdfs://hadoop000:8020/pk/wc.data hdfs://hadoop000:8020/pk/out2
      
    • 不管什么运行模式,代码不用改变,只需要在spark-submit脚本提交时

      通过--master xxx 来设置你的运行模式即可

7. 实战代码

  • Scala版本
package com.zth.bigdata.examples
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Author: 3zZ.
 * Date: 2020/1/13 3:14 下午
 */
object SparkWordCountApp {
  def main(args: Array[String]): Unit = {
    /**
     * master: 运行模式 local
     */
    val sparkConf = new SparkConf().setMaster("local").setAppName("SparkWordCountApp")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("/Users/3zz/Code/Spark/spark-sql-train/data/input.txt")
    /**
     * 按照单词个数进行降序排列
     */
    rdd.flatMap(_.split(",")).map((_, 1))
      .reduceByKey(_ + _).map(x => (x._2, x._1))
      .sortByKey(false).map(x =>(x._2,x._1))
      .collect().foreach(println)
    //      .saveAsTextFile("/Users/3zz/Code/Spark/spark-sql-train/data/out").
    sc.stop()
  }
}