• <optgroup id="ccyim"><button id="ccyim"></button></optgroup>
  • 教育行業A股IPO第一股(股票代碼 003032)

    全國咨詢/投訴熱線:400-618-4000

    RDD是如何操作數據轉換的?RDD轉換算子API示例

    更新時間:2020年12月21日18時35分 來源:傳智教育 瀏覽次數:

    好口碑IT培訓

      RDD處理過程中的“轉換”操作主要用于根據已有RDD創建新的RDD,每一次通過Transformation算子計算后都會返回一個新RDD,供給下一個轉換算子使用。下面,通過一張表來列舉一些常用轉換算子操作的API,如表1所示。

      表1 常用的轉換算子API

    1608546329889_21.png

      下面,我們通過結合具體的示例對這些轉換算子API進行詳細講解。

      ·filter(func)

      filter(func)操作會篩選出滿足函數func的元素,并返回一個新的數據集。假設,有一個文件test.txt(內容如文件3-1),下面,通過一張圖來描述如何通過filter算子操作,篩選出包含單詞“spark”的元素,具體過程如圖1所示。

    1608545555238_22.jpg

       圖1 filter算子操作

      在圖1中,通過從test.txt文件中加載數據的方式創建RDD,然后通過filter操作篩選出滿足條件的元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

      scala> val lines = sc.textFile("file:///export/data/test.txt")
      lines: org.apache.spark.rdd.RDD[String] = `[file:///export/data/test.txt]`(file:///\\export\data\test.txt)
                  MapPartitionsRDD[1] at textFile at :24
      scala> val linesWithSpark = lines.filter(line => line.contains("spark"))
      linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at
                                 filter at :25

      在上述代碼中,filter()輸入的參數line => line.contains(“spark”)是一個匿名函數,其含義是依次取出lines這個RDD中的每一個元素,對于當前取到的元素,把它賦值給匿名函數中的line變量。若line中包含“spark”單詞,就把這個元素加入到RDD(即linesWithSpark)中,否則就丟棄該元素。

      ·map(func)

      map(func)操作將每個元素傳遞到函數func中,并將結果返回為一個新的數據集。假設,有一個文件test.txt(內容如文件1),接下來,通過一張圖來描述如何通過map算子操作把文件內容拆分成一個個的單詞并封裝在數組對象中,具體過程如圖2所示。

    1608545566832_23.jpg

     圖2 map算子操作

      在圖2中,通過從test.txt文件中加載數據的方式創建RDD,然后通過map操作將文件的每一行內容都拆分成一個個的單詞元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

      scala> val lines = sc.textFile("file:///export/data/test.txt")
      lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
                  MapPartitionsRDD[4] at textFile at :24
      scala> val words = lines.map(line => line.split(" "))
      words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[13] at
                                 map at :25

      上述代碼中,lines.map(line => line.split(“ ”))含義是依次取出lines這個RDD中的每個元素,對于當前取到的元素,把它賦值給匿名函數中的line變量。由于line是一行文本,如“hadoop spark”,一行文本中包含多個單詞,且空格進行分隔,通過line.split(“ ”)匿名函數,將文本分成一個個的單詞,拆分后得到的單詞都被封裝到一個數組對象中,成為新的RDD(即words)的一個元素。

      ·flatMap(func)

      flatMap(func)與map(func)相似,但是每個輸入的元素都可以映射到0或者多個輸出的結果。有一個文件test.txt(內容如文件3-1),接下來,通過一張圖來描述如何通過flatMap算子操作,把文件內容拆分成一個個的單詞,具體過程如圖3所示。

    1608545586788_24.jpg

    圖3 flatMap算子操作

      在圖3中,通過從test.txt文件中加載數據的方式創建RDD,然后通過flatMap操作將文件的每一行內容都拆分成一個個的單詞元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

     scala> val lines = sc.textFile("file:///export/data/test.txt")
      lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
                  MapPartitionsRDD[5] at textFile at :24
      scala> val words = lines.flatMap(line => line.split(" "))
      words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at
                                 map at :25

      在上述代碼中,lines. flatMap(line => line.split(“ ”))等價于先執行lines.map(line => line.split(“ ”))操作(請參考map(func)操作),再執行flat()操作(即扁平化操作),把wordArray中的每個RDD都扁平成多個元素,被扁平后得到的元素構成一個新的RDD(即words)。

      groupByKey()

      groupByKey()主要用于(Key,Value)鍵值對的數據集,將具有相同Key的Value進行分組,會返回一個新的(Key,Iterable)形式的數據集。同樣以文件test.txt為例,接下來,通過一張圖來描述如何通過groupByKey算子操作,將文件內容中的所有單詞進行分組,具體過程如圖4示。

    1608545606730_25.jpg

    圖4 groupByKey算子操作

      在圖4中,通過groupByKey操作把(Key,Value)鍵值對類型的RDD,按單詞將單詞出現的次數進行分組,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

    scala> val lines = sc.textFile("file:///export/data/test.txt")
      lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)             MapPartitionsRDD[6] at textFile at :24
      scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
      words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at                            map at :25
      scala> val groupWords=words.groupByKey()
       groupWords: org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16]
                            at groupByKey at :25

      上述代碼中,words.groupByKey()操作執行后,RDD中所有的Key相同的Value都被合并到一起。例如,(“spark”,1)、(“spark”,1)、(“spark”,1)這三個鍵值對的Key都是“spark”,合并后得到新的鍵值對(“spark”,(1,1,1))。

      reduceByKey(func)

      reduceByKey()主要用于(Key,Value)鍵值對的數據集,返回的是一個新的(Key,Iterable)形式的數據集,該數據集是每個Key傳遞給函數func進行聚合運算后得到的結果。同樣以文件test.txt(內容如文件3-1),接下來,通過一張圖來描述如何通過reduceByKey算子操作統計單詞出現的次數,具體操作如圖5所示。

    1608545621121_26.jpg
     圖5 reduceByKey()算子操作

      在圖5中,通過reduceByKey操作把(Key,Value)鍵值對類型的RDD,按單詞Key將單詞出現的次數Value進行聚合,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

     scala> val lines = sc.textFile("file:///export/data/test.txt")  lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt)
                  MapPartitionsRDD[7] at textFile at :24
      scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))  words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at                              map at :25
      scala> val reduceWords=words.reduceByKey((a,b)=>a+b)
      reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at
                             reduceByKey at :25

      上述代碼中,執行words.reduceByKey((a,b) => a + b)操作,共分為兩個步驟,分別是先執行reduceByKey()操作,將所有Key相同的Value值合并到一起,生成一個新的鍵值對(例如(“spark”,(1,1,1)));然后執行函數func的操作,即使用(a,b)=> a + b函數把(1,1,1)進行聚合求和,得到最終的結果,即(“spark”,3)。    

    猜你喜歡:

    怎樣使用Spark Shell來讀取HDFS文件

    HBase數據庫是怎樣存儲數據的?

    怎樣使用Linux和HDFS創建RDD?

    傳智大數據培訓課程

    男女乱婬真视频,波多野结衣中文字幕在观线看,翘着光屁股趴在办公室,欧美做真爱免费A片