• <optgroup id="ccyim"><button id="ccyim"></button></optgroup>
  • 教育行業A股IPO第一股(股票代碼 003032)

    全國咨詢/投訴熱線:400-618-4000

    RDD為什么要進行數據持久化?持久化詳細操作步驟示例

    更新時間:2020年12月22日16時49分 來源:傳智教育 瀏覽次數:

    好口碑IT培訓

      在Spark中,RDD是采用惰性求值,即每次調用行動算子操作,都會從頭開始計算。然而,每次調用行動算子操作,都會觸發一次從頭開始的計算,這對于迭代計算來說,代價是很大的,因為迭代計算經常需要多次重復的使用同一組數據集,所以,為了避免重復計算的開銷,可以讓Spark對數據集進行持久化。

      通常情況下,一個RDD是由多個分區組成的,RDD中的數據分布在多個節點中,因此,當持久化某個RDD時,每一個節點都將把計算分區的結果保存在內存中,若對該RDD或衍生出的RDD進行其他行動算子操作時,則不需要重新計算,直接去取各個分區保存數據即可,這使得后續的行動算子操作速度更快(通常超過10倍),并且緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。

      RDD的持久化操作有兩種方法,分別是cache()方法和persist()方法。每一個持久化的RDD都可以使用不同的存儲級別存儲,從而允許持久化數據集在硬盤或者內存作為序列化的Java對象,甚至可以跨節點復制。

      persist()方法的存儲級別是通過StorageLevel對象(Scala、Java、Python)設置的。

      cache()方法的存儲級別是使用默認的存儲級別(即StorageLevel.MEMORY_ONLY(將反序列化的對象存入內存))。接下來,通過一張表介紹一下持久化RDD的存儲級別,如表1所示。

      表1 持久化RDD的存儲級別

      在表1中,列舉了持久化RDD的存儲級別,我們可以在RDD進行第一次算子操作時,根據自己的需求選擇對應的存儲級別。

      為了大家更好地理解,接下來,通過代碼演示如何使用persist()方法和cache()方法對RDD進行持久化。

      1.使用persist()方法對RDD進行持久化

      定義一個列表list,通過該列表創建一個RDD,然后通過persist持久化操作和算子操作統計RDD中的元素個數以及打印輸出RDD中的所有元素。具體代碼如下:

       scala> import org.apache.spark.storage.StorageLevel
       import org.apache.spark.storage.StorageLevel
       scala> val list = List("hadoop","spark","hive")
       list: List[String] = List(hadoop, spark, hive)
       scala> val listRDD = sc.parallelize(list)
       listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
                             parallelize at :27
       scala> listRDD.persist(StorageLevel.DISK_ONLY)
       res1: listRDD.type = ParallelCollectionRDD[0] at parallelize at :27
      scala> println(listRDD.count())
      3
      scala> println(listRDD.collect().mkString(","))
      hadoop,spark,hive

      上述代碼中,第1行代碼導入StorageLevel對象的包;第3行代碼定義了一個列表list;第5行代碼執行sc.parallelize(list)操作,創建了一個RDD,即listRDD;第8行代碼添加了persist()方法,用于持久化RDD,減少I/O操作,提高計算效率;第10行代碼執行listRDD.count()行動算子操作,將統計listRDD中元素的個數;第12行代碼執行listRDD.collect()行動算子操作和mkString(“,”)操作,將listRDD中的所有元素進行打印輸出,并且是以逗號為分隔符。

      需要注意的是,當程序執行到第8行代碼時,并不會持久化listRDD,因為listRDD還沒有被真正計算;當執行第10行代碼時,listRDD才會進行第一次的行動算子操作,觸發真正的從頭到尾的計算,這時listRDD.persist()方法才會被真正的執行,把listRDD持久化到磁盤中;當執行到第12行代碼時,進行第二次的行動算子操作,但不觸發從頭到尾的計算,只需使用已經進行持久化的listRDD來進行計算。

      2.使用cache()方法對RDD進行持久化

      定義一個列表list,通過該列表創建一個RDD,然后通過cache持久化操作和算子操作統計RDD中的元素個數以及打印輸出rdd中的所有元素。具體代碼如下:

       scala> val list= List("hadoop","spark","hive")
       list: List[String] = List(hadoop, spark, hive)
       scala> val listRDD= sc.parallelize(list)
       listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
                             parallelize at :26
       scala> listRDD.cache()
       res2: listRDD.type = ParallelCollectionRDD[1] at parallelize at :26
       scala> println(listRDD.count())
       3
       scala> println(listRDD.collect().mkString(","))
       hadoop,spark,hive

      上述代碼中,第6行代碼對listRDD進行持久化操作,即添加cache()方法,用于持久化RDD,減少I/O操作,提高計算效率。然而,使用cache()方法進行持久化操作,底層是調用了persist(MEMORY_ONLY)方法,用來對RDD進行持久化。當程序當執行到第6行代碼時,并不會持久化listRDD,因為listRDD還沒有被真正計算;當程序執行第8行代碼時,listRDD才會進行第一次的行動算子操作,觸發真正的從頭到尾的計算,這時listRDD.cache()方法才會被真正的執行,把listRDD持久化到內存中;當程序執行到第10行代碼時,進行第二次的行動算子操作,但不觸發從頭到尾的計算,只需使用已經持久化的listRDD來進行計算。

    猜你喜歡:

    如何遠程登錄Hadoop虛擬機和開啟SSH服務]

    IDEA工具開發WordCount單詞計數程序的步驟有哪些

    怎樣使用Linux和HDFS創建RDD?

    傳智大數據培訓課程

    男女乱婬真视频,波多野结衣中文字幕在观线看,翘着光屁股趴在办公室,欧美做真爱免费A片