全國(guó)咨詢(xún)/投訴熱線(xiàn):400-618-4000

首頁(yè)技術(shù)文章正文

RDD為什么要進(jìn)行數(shù)據(jù)持久化?它的操作方法有哪些?

更新時(shí)間:2020-12-22 來(lái)源:黑馬程序員 瀏覽量:

1577370495235_學(xué)IT就到黑馬程序員.gif

  在Spark中,RDD是采用惰性求值,即每次調(diào)用行動(dòng)算子操作,都會(huì)從頭開(kāi)始計(jì)算。然而,每次調(diào)用行動(dòng)算子操作,都會(huì)觸發(fā)一次從頭開(kāi)始的計(jì)算,這對(duì)于迭代計(jì)算來(lái)說(shuō),代價(jià)是很大的,因?yàn)榈?jì)算經(jīng)常需要多次重復(fù)的使用同一組數(shù)據(jù)集,所以,為了避免重復(fù)計(jì)算的開(kāi)銷(xiāo),可以讓Spark對(duì)數(shù)據(jù)集進(jìn)行持久化。

  通常情況下,一個(gè)RDD是由多個(gè)分區(qū)組成的,RDD中的數(shù)據(jù)分布在多個(gè)節(jié)點(diǎn)中,因此,當(dāng)持久化某個(gè)RDD時(shí),每一個(gè)節(jié)點(diǎn)都將把計(jì)算分區(qū)的結(jié)果保存在內(nèi)存中,若對(duì)該RDD或衍生出的RDD進(jìn)行其他行動(dòng)算子操作時(shí),則不需要重新計(jì)算,直接去取各個(gè)分區(qū)保存數(shù)據(jù)即可,這使得后續(xù)的行動(dòng)算子操作速度更快(通常超過(guò)10倍),并且緩存是Spark構(gòu)建迭代式算法和快速交互式查詢(xún)的關(guān)鍵。

  RDD的持久化操作有兩種方法,分別是cache()方法和persist()方法。每一個(gè)持久化的RDD都可以使用不同的存儲(chǔ)級(jí)別存儲(chǔ),從而允許持久化數(shù)據(jù)集在硬盤(pán)或者內(nèi)存作為序列化的Java對(duì)象,甚至可以跨節(jié)點(diǎn)復(fù)制。

  persist()方法的存儲(chǔ)級(jí)別是通過(guò)StorageLevel對(duì)象(Scala、Java、Python)設(shè)置的。

  cache()方法的存儲(chǔ)級(jí)別是使用默認(rèn)的存儲(chǔ)級(jí)別(即StorageLevel.MEMORY_ONLY(將反序列化的對(duì)象存入內(nèi)存))。接下來(lái),通過(guò)一張表介紹一下持久化RDD的存儲(chǔ)級(jí)別,如表1所示。

  表1 持久化RDD的存儲(chǔ)級(jí)別
1608627118074_21.png

  在表1中,列舉了持久化RDD的存儲(chǔ)級(jí)別,我們可以在RDD進(jìn)行第一次算子操作時(shí),根據(jù)自己的需求選擇對(duì)應(yīng)的存儲(chǔ)級(jí)別。

  為了大家更好地理解,接下來(lái),通過(guò)代碼演示如何使用persist()方法和cache()方法對(duì)RDD進(jìn)行持久化。

  1.使用persist()方法對(duì)RDD進(jìn)行持久化

  定義一個(gè)列表list,通過(guò)該列表創(chuàng)建一個(gè)RDD,然后通過(guò)persist持久化操作和算子操作統(tǒng)計(jì)RDD中的元素個(gè)數(shù)以及打印輸出RDD中的所有元素。具體代碼如下:

   scala> import org.apache.spark.storage.StorageLevel
   import org.apache.spark.storage.StorageLevel
   scala> val list = List("hadoop","spark","hive")
   list: List[String] = List(hadoop, spark, hive)
   scala> val listRDD = sc.parallelize(list)
   listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
                         parallelize at <console>:27
   scala> listRDD.persist(StorageLevel.DISK_ONLY)
   res1: listRDD.type = ParallelCollectionRDD[0] at parallelize at <console>:27
  scala> println(listRDD.count())
  3
  scala> println(listRDD.collect().mkString(","))
  hadoop,spark,hive

  上述代碼中,第1行代碼導(dǎo)入StorageLevel對(duì)象的包;第3行代碼定義了一個(gè)列表list;第5行代碼執(zhí)行sc.parallelize(list)操作,創(chuàng)建了一個(gè)RDD,即listRDD;第8行代碼添加了persist()方法,用于持久化RDD,減少I(mǎi)/O操作,提高計(jì)算效率;第10行代碼執(zhí)行l(wèi)istRDD.count()行動(dòng)算子操作,將統(tǒng)計(jì)listRDD中元素的個(gè)數(shù);第12行代碼執(zhí)行l(wèi)istRDD.collect()行動(dòng)算子操作和mkString(“,”)操作,將listRDD中的所有元素進(jìn)行打印輸出,并且是以逗號(hào)為分隔符。

  需要注意的是,當(dāng)程序執(zhí)行到第8行代碼時(shí),并不會(huì)持久化listRDD,因?yàn)閘istRDD還沒(méi)有被真正計(jì)算;當(dāng)執(zhí)行第10行代碼時(shí),listRDD才會(huì)進(jìn)行第一次的行動(dòng)算子操作,觸發(fā)真正的從頭到尾的計(jì)算,這時(shí)listRDD.persist()方法才會(huì)被真正的執(zhí)行,把listRDD持久化到磁盤(pán)中;當(dāng)執(zhí)行到第12行代碼時(shí),進(jìn)行第二次的行動(dòng)算子操作,但不觸發(fā)從頭到尾的計(jì)算,只需使用已經(jīng)進(jìn)行持久化的listRDD來(lái)進(jìn)行計(jì)算。

  2.使用cache()方法對(duì)RDD進(jìn)行持久化

  定義一個(gè)列表list,通過(guò)該列表創(chuàng)建一個(gè)RDD,然后通過(guò)cache持久化操作和算子操作統(tǒng)計(jì)RDD中的元素個(gè)數(shù)以及打印輸出rdd中的所有元素。具體代碼如下:

   scala> val list= List("hadoop","spark","hive")
   list: List[String] = List(hadoop, spark, hive)
   scala> val listRDD= sc.parallelize(list)
   listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
                         parallelize at <console>:26
   scala> listRDD.cache()
   res2: listRDD.type = ParallelCollectionRDD[1] at parallelize at <console>:26
   scala> println(listRDD.count())
   3
   scala> println(listRDD.collect().mkString(","))
   hadoop,spark,hive

  上述代碼中,第6行代碼對(duì)listRDD進(jìn)行持久化操作,即添加cache()方法,用于持久化RDD,減少I(mǎi)/O操作,提高計(jì)算效率。然而,使用cache()方法進(jìn)行持久化操作,底層是調(diào)用了persist(MEMORY_ONLY)方法,用來(lái)對(duì)RDD進(jìn)行持久化。當(dāng)程序當(dāng)執(zhí)行到第6行代碼時(shí),并不會(huì)持久化listRDD,因?yàn)閘istRDD還沒(méi)有被真正計(jì)算;當(dāng)程序執(zhí)行第8行代碼時(shí),listRDD才會(huì)進(jìn)行第一次的行動(dòng)算子操作,觸發(fā)真正的從頭到尾的計(jì)算,這時(shí)listRDD.cache()方法才會(huì)被真正的執(zhí)行,把listRDD持久化到內(nèi)存中;當(dāng)程序執(zhí)行到第10行代碼時(shí),進(jìn)行第二次的行動(dòng)算子操作,但不觸發(fā)從頭到尾的計(jì)算,只需使用已經(jīng)持久化的listRDD來(lái)進(jìn)行計(jì)算。


猜你喜歡

Java API接口怎樣操作HBase分布式數(shù)據(jù)庫(kù)?

Scala的控制結(jié)構(gòu)語(yǔ)句有哪幾種?各語(yǔ)法格式介紹

Spark的集群安裝部署【大數(shù)據(jù)技術(shù)文章】

黑馬程序員大數(shù)據(jù)培訓(xùn)課程 

分享到:
在線(xiàn)咨詢(xún) 我要報(bào)名
和我們?cè)诰€(xiàn)交談!