没长正的技术专栏 勤动手、多思考

SparkRDD实战

2021-04-29

阅读:

2021-04-29

SparkRDD

1.基本概念

1.1 什么是 RDD?

RDD(Resilient Distributed Datasets):一个弹性分布式数据集, Spark中的基本抽象。

代表一个不变(只读)的、可以并行操作的元素的分区集合。

Spark中原生的RDD支持从以下三种方式创建:从scala集合中创建、从文件系统中创建、现有RDD的transform操作创建。

1.2. Spark RDD 示例

    #1. 进入 tmp目录:
    cd /home/meizhangzheng/hadoop/tmp

    #2. 创建多级目录
    hadoop fs -mkdir -p /data/tmp

    #3. 创建源数据文件
    hadoop fs -touchz /data/tmp/word.txt

    #4.查看目录
    hdfs dfs -ls /data/tmp

    #5. 查看文件内容
    hdfs dfs -cat /data/tmp/word.txt

    #6. 将本地文件复制到hdfs上命令
    hadoop fs -put word.txt /data/tmp/word.txt

    bout yun
    hello word
    hello spark

    #注: 如果文件存在则覆盖,添加 -f
    hadoop fs -put -f  word.txt /data/tmp/word.txt

    #7.开始统计单词

1.2.1 统计单词示例

    #1. 启动
    spark shell

    #2. 系统 把SparkContext 赋值为sc变量
    scala> sc
    res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2981064d


    #3. 加载文件赋值为rawRdd变量
    scala> val rawRdd = sc.textFile("/data/tmp/word.txt")
    rawRdd: org.apache.spark.rdd.RDD[String] = /data/tmp/words.txt MapPartitionsRDD[1] at textFile at <console>:24


    #4. first算账在  查看第一行数据
    scala> rawRdd.first
    res2: String = bout yun

    #5. collect算子   rawRdd :查看 内容  ,如果内容比较大,则不能使用collect ,搞崩内存   
    scala> rawRdd.collect
    res3: Array[String] = Array(bout yun, hello word, hello spark)

    #6. 查看血缘关系  rawRdd.toDebugString
    res6: String =
    (2) /data/tmp/word.txt MapPartitionsRDD[1] at textFile at <console>:24 []
    |  /data/tmp/word.txt HadoopRDD[0] at textFile at <console>:24 []

    #7. 转换 transaction
    scala> val words = rawRdd.flatMap(line => line.split("\\s+"))
    words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:25

    scala> words.toDebugString
    res8: String =
    (2) MapPartitionsRDD[2] at flatMap at <console>:25 []
    |  /data/tmp/word.txt MapPartitionsRDD[1] at textFile at <console>:24 []
    |  /data/tmp/word.txt HadoopRDD[0] at textFile at <console>:24 []

    #8. 拆分
    scala> words.collect
    res9: Array[String] = Array(bout, yun, hello, word, hello, spark)               

    scala> val words_one = words.map(word=>(word,1))
    words_one: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

    scala> words_one.collect
    res10: Array[(String, Int)] = Array((bout,1), (yun,1), (hello,1), (word,1), (hello,1), (spark,1))

    #9. 计数  (Action 操作)
    scala> words_one.reduceByKey(_+_).collect
    res12: Array[(String, Int)] = Array((yun,1), (bout,1), (word,1), (hello,2), (spark,1))


    #10. 血缘关系
    scala> words_one.reduceByKey(_+_).toDebugString
    res13: String =
    (2) ShuffledRDD[6] at reduceByKey at <console>:26 []
    +-(2) MapPartitionsRDD[3] at map at <console>:25 []
    |  MapPartitionsRDD[2] at flatMap at <console>:25 []
    |  /data/tmp/word.txt MapPartitionsRDD[1] at textFile at <console>:24 []
    |  /data/tmp/word.txt HadoopRDD[0] at textFile at <console>:24 []

    

1.3 Spark DataFrame SQL 编程

1.3.1 什么是 Spark DataFrame & SQL?

Spark DataFrame 是一个带有列名称的分布式数据集,类似于关系型数据库中的一张表,可以通过结构化的数据文件,Hive中的表,外部数据库以及已经存在的RDD得到。

Spark SQL 是使用 SQL 或 HiveQL 语法编写 SQL 语句,来执行计算任务。

1.3.2 Spark DataFrame & SQL 引入动机?

1)Spark DataFrame & SQL 执行效率相比 RDD 更高

2)Spark DataFrame & SQL 代码更简洁

3)Spark DataFrame & SQL 带有自动优化程序引擎

Similar Posts

欢迎拍砖,多多交流,转载请注明出处:[没长正的技术专栏](http://blog.meizhangzheng.com) 如涉及侵权问题,请发送邮件到xsj34567@163.com,如情况属实本人将会尽快删除。


上一篇 Flume集群

Comments