三萬(wàn)字,Spark學(xué)習(xí)筆記
Spark 基礎(chǔ)
Spark特性
Spark使用簡(jiǎn)練優(yōu)雅的Scala語(yǔ)言編寫(xiě),基于Scala提供了交互式編程體驗(yàn),同時(shí)提供多種方便易用的API。Spark遵循“一個(gè)軟件棧滿足不同應(yīng)用場(chǎng)景”的設(shè)計(jì)理念,逐漸形成了一套完整的生態(tài)系統(tǒng)(包括 Spark提供內(nèi)存計(jì)算框架、SQL即席查詢(Spark SQL)、流式計(jì)算(Spark Streaming)、機(jī)器學(xué)習(xí)(MLlib)、圖計(jì)算(Graph X)等),Spark可以部署在yarn資源管理器上,提供一站式大數(shù)據(jù)解決方案,可以同時(shí)支持批處理、流處理、交互式查詢。
MapReduce計(jì)算模型延遲高,無(wú)法勝任實(shí)時(shí)、快速計(jì)算的需求,因而只適用于離線場(chǎng)景,Spark借鑒MapReduce計(jì)算模式,但與之相比有以下幾個(gè)優(yōu)勢(shì)(快、易用、全面):
- Spark提供更多種數(shù)據(jù)集操作類(lèi)型,編程模型比MapReduce更加靈活;
- Spark提供內(nèi)存計(jì)算,將計(jì)算結(jié)果直接放在內(nèi)存中,減少了迭代計(jì)算的IO開(kāi)銷(xiāo),有更高效的運(yùn)算效率。
- Spark基于DAG的任務(wù)調(diào)度執(zhí)行機(jī)制,迭代效率更高;在實(shí)際開(kāi)發(fā)中MapReduce需要編寫(xiě)很多底層代碼,不夠高效,Spark提供了多種高層次、簡(jiǎn)潔的API實(shí)現(xiàn)相同功能的應(yīng)用程序,實(shí)現(xiàn)代碼量比MapReduce少很多。
Spark作為計(jì)算框架只是取代了Hadoop生態(tài)系統(tǒng)中的MapReduce計(jì)算框架,它任需要HDFS來(lái)實(shí)現(xiàn)數(shù)據(jù)的分布式存儲(chǔ),Hadoop中的其他組件依然在企業(yè)大數(shù)據(jù)系統(tǒng)中發(fā)揮著重要作用。
Spark的不足:雖然Spark很快,但現(xiàn)在在生產(chǎn)環(huán)境中仍然不盡人意,無(wú)論擴(kuò)展性、穩(wěn)定性、管理性等方面都需要進(jìn)一步增強(qiáng);同時(shí)Spark在流處理領(lǐng)域能力有限,如果要實(shí)現(xiàn)亞秒級(jí)或大容量的數(shù)據(jù)獲取或處理需要其他流處理產(chǎn)品。
Cloudera旨在讓Spark流數(shù)據(jù)技術(shù)適用于80%的使用場(chǎng)合,就考慮到這一缺陷,在實(shí)時(shí)分析(而非簡(jiǎn)單數(shù)據(jù)過(guò)濾或分發(fā))場(chǎng)景中,很多以前使用S4或Storm等流式處理引擎的實(shí)現(xiàn)已經(jīng)逐漸被Kafka+Spark Streaming代替;
Hadoop現(xiàn)在分三塊HDFS/MR/YARN,Spark的流行將逐漸讓MapReduce、Tez走進(jìn)博物館;Spark只是作為一個(gè)計(jì)算引擎比MR的性能要好,但它的存儲(chǔ)和調(diào)度框架還是依賴于HDFS/YARN,Spark也有自己的調(diào)度框架,但不成熟,基本不可商用。
Spark部署(on Yarn)
YARN實(shí)現(xiàn)了一個(gè)集群多個(gè)框架”,即在一個(gè)集群上部署一個(gè)統(tǒng)一的資源調(diào)度管理框架,并部署其他各種計(jì)算框架,YARN為這些計(jì)算框架提供統(tǒng)一的資源調(diào)度管理服務(wù),并且能夠根據(jù)各種計(jì)算框架的負(fù)載需求調(diào)整各自占用的資源,實(shí)現(xiàn)集群資源共享和資源彈性收縮;
并且,YARN實(shí)現(xiàn)集群上的不同應(yīng)用負(fù)載混搭,有效提高了集群的利用率;不同計(jì)算框架可以共享底層存儲(chǔ),避免了數(shù)據(jù)集跨集群移動(dòng) ;
這里使用Spark on Yarn 模式部署,配置on yarn模式只需要修改很少配置,也不用使用啟動(dòng)spark集群命令,需要提交任務(wù)時(shí)候須指定在yarn上。
Spark運(yùn)行需要Scala語(yǔ)言,須下載Scala和Spark并解壓到家目錄,設(shè)置當(dāng)前用戶的環(huán)境變量(~/.bash_profile),增加SCALA_HOME和SPARK_HOME路徑并立即生效;啟動(dòng)scala命令和spark-shell命令驗(yàn)證是否成功;Spark的配置文件修改按照官網(wǎng)教程不好理解,這里完成的配置參照博客及調(diào)試。
Spark的需要修改兩個(gè)配置文件:spark-env.sh和spark-default.conf,前者需要指明Hadoop的hdfs和yarn的配置文件路徑及Spark.master.host地址,后者需要指明jar包地址;
spark-env.sh配置文件修改如下:
export JAVA_HOME=/home/stream/jdk1.8.0_144 export SCALA_HOME=/home/stream/scala-2.11.12 export HADOOP_HOME=/home/stream/hadoop-3.0.3 export HADOOP_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop export YARN_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop export SPARK_MASTER_HOST=xx export SPARK_LOCAL_IP=xxx spark-default.conf配置修改如下: //增加jar包地址 spark.yarn.jars=hdfs://1xxx/spark_jars/*
該設(shè)置表明將jar地址定義在hdfs上,必須將~/spark/jars路徑下所有的jar包都上傳到hdfs的/spark_jars/路徑(hadoop hdfs –put ~/spark/jars/*),否則會(huì)報(bào)錯(cuò)無(wú)法找到編譯jar包錯(cuò)誤;
Spark啟動(dòng)和驗(yàn)證
直接無(wú)參數(shù)啟動(dòng)./spark-shell ,運(yùn)行的是本地模式:
啟動(dòng)./spark-shell –master yarn,運(yùn)行的是on yarn模式,前提是yarn配置成功并可用:
在hdfs文件系統(tǒng)中創(chuàng)建文件README.md,并讀入RDD中,使用RDD自帶的參數(shù)轉(zhuǎn)換,RDD默認(rèn)每行為一個(gè)值:
使用./spark-shell --master yarn啟動(dòng)spark 后運(yùn)行命令:val textFile=sc.textFile(“README.md”)讀取hdfs上的README.md文件到RDD,并使用內(nèi)置函數(shù)測(cè)試如下,說(shuō)明spark on yarn配置成功.
常見(jiàn)問(wèn)題
在啟動(dòng)spark-shell時(shí)候,報(bào)錯(cuò)Yarn-site.xml中配置的最大分配內(nèi)存不足,調(diào)大這個(gè)值為2048M,需重啟yarn后生效。
設(shè)置的hdfs地址沖突,hdfs的配置文件中hdfs-site.xml設(shè)置沒(méi)有帶端口,但是spark-default.conf中的spark.yarn.jars值帶有端口,修改spark-default.conf的配置地址同前者一致:
Spark基本原理
在實(shí)際應(yīng)用中,大數(shù)據(jù)處理主要包括以下三個(gè)類(lèi)型:復(fù)雜的批量數(shù)據(jù)處理:通常時(shí)間跨度在數(shù)十分鐘到數(shù)小時(shí)之間;基于歷史數(shù)據(jù)的交互式查詢:通常時(shí)間跨度在數(shù)十秒到數(shù)分鐘之間;基于實(shí)時(shí)數(shù)據(jù)流的數(shù)據(jù)處理:通常時(shí)間跨度在數(shù)百毫秒到數(shù)秒之間;
同時(shí)存在以上場(chǎng)景需要同時(shí)部署多個(gè)組件,如:MapReduce/Impala/Storm,這樣做難免會(huì)帶來(lái)一些問(wèn)題:不同場(chǎng)景之間輸入輸出數(shù)據(jù)無(wú)法做到無(wú)縫共享,通常需要進(jìn)行數(shù)據(jù)格式的轉(zhuǎn)換,不同的軟件需要不同的開(kāi)發(fā)和維護(hù)團(tuán)隊(duì),帶來(lái)了較高的使用成本,比較難以對(duì)同一個(gè)集群中的各個(gè)系統(tǒng)進(jìn)行統(tǒng)一的資源協(xié)調(diào)和分配;
Spark的設(shè)計(jì)遵循“一個(gè)軟件棧滿足不同應(yīng)用場(chǎng)景”的理念,逐漸形成了一套完整的生態(tài)系統(tǒng),其生態(tài)系統(tǒng)包含了Spark Core、Spark SQL、Spark Streaming( Structured Streaming)、MLLib和GraphX 等組件,既能夠提供內(nèi)存計(jì)算框架,也可以支持SQL即席查詢、實(shí)時(shí)流式計(jì)算、機(jī)器學(xué)習(xí)和圖計(jì)算等。
而且Spark可以部署在資源管理器YARN之上,提供一站式的大數(shù)據(jù)解決方案;因此,Spark所提供的生態(tài)系統(tǒng)足以應(yīng)對(duì)上述三種場(chǎng)景,即批處理、交互式查詢和流數(shù)據(jù)處理。
Spark概念/架構(gòu)設(shè)計(jì)
RDD:是Resilient Distributed Dataset(彈性分布式數(shù)據(jù)集)的簡(jiǎn)稱,是分布式內(nèi)存的一個(gè)抽象概念,提供了一種高度受限的共享內(nèi)存模型 ;
DAG:是Directed Acyclic Graph(有向無(wú)環(huán)圖)的簡(jiǎn)稱,反映RDD之間的依賴關(guān)系 ;
Executor:是運(yùn)行在工作節(jié)點(diǎn)(WorkerNode)的一個(gè)進(jìn)程,負(fù)責(zé)運(yùn)行Task ;
應(yīng)用(Application):用戶編寫(xiě)的Spark應(yīng)用程序;
任務(wù)( Task ):運(yùn)行在Executor上的工作單元 ;
作業(yè)( Job ):一個(gè)作業(yè)包含多個(gè)RDD及作用于相應(yīng)RDD上的各種操作;
階段( Stage ):是作業(yè)的基本調(diào)度單位,一個(gè)作業(yè)會(huì)分為多組任務(wù),每組任務(wù)被稱為階段,或者也被稱為任務(wù)集合,代表了一組關(guān)聯(lián)的、相互之間沒(méi)有Shuffle依賴關(guān)系的任務(wù)組成的任務(wù)集;
Spark運(yùn)行架構(gòu)包括集群資源管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor),資源管理器可以自帶或使用Mesos/YARN;
一個(gè)應(yīng)用由一個(gè)Driver和若干個(gè)作業(yè)構(gòu)成,一個(gè)作業(yè)由多個(gè)階段構(gòu)成,一個(gè)階段由多個(gè)沒(méi)有Shuffle關(guān)系的任務(wù)組成;
當(dāng)執(zhí)行一個(gè)應(yīng)用時(shí),Driver會(huì)向集群管理器申請(qǐng)資源,啟動(dòng)Executor,并向Executor發(fā)送應(yīng)用程序代碼和文件,然后在Executor上執(zhí)行任務(wù),運(yùn)行結(jié)束后,執(zhí)行結(jié)果會(huì)返回給Driver,或者寫(xiě)到HDFS或者其他數(shù)據(jù)庫(kù)中。
Spark運(yùn)行流程
SparkContext對(duì)象代表了和一個(gè)集群的連接:
(1)首先為應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由Driver創(chuàng)建一個(gè)SparkContext,進(jìn)行資源的申請(qǐng)、任務(wù)的分配和監(jiān)控;
(2)資源管理器為Executor分配資源,并啟動(dòng)Executor進(jìn)程;
(3)SparkContext根據(jù)RDD的依賴關(guān)系構(gòu)建DAG圖,DAG圖提交給DAGScheduler解析成Stage,然后把一個(gè)個(gè)TaskSet提交給底層調(diào)度器TaskScheduler處理;Executor向SparkContext申請(qǐng)Task,Task Scheduler將Task發(fā)放給Executor運(yùn)行,并提供應(yīng)用程序代碼;
(4)Task在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給TaskScheduler,然后反饋給DAGScheduler,運(yùn)行完畢后寫(xiě)入數(shù)據(jù)并釋放所有資源;
Spark RDD
RDD概念/特性
許多迭代式算法(比如機(jī)器學(xué)習(xí)、圖算法等)和交互式數(shù)據(jù)挖掘工具,共同之處是不同計(jì)算階段之間會(huì)重用中間結(jié)果, MapReduce框架把中間結(jié)果寫(xiě)入到穩(wěn)定存儲(chǔ)(如磁盤(pán))中,帶來(lái)大量的數(shù)據(jù)復(fù)制、磁盤(pán)IO和序列化開(kāi)銷(xiāo)。
RDD就是為了滿足這種需求而出現(xiàn)的,它提供了一個(gè)抽象的數(shù)據(jù)架構(gòu),開(kāi)發(fā)者不必?fù)?dān)心底層數(shù)據(jù)的分布式特性,只需將具體的應(yīng)用邏輯表達(dá)為一系列轉(zhuǎn)換處理,不同RDD之間的轉(zhuǎn)換操作形成依賴關(guān)系,可以實(shí)現(xiàn)管道化,避免中間數(shù)據(jù)存儲(chǔ)。一個(gè)RDD就是一個(gè)分布式對(duì)象集合,本質(zhì)上是一個(gè)只讀的分區(qū)記錄集合,每個(gè)RDD可分成多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,并且一個(gè)RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點(diǎn)上,從而可以在集群中的不同節(jié)點(diǎn)上進(jìn)行并行計(jì)算。
RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,不能直接修改,只能基于穩(wěn)定的物理存儲(chǔ)中的數(shù)據(jù)集創(chuàng)建RDD,或者通過(guò)在其他RDD上執(zhí)行確定的轉(zhuǎn)換操作(如map、join和group by)而創(chuàng)建得到新的RDD。
RDD提供了豐富的操作以支持常見(jiàn)數(shù)據(jù)運(yùn)算,分“轉(zhuǎn)換”(Transformation)和“動(dòng)作”(Action)兩種類(lèi)型;RDD提供的轉(zhuǎn)換接口都非常簡(jiǎn)單,都是類(lèi)似map、filter、groupBy、join等粗粒度的數(shù)據(jù)轉(zhuǎn)換操作,而不是針對(duì)某個(gè)數(shù)據(jù)項(xiàng)的細(xì)粒度修改(不適合網(wǎng)頁(yè)爬蟲(chóng)),表面上RDD的功能很受限、不夠強(qiáng)大,實(shí)際上RDD已經(jīng)被實(shí)踐證明可以高效地表達(dá)許多框架的編程模型(比如MapReduce、SQL、Pregel);Spark用Scala語(yǔ)言實(shí)現(xiàn)了RDD的API,程序員可以通過(guò)調(diào)用API實(shí)現(xiàn)對(duì)RDD的各種操作
RDD典型的執(zhí)行過(guò)程如下,這一系列處理稱為一個(gè)Lineage(血緣關(guān)系),即DAG拓?fù)渑判虻慕Y(jié)果:
- RDD讀入外部數(shù)據(jù)源進(jìn)行創(chuàng)建;
- RDD經(jīng)過(guò)一系列的轉(zhuǎn)換(Transformation)操作,每一次都會(huì)產(chǎn)生不同的RDD,供給下一個(gè)轉(zhuǎn)換操作使用;
- 最后一個(gè)RDD經(jīng)過(guò)“動(dòng)作”操作進(jìn)行轉(zhuǎn)換,并輸出到外部數(shù)據(jù)源 優(yōu)點(diǎn):惰性調(diào)用、管道化、避免同步等待、不需要保存中間結(jié)果、操作簡(jiǎn)單;
Spark采用RDD以后能夠?qū)崿F(xiàn)高效計(jì)算的原因主要在于:
(1)高容錯(cuò)性:血緣關(guān)系、重新計(jì)算丟失分區(qū)、無(wú)需回滾系統(tǒng)、重算過(guò)程在不同節(jié)點(diǎn)之間并行、只記錄粗粒度的操作;
(2)中間結(jié)果持久化到內(nèi)存:數(shù)據(jù)在內(nèi)存中的多個(gè)RDD操作之間進(jìn)行傳遞,避免了不必要的讀寫(xiě)磁盤(pán)開(kāi)銷(xiāo);
(3)存放的數(shù)據(jù)是Java對(duì)象:避免了不必要的對(duì)象序列化和反序列化;
RDD依賴關(guān)系
Spark通過(guò)分析各個(gè)RDD的依賴關(guān)系生成了DAG,并根據(jù)RDD 依賴關(guān)系把一個(gè)作業(yè)分成多個(gè)階段,階段劃分的依據(jù)是窄依賴和寬依賴,窄依賴可以實(shí)現(xiàn)流水線優(yōu)化,寬依賴包含Shuffle過(guò)程,無(wú)法實(shí)現(xiàn)流水線方式處理。
窄依賴表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)或多個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū);寬依賴則表現(xiàn)為存在一個(gè)父RDD的一個(gè)分區(qū)對(duì)應(yīng)一個(gè)子RDD的多個(gè)分區(qū)。
邏輯上每個(gè)RDD 操作都是一個(gè)fork/join(一種用于并行執(zhí)行任務(wù)的框架),把計(jì)算fork 到每個(gè)RDD 分區(qū),完成計(jì)算后對(duì)各個(gè)分區(qū)得到的結(jié)果進(jìn)行join 操作,然后fork/join下一個(gè)RDD 操作;
RDD Stage劃分:Spark通過(guò)分析各個(gè)RDD的依賴關(guān)系生成了DAG,再通過(guò)分析各個(gè)RDD中的分區(qū)之間的依賴關(guān)系來(lái)決定如何劃分Stage,具體方法:
- 在DAG中進(jìn)行反向解析,遇到寬依賴就斷開(kāi);
- 遇到窄依賴就把當(dāng)前的RDD加入到Stage中;
- 將窄依賴盡量劃分在同一個(gè)Stage中,可以實(shí)現(xiàn)流水線計(jì)算;
RDD運(yùn)行過(guò)程
通過(guò)上述對(duì)RDD概念、依賴關(guān)系和Stage劃分的介紹,結(jié)合之前介紹的Spark運(yùn)行基本流程,總結(jié)一下RDD在Spark架構(gòu)中的運(yùn)行過(guò)程:
(1)創(chuàng)建RDD對(duì)象;
(2)SparkContext負(fù)責(zé)計(jì)算RDD之間的依賴關(guān)系,構(gòu)建DAG;
(3)DAG Scheduler負(fù)責(zé)把DAG圖分解成多個(gè)Stage,每個(gè)Stage中包含了多個(gè)Task,每個(gè)Task會(huì)被TaskScheduler分發(fā)給各個(gè)WorkerNode上的Executor去執(zhí)行;
RDD創(chuàng)建
RDD的創(chuàng)建可以從從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建得到,或者通過(guò)并行集合(數(shù)組)創(chuàng)建RDD。Spark采用textFile()方法來(lái)從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD,該方法把文件的URI作為參數(shù),這個(gè)URI可以是本地文件系統(tǒng)的地址,或者是分布式文件系統(tǒng)HDFS的地址;
從文件系統(tǒng)中加載數(shù)據(jù):
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
從HDFS中加載數(shù)據(jù):
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
可以調(diào)用SparkContext的parallelize方法,在Driver中一個(gè)已經(jīng)存在的集合(數(shù)組)上創(chuàng)建。
scala>val array = Array(1,2,3,4,5) scala>val rdd = sc.parallelize(array)
或者從列表中創(chuàng)建:
scala>val list = List(1,2,3,4,5) scala>val rdd = sc.parallelize(list)
RDD操作
對(duì)于RDD而言,每一次轉(zhuǎn)換操作都會(huì)產(chǎn)生不同的RDD,供給下一個(gè)“轉(zhuǎn)換”使用,轉(zhuǎn)換得到的RDD是惰性求值的,也就是說(shuō),整個(gè)轉(zhuǎn)換過(guò)程只是記錄了轉(zhuǎn)換的軌跡,并不會(huì)發(fā)生真正的計(jì)算,只有遇到行動(dòng)操作時(shí),才會(huì)發(fā)生真正的計(jì)算,開(kāi)始從血緣關(guān)系源頭開(kāi)始,進(jìn)行物理的轉(zhuǎn)換操作;
常用的RDD轉(zhuǎn)換操作,總結(jié)如下:
filter(func)操作:篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集
scala> val lines =sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt) scala> val linesWithSpark=lines.filter(line => line.contains("Spark"))
map(func)操作:map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集
scala> data=Array(1,2,3,4,5) scala> val rdd1= sc.parallelize(data) scala> val rdd2=rdd1.map(x=>x+10)
另一個(gè)實(shí)例:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") scala> val words=lines.map(line => line.split(" "))
flatMap(func)操作:拍扁操作
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") scala> val words=lines.flatMap(line => line.split(" "))
groupByKey()操作:應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K, Iterable)形式的數(shù)據(jù)集;
reduceByKey(func)操作:應(yīng)用于(K,V)鍵值對(duì)的數(shù)據(jù)集返回新(K, V)形式數(shù)據(jù)集,其中每個(gè)值是將每個(gè)key傳遞到函數(shù)func中進(jìn)行聚合后得到的結(jié)果:
行動(dòng)操作是真正觸發(fā)計(jì)算的地方。Spark程序執(zhí)行到行動(dòng)操作時(shí),才會(huì)執(zhí)行真正的計(jì)算,這就是惰性機(jī)制,“惰性機(jī)制”是指,整個(gè)轉(zhuǎn)換過(guò)程只是記錄了轉(zhuǎn)換的軌跡,并不會(huì)發(fā)生真正的計(jì)算,只有遇到行動(dòng)操作時(shí),才會(huì)觸發(fā)“從頭到尾”的真正的計(jì)算,常用的行動(dòng)操作:
RDD持久
Spark RDD采用惰性求值的機(jī)制,但是每次遇到行動(dòng)操作都會(huì)從頭開(kāi)始執(zhí)行計(jì)算,每次調(diào)用行動(dòng)操作都會(huì)觸發(fā)一次從頭開(kāi)始的計(jì)算,這對(duì)于迭代計(jì)算而言代價(jià)是很大的,迭代計(jì)算經(jīng)常需要多次重復(fù)使用同一組數(shù)據(jù):
scala> val list = List("Hadoop","Spark","Hive") scala> val rdd = sc.parallelize(list) scala> println(rdd.count()) //行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算 scala> println(rdd.collect().mkString(",")) //行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算
可以通過(guò)持久化(緩存)機(jī)制避免這種重復(fù)計(jì)算的開(kāi)銷(xiāo),可以使用persist()方法對(duì)一個(gè)RDD標(biāo)記為持久化,之所以說(shuō)“標(biāo)記為持久化”,是因?yàn)槌霈F(xiàn)persist()語(yǔ)句的地方,并不會(huì)馬上計(jì)算生成RDD并把它持久化,而是要等到遇到第一個(gè)行動(dòng)操作觸發(fā)真正計(jì)算以后,才會(huì)把計(jì)算結(jié)果進(jìn)行持久化,持久化后的RDD將會(huì)被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中被后面的行動(dòng)操作重復(fù)使用;
persist()的圓括號(hào)中包含的是持久化級(jí)別參數(shù),persist(MEMORY_ONLY)表示將RDD作為反序列化的對(duì)象存儲(chǔ)于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容;persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對(duì)象存儲(chǔ)在JVM中,如果內(nèi)存不足,超出的分區(qū)將會(huì)被存放在硬盤(pán)上;一般而言,使用cache()方法時(shí),會(huì)調(diào)用persist(MEMORY_ONLY),同時(shí)可以使用unpersist()方法手動(dòng)地把持久化的RDD從緩存中移除。
針對(duì)上面的實(shí)例,增加持久化語(yǔ)句以后的執(zhí)行過(guò)程如下:
scala> val list = List("Hadoop","Spark","Hive") scala> val rdd = sc.parallelize(list) scala> rdd.cache() //會(huì)調(diào)用persist(MEMORY_ONLY),但是,語(yǔ)句執(zhí)行到這里,并不會(huì)緩存rdd,因?yàn)檫@時(shí)rdd還沒(méi)有被計(jì)算生成 scala> println(rdd.count()) //第一次行動(dòng)操作,觸發(fā)一次真正從頭到尾的計(jì)算,這時(shí)上面的rdd.cache()才會(huì)被執(zhí)行,把這個(gè)rdd放到緩存中 scala> println(rdd.collect().mkString(",")) //第二次行動(dòng)操作,不需要觸發(fā)從頭到尾的計(jì)算,只需要重復(fù)使用上面緩存中的rdd
RDD分區(qū)
RDD是彈性分布式數(shù)據(jù)集,通常RDD很大,會(huì)被分成很多個(gè)分區(qū)分別保存在不同的節(jié)點(diǎn)上,分區(qū)的作用:(1)增加并行度(2)減少通信開(kāi)銷(xiāo)。RDD分區(qū)原則是使得分區(qū)的個(gè)數(shù)盡量等于集群中的CPU核心(core)數(shù)目,對(duì)于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過(guò)設(shè)置spark.default.parallelism這個(gè)參數(shù)的值,來(lái)配置默認(rèn)的分區(qū)數(shù)目,一般而言:
本地模式:默認(rèn)為本地機(jī)器的CPU數(shù)目,若設(shè)置了local[N],則默認(rèn)為N;
Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和2二者中取較大值作為默認(rèn)值;
設(shè)置分區(qū)的個(gè)數(shù)有兩種方法:創(chuàng)建RDD時(shí)手動(dòng)指定分區(qū)個(gè)數(shù),使用reparititon方法重新設(shè)置分區(qū)個(gè)數(shù);
創(chuàng)建RDD時(shí)手動(dòng)指定分區(qū)個(gè)數(shù):在調(diào)用textFile()和parallelize()方法的時(shí)候手動(dòng)指定分區(qū)個(gè)數(shù)即可,語(yǔ)法格式如 sc.textFile(path, partitionNum),其中path參數(shù)用于指定要加載的文件的地址,partitionNum參數(shù)用于指定分區(qū)個(gè)數(shù)。
scala> val array = Array(1,2,3,4,5) scala> val rdd = sc.parallelize(array,2) //設(shè)置兩個(gè)分區(qū)
reparititon方法重新設(shè)置分區(qū)個(gè)數(shù):通過(guò)轉(zhuǎn)換操作得到新 RDD 時(shí),直接調(diào)用 repartition 方法即可,如:
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
scala> data.partitions.size //顯示data這個(gè)RDD的分區(qū)數(shù)量
scala> val rdd = data.repartition(1) //對(duì)data這個(gè)RDD進(jìn)行重新分區(qū)
scala> rdd.partitions.size
res4: Int = 1
Spark-shell批處理
完成Spark部署后,使用spark-shell指令進(jìn)入Scala交互編程界面,spark-shell默認(rèn)創(chuàng)建一個(gè)sparkContext(sc),在spark-shell啟動(dòng)時(shí)候可以查看運(yùn)行模式是on yarn還是local模式,使用交互式界面可以直接引用sc變量使用;
使用Spark-shell處理數(shù)據(jù)實(shí)例:讀取HDFS文件系統(tǒng)中文件實(shí)現(xiàn)WordCount 單詞計(jì)數(shù):
sc.textFile("hdfs://172.22.241.183:8020/user/spark/yzg_test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
其中,map((_,1)) 等同于map(x => (x, 1))
使用saveAsText File()函數(shù)可以將結(jié)果保存到文件系統(tǒng)中。
Scala及函數(shù)式編程
Spark采用Scala語(yǔ)言編寫(xiě),在開(kāi)發(fā)中需要熟悉函數(shù)式編程思想,并熟練使用Scala語(yǔ)言,使用Scala進(jìn)行Spark開(kāi)發(fā)的代碼量大大少于Java開(kāi)發(fā)的代碼量;
函數(shù)式編程特性
函數(shù)式編程屬于聲明式編程的一種,將計(jì)算描述為數(shù)學(xué)函數(shù)的求值,但函數(shù)式編程沒(méi)有準(zhǔn)確的定義,只是一系列理念,并不需要嚴(yán)格準(zhǔn)守,可以理解為函數(shù)式編程把程序看做是數(shù)學(xué)函數(shù),輸入的是自變量,輸出因變量,通過(guò)表達(dá)式完成計(jì)算,當(dāng)前越來(lái)越多的命令式語(yǔ)言支持部分的函數(shù)式編程特性。
在函數(shù)式編程中,函數(shù)作為一等公民,就是說(shuō)函數(shù)的行為和普通變量沒(méi)有區(qū)別,可以作為函參進(jìn)行傳遞,也可以在函數(shù)內(nèi)部聲明一個(gè)函數(shù),那么外層的函數(shù)就被稱作高階函數(shù)。
函數(shù)式編程的curry化:把接受多個(gè)參數(shù)的函數(shù)變換成接受一個(gè)單一參數(shù)的函數(shù),返回接受余下的參數(shù)并且返回結(jié)果的新函數(shù)。
函數(shù)式編程要求所有的變量都是常量(這里所用的變量這個(gè)詞并不準(zhǔn)確,只是為了便于理解),erlang是其中的典型語(yǔ)言,雖然許多語(yǔ)言支持部分函數(shù)式編程的特性,但是并不要求變量必須是常量。這樣的特性提高了編程的復(fù)雜度,但是使代碼沒(méi)有副作用,并且?guī)?lái)了很大的一個(gè)好處,那就是大大簡(jiǎn)化了并發(fā)編程。
Java中最常用的并發(fā)模式是共享內(nèi)存模型,依賴于線程與鎖,若代碼編寫(xiě)不當(dāng),會(huì)發(fā)生死鎖和競(jìng)爭(zhēng)條件,并且隨著線程數(shù)的增加,會(huì)占用大量的系統(tǒng)資源。在函數(shù)式編程中,因?yàn)槎际浅A?,所以根本就不用考慮死鎖等情況。為什么說(shuō)一次賦值提高了編程的復(fù)雜度,既然所有變量都是常量,那么我們沒(méi)辦法更改一個(gè)變量的值,循環(huán)的意義也就不大,所以haskell與erlang中使用遞歸代替了循環(huán)。
Scala語(yǔ)法
Scala即可伸縮的語(yǔ)言(Scalable Language),是一種多范式的編程語(yǔ)言,類(lèi)似于java的編程,設(shè)計(jì)初衷是要集成面向?qū)ο缶幊毯秃瘮?shù)式編程的各種特性。
Scala函數(shù)地位:一等公民
在Scala中函數(shù)是一等公民,像變量一樣既可以作為函參使用,也可以將函數(shù)賦值給一個(gè)變量;而且函數(shù)的創(chuàng)建不用依賴于類(lèi)、或?qū)ο螅贘ava中函數(shù)的創(chuàng)建則要依賴于類(lèi)、抽象類(lèi)或者接口。Scala函數(shù)有兩種定義:
Scala的函數(shù)定義規(guī)范化寫(xiě)法,最后一行代碼是它的返回值:
精簡(jiǎn)后函數(shù)定義可以只有一行:
也可以直接使用val將函數(shù)定義成變量,表示定義函數(shù)addInt,輸入?yún)?shù)有兩個(gè),分別為x,y,均為Int類(lèi)型,返回值為兩者的和,類(lèi)型Int:
Scala匿名函數(shù)(函數(shù)字面量)
Scala中的匿名函數(shù)也叫做函數(shù)字面量,既可以作為函數(shù)的參數(shù)使用,也可以將其賦值給一個(gè)變量,在匿名函數(shù)的定義中“=>”可理解為一個(gè)轉(zhuǎn)換器,它使用右側(cè)的算法,將左側(cè)的輸入數(shù)據(jù)轉(zhuǎn)換為新的輸出數(shù)據(jù),使用匿名函數(shù)后,我們的代碼變得更簡(jiǎn)潔了。
val test = (x:Int) => x + 1
Scala高階函數(shù)
Scala使用術(shù)語(yǔ)“高階函數(shù)”來(lái)表示那些把函數(shù)作為參數(shù)或函數(shù)作為返回結(jié)果的方法和函數(shù)。比如常見(jiàn)的有map,filter,reduce等函數(shù),它們可以接受一個(gè)函數(shù)作為參數(shù)。
Scala閉包
Scala中的閉包指的是當(dāng)函數(shù)的變量超出它的有效作用域的時(shí)候,還能對(duì)函數(shù)內(nèi)部的變量進(jìn)行訪問(wèn);Scala中的閉包捕獲到的是變量的本身而不僅僅是變量的數(shù)值,當(dāng)自由變量發(fā)生變化時(shí),Scala中的閉包能夠捕獲到這個(gè)變化;如果自由變量在閉包內(nèi)部發(fā)生變化,也會(huì)反映到函數(shù)外面定義的自由變量的數(shù)值。
Scala部分應(yīng)用函數(shù)
部分應(yīng)用函數(shù)只是在“已有函數(shù)”的基礎(chǔ)上,提供部分默認(rèn)參數(shù),未提供默認(rèn)參數(shù)的地方使用下劃線替代,從而創(chuàng)建出一個(gè)“函數(shù)值”,在使用這個(gè)函數(shù)值(部分應(yīng)用函數(shù))的時(shí)候,只需提供下劃線部分對(duì)應(yīng)的參數(shù)即可;部分應(yīng)用函數(shù)本質(zhì)上是一種值類(lèi)型的表達(dá)式,在使用的時(shí)候不需要提供所有的參數(shù),只需要提供部分參數(shù)。
Scala柯里化函數(shù)
scala中的柯里化指的是將原來(lái)接受兩個(gè)參數(shù)的函數(shù)變成新的接受一個(gè)參數(shù)的函數(shù)的過(guò)程,新的函數(shù)返回一個(gè)以原有第二個(gè)參數(shù)作為參數(shù)的函數(shù);
def someAction(f:(Double)=>Double) = f(10)
只要滿足:函數(shù)參數(shù)是一個(gè)double、返回值也是一個(gè)double,這個(gè)函數(shù)就可以作為f值;
Spark SQL
Shark和Spark SQL
Shark的出現(xiàn),使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高,但Shark的設(shè)計(jì)導(dǎo)致了兩個(gè)問(wèn)題:
- 一是執(zhí)行計(jì)劃優(yōu)化完全依賴于Hive,不方便添加新的優(yōu)化策略
- 二是因?yàn)镾park是線程級(jí)并行,而MapReduce是進(jìn)程級(jí)并行,因此,Spark在兼容Hive的實(shí)現(xiàn)上存在線程安全問(wèn)題,導(dǎo)致Shark不得不使用另外一套獨(dú)立維護(hù)的打了補(bǔ)丁的Hive源碼分支 ;
Spark SQL在Hive兼容層面僅依賴HiveQL解析、Hive元數(shù)據(jù),也就是說(shuō),從HQL被解析成抽象語(yǔ)法樹(shù)(AST)起,就全部由Spark SQL接管了。Spark SQL執(zhí)行計(jì)劃生成和優(yōu)化都由Catalyst(函數(shù)式關(guān)系查詢優(yōu)化框架)負(fù)責(zé) ;
DataFrame和RDD
Spark SQL增加了DataFrame(即帶有Schema信息的RDD),使用戶可以在Spark SQL中執(zhí)行SQL語(yǔ)句,數(shù)據(jù)既可以來(lái)自RDD,也可以是Hive、HDFS、Cassandra等外部數(shù)據(jù)源,還可以是JSON格式的數(shù)據(jù),Spark SQL目前支持Scala、Java、Python三種語(yǔ)言,支持SQL-92規(guī)范 ;
- DataFrame的推出,讓Spark具備了處理大規(guī)模結(jié)構(gòu)化數(shù)據(jù)的能力,不僅比原有的RDD轉(zhuǎn)化方式更加簡(jiǎn)單易用,且獲得了更高的計(jì)算性能;
- Spark可輕松實(shí)現(xiàn)從MySQL到DataFrame的轉(zhuǎn)化,且支持SQL查詢;
RDD是分布式的 Java對(duì)象的集合,但是,對(duì)象內(nèi)部結(jié)構(gòu)對(duì)于RDD而言卻是不可知的;DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,提供了詳細(xì)的結(jié)構(gòu)信息。
RDD就像一個(gè)屋子,找東西要把這個(gè)屋子翻遍才能找到;DataFrame相當(dāng)于在你的屋子里面打上了貨架,只要告訴他你是在第幾個(gè)貨架的第幾個(gè)位置, DataFrame就是在RDD基礎(chǔ)上加入了列,處理數(shù)據(jù)就像處理二維表一樣。
DataFrame與RDD的主要區(qū)別在于,前者帶schema元信息,即DataFrame表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類(lèi)型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對(duì)藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無(wú)從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線優(yōu)化。
DataFrame的創(chuàng)建
Spark2.0版本開(kāi)始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來(lái)實(shí)現(xiàn)其對(duì)數(shù)據(jù)加載、轉(zhuǎn)換、處理等功能。SparkSession實(shí)現(xiàn)了SQLContext及HiveContext所有功能;
SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表,然后使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持;可以通過(guò)如下語(yǔ)句創(chuàng)建一個(gè)SparkSession對(duì)象:
scala> import org.apache.spark.sql.SparkSession scala> val spark=SparkSession.builder().getOrCreate()
在創(chuàng)建DataFrame前,為支持RDD轉(zhuǎn)換為DataFrame及后續(xù)的SQL操作,需通過(guò)import語(yǔ)句(即import spark.implicits._)導(dǎo)入相應(yīng)包,啟用隱式轉(zhuǎn)換。
在創(chuàng)建DataFrame時(shí),可使用spark.read操作從不同類(lèi)型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame,如:spark.read.json("people.json"):讀取people.json文件創(chuàng)建DataFrame;在讀取本地文件或HDFS文件時(shí),要注意給出正確的文件路徑;spark.read.csv("people.csv"):讀取people.csv文件創(chuàng)建DataFrame;
讀取hdfs上的json文件,并打印,json文件為:
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
讀取代碼:
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().getOrCreate()
import spark.implicits._
val df =spark.read.json("hdfs://172.22.241.183:8020/user/spark/json_sparksql.json")
df.show()
RDD轉(zhuǎn)換DataFrame
Spark官網(wǎng)提供了兩種方法來(lái)實(shí)現(xiàn)從RDD轉(zhuǎn)換得到DataFrame:① 利用反射來(lái)推斷包含特定類(lèi)型對(duì)象的RDD的schema,適用對(duì)已知數(shù)據(jù)結(jié)構(gòu)的RDD轉(zhuǎn)換;② 使用編程接口,構(gòu)造一個(gè)schema并將其應(yīng)用在已知的RDD上;
Spark-sql即席查詢
SparkSQL 的元數(shù)據(jù)的狀態(tài)有兩種:① in_memory,用完了元數(shù)據(jù)也就丟了;② 通過(guò)hive保存,hive的元數(shù)據(jù)存在哪兒,它的元數(shù)據(jù)也就存在哪,SparkSQL數(shù)據(jù)倉(cāng)庫(kù)建立在Hive之上實(shí)現(xiàn)的,使用SparkSQL去構(gòu)建數(shù)據(jù)倉(cāng)庫(kù)的時(shí)候,必須依賴于Hive。
Spark-sql命令行提供了即席查詢能力,可以使用類(lèi)sql方式操作數(shù)據(jù)源,效率高于hive,常用語(yǔ)句:https://www.cnblogs.com/BlueSkyyj/p/9640626.html;
spark-sql導(dǎo)入數(shù)據(jù)到數(shù)倉(cāng):https://www.cnblogs.com/chenfool/p/4502212.html;
Spark Streaming
Spark Streaming是Spark Core擴(kuò)展而來(lái)的一個(gè)高吞吐、高容錯(cuò)的實(shí)時(shí)處理引擎,同Storm的最大區(qū)別在于無(wú)法實(shí)現(xiàn)毫秒級(jí)計(jì)算,而Storm可以實(shí)現(xiàn)毫秒級(jí)響應(yīng),Spark Streaming 實(shí)現(xiàn)方式是批量計(jì)算,按照時(shí)間片對(duì)stream切割形成靜態(tài)數(shù)據(jù),并且基于RDD數(shù)據(jù)集更容易做高效的容錯(cuò)處理。Spark Streaming的輸入和輸出數(shù)據(jù)源可以是多種。Spark Streaming 實(shí)時(shí)讀取數(shù)據(jù)并將數(shù)據(jù)分為小批量的batch,然后在spark引擎中處理生成批量的結(jié)果集。Spark Streaming提供了稱為離散流或DStream的高級(jí)抽象概念,它表示連續(xù)的數(shù)據(jù)流。DStreams既可以從Kafka、Flume等源的輸入數(shù)據(jù)流創(chuàng)建,也可以通過(guò)在其他DStreams上應(yīng)用高級(jí)操作創(chuàng)建。在內(nèi)部DStream表示為RDD序列。
在這里從一個(gè)例子開(kāi)始介紹,StreamingContext是所有的流式計(jì)算的主要實(shí)體,創(chuàng)建含有兩個(gè)執(zhí)行線程的本地StreamingContext和1秒鐘的batch,然后創(chuàng)建一個(gè)Dstream(lines)用于監(jiān)聽(tīng)TCP端口,lines中的每一行就是一個(gè)RDD,flatMap函數(shù)將一個(gè)RDD分解成多個(gè)記錄,是一對(duì)多的Dstream操作,這里使用空格將lines分解成單詞,words被映射成(word, 1)對(duì),隨后進(jìn)行詞頻統(tǒng)計(jì),例子的代碼如下:
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()
Streaming 原理
可以參考官網(wǎng)教程:http://spark.apache.org/docs/latest/streaming-programming-guide.html,Spark Streaming提供了稱為離散流或DStream的高級(jí)抽象,它表示連續(xù)的數(shù)據(jù)流,在內(nèi)部DStream表示為RDD序列,每個(gè)RDD包含一定間隔的數(shù)據(jù),如下圖所示:
所有對(duì)于DStream的操作都會(huì)相應(yīng)地轉(zhuǎn)換成對(duì)RDDs的操作,在上面的例子中,flatMap操作被應(yīng)用到lines 中的每個(gè)RDD中生成了一組RDD(即words)
總結(jié)編寫(xiě)Spark Streaming程序的基本步驟是:
1.通過(guò)創(chuàng)建輸入DStream來(lái)定義輸入源
2.通過(guò)對(duì)DStream應(yīng)用轉(zhuǎn)換操作和輸出操作來(lái)定義流計(jì)算
3.用streamingContext.start()來(lái)開(kāi)始接收數(shù)據(jù)和處理流程
4.通過(guò)streamingContext.awaitTermination()方法來(lái)等待處理結(jié)束(手動(dòng)結(jié)束或因?yàn)殄e(cuò)誤而結(jié)束)
5.可以通過(guò)streamingContext.stop()來(lái)手動(dòng)結(jié)束流計(jì)算進(jìn)程
StreamingContext
有兩種創(chuàng)建StreamingContext的方式:通過(guò)SparkContext創(chuàng)建和通過(guò)SparkConf創(chuàng)建;
Spark conf創(chuàng)建:
val conf = new SparkConf().setAppName(appName).setMaster(master); val ssc = new StreamingContext(conf, Seconds(1));
appName是用來(lái)在Spark UI上顯示的應(yīng)用名稱。master是Spark、Mesos或Yarn集群的URL,或者是local[*]。batch interval可以根據(jù)你的應(yīng)用程序的延遲要求以及可用的集群資源情況來(lái)設(shè)置。
SparkContext創(chuàng)建:
val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1))
輸入DStreams和Receiver
在前面的例子中l(wèi)ines就是從源得到的輸入DStream,輸入DStream對(duì)應(yīng)一個(gè)接收器對(duì)象,可以從源接收消息并存儲(chǔ)到Spark內(nèi)存中進(jìn)行處理。Spark Streaming提供兩種streaming源:
- 基礎(chǔ)源:直接可以使用streaming上下文API的源,比如files和socket;
- 高級(jí)源:通過(guò)引用額外實(shí)體類(lèi)得到的Kafka,F(xiàn)lume源;可以在應(yīng)用中創(chuàng)建使用多個(gè)輸入DStreams來(lái)實(shí)現(xiàn)同時(shí)讀取多種數(shù)據(jù)流,worker/executor 是持久運(yùn)行的任務(wù),因此它將占用一個(gè)分給該應(yīng)用的core,因此Spark Streaming需要分配足夠的core去運(yùn)行接收器和處理接收的數(shù)據(jù);
在本地運(yùn)行Spark Streaming程序時(shí),不要使用“l(fā)ocal”或“l(fā)ocal[1]”作為主URL。這兩者中的任何一個(gè)都意味著在本地運(yùn)行任務(wù)只使用一個(gè)線程。如果使用基于receiver的輸入DStream(如Kafka、Flume等),這表明將使用單個(gè)線程運(yùn)行receiver,而不留下用于處理所接收數(shù)據(jù)的線程。因此在本地運(yùn)行時(shí),始終使用“l(fā)ocal[n]”作為主URL,其中n必須大于運(yùn)行的receiver數(shù)量,否則系統(tǒng)將接收數(shù)據(jù),但不能處理它。
Kafka和Flume這類(lèi)源需要外部依賴包,其中一些庫(kù)具有復(fù)雜的依賴關(guān)系,Spark shell中沒(méi)有這些高級(jí)源代碼,因此無(wú)法在spark-shell中測(cè)試基于這些高級(jí)源代碼的應(yīng)用程序,但可以手動(dòng)將包引入;
基于可靠性的考慮,可以將數(shù)據(jù)源分為兩類(lèi):可靠的接收器的數(shù)據(jù)被Receiver 接收后發(fā)送確認(rèn)到源頭(如Kafka ,Flume)并將數(shù)據(jù)存儲(chǔ)在spark,不可靠的接收器不會(huì)向源發(fā)送確認(rèn)。
DStreams轉(zhuǎn)換
與RDD類(lèi)似,轉(zhuǎn)換操作允許修改來(lái)自輸入DStream的數(shù)據(jù),轉(zhuǎn)換操作包括無(wú)狀態(tài)轉(zhuǎn)換操作和有狀態(tài)轉(zhuǎn)換操作。
無(wú)狀態(tài)轉(zhuǎn)換操作實(shí)例:下節(jié)spark-shell中“套接字流”詞頻統(tǒng)計(jì)采用無(wú)狀態(tài)轉(zhuǎn)換,每次統(tǒng)計(jì)都只統(tǒng)計(jì)當(dāng)前批次到達(dá)的單詞的詞頻,和之前批次無(wú)關(guān),不會(huì)進(jìn)行累計(jì)。
有狀態(tài)轉(zhuǎn)換操作實(shí)例:滑動(dòng)窗口轉(zhuǎn)換操作和updateStateByKey操作;
一些常見(jiàn)的轉(zhuǎn)換如下:
窗口操作
每次窗口在源DStream上滑動(dòng),窗口內(nèi)的源RDD被組合/操作生成了窗口RDD,在圖例中,過(guò)去3個(gè)時(shí)間單位的數(shù)據(jù)將被操作,并按2個(gè)時(shí)間單位滑動(dòng)。
任何窗口操作都需要指定兩個(gè)參數(shù):窗口長(zhǎng)度:窗口的持續(xù)時(shí)間(圖中值是3);滑動(dòng)間隔:執(zhí)行窗口操作的間隔(圖中值是2)。這兩個(gè)參數(shù)必須是源DStream的批處理間隔的倍數(shù)(圖中值是1)
舉例說(shuō)明窗口操作:希望通過(guò)每隔10秒在最近30秒數(shù)據(jù)中生成字?jǐn)?shù)來(lái)擴(kuò)展前面的示例。為此,我們必須在最近的30秒數(shù)據(jù)上對(duì)(word,1)的DStream鍵值對(duì)應(yīng)用reduceByKey操作。這是使用reduceByKeyAndWindow操作完成的。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
所有的滑動(dòng)窗口操作都需要使用參數(shù):windowLength(窗口長(zhǎng)度)和slideInterval(滑動(dòng)間隔),常見(jiàn)窗口操作總結(jié)如下,對(duì)應(yīng)的含義可參照RDD的轉(zhuǎn)換操作:
Window:基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)計(jì)算得到新的Dstream;
countByWindow: 返回DStream中元素的滑動(dòng)窗口計(jì)數(shù);
reduceByWindow:返回一個(gè)單元素流。利用函數(shù)func聚集滑動(dòng)時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)func必須滿足結(jié)合律從而支持并行計(jì)算;
reduceByKeyAndWindow(三參數(shù)):應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上時(shí),會(huì)返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)(func函數(shù))進(jìn)行聚合計(jì)算。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。可以通過(guò)numTasks參數(shù)的設(shè)置來(lái)指定不同的任務(wù)數(shù);
reduceByKeyAndWindow(四參數(shù)):比上述reduceByKeyAndWindow(三參數(shù))更高效的reduceByKeyAndWindow,每個(gè)窗口的reduce值是基于先前窗口的reduce值進(jìn)行增量計(jì)算得到的;它會(huì)對(duì)進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)進(jìn)行reduce操作,并對(duì)離開(kāi)窗口的老數(shù)據(jù)進(jìn)行“逆向reduce”操作。但是,只能用于“可逆reduce函數(shù)”,即那些reduce函數(shù)都有一個(gè)對(duì)應(yīng)的“逆向reduce函數(shù)”(以InvFunc參數(shù)傳入)。
countByValueAndWindow:當(dāng)應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上,返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每個(gè)key的值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率。
updateStateByKey:需要在跨批次之間維護(hù)狀態(tài)時(shí),必須使用updateStateByKey操作;
多流關(guān)聯(lián)
窗口計(jì)算上join操作非常有用,在Spark Streaming中可以輕松實(shí)現(xiàn)不同類(lèi)型的join,包括leftouterjoin、rightouterjoin和fulloterjoin。每個(gè)批處理間隔中stream1生成的RDD與stream2生成的RDD關(guān)聯(lián)如下:
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
Dstream的輸出
輸出操作允許將DStream的數(shù)據(jù)推送到外部系統(tǒng),如數(shù)據(jù)庫(kù)或files,由于輸出操作觸發(fā)所有DStream轉(zhuǎn)換的實(shí)際執(zhí)行(類(lèi)似于RDD的操作),并允許外部系統(tǒng)使用轉(zhuǎn)換后的數(shù)據(jù),輸出操作有以下幾種:
在輸出DStream中,Dstream.foreachRDD是一個(gè)功能強(qiáng)大的原語(yǔ).
DataFrame和SQL操作
可以輕松地對(duì)流數(shù)據(jù)使用DataFrames和SQL操作,但必須使用StreamingContext正在用的SparkContext創(chuàng)建SparkSession。下面例子使用DataFrames和SQL生成單詞計(jì)數(shù)。每個(gè)RDD都轉(zhuǎn)換為DataFrame,注冊(cè)為臨時(shí)表后使用SQL進(jìn)行查詢:
val words: DStream[String] = words.foreachRDD { rdd => val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ val wordsDataFrame = rdd.toDF("word") wordsDataFrame.createOrReplaceTempView("words") val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
Spark-shell流處理
進(jìn)入spark-shell后就默認(rèn)獲得了的SparkConext,即sc,從SparkConf對(duì)象創(chuàng)建StreamingContext對(duì)象,spark-shell中創(chuàng)建StreamingContext對(duì)象如下:
scala> import org.apache.spark.streaming._ scala> val ssc = new StreamingContext(sc, Seconds(1))
如果是編寫(xiě)一個(gè)獨(dú)立的Spark Streaming程序,而不是在spark-shell中運(yùn)行,則需要通過(guò)如下方式創(chuàng)建StreamingContext對(duì)象:
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(1))
文件流
文件流可以讀取本機(jī)文件,也可以讀取讀取HDFS上文件,如果部署的on yarn模式的Spark,則啟動(dòng)spark-shell默認(rèn)讀取HDFS上對(duì)應(yīng)的: hdfs:xxxx/user/xx/ 下的文件;
scala> import org.apache.spark.streaming._ scala> val ssc = new StreamingContext(sc, Seconds(5)) scala> val lines = ssc.textFileStream("hdfs://xxx/yzg_test.txt") scala> val Counts = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _) scala> Counts.saveAsTextFiles("hdfs://xxx/bendi")) scala> ssc.start() scala> ssc.awaitTermination() scala> ssc.stop()
以上代碼在spark-shell中運(yùn)行后,每隔5秒讀取hdfs上的文件并進(jìn)行詞頻統(tǒng)計(jì)后寫(xiě)入到hdfs中的“bendi-時(shí)間戳”文件夾下,直到ssc.stop();Counts.saveAsTextFiles("file://xxx/bendi"))和Counts.print分別寫(xiě)本地和std輸出;
Socket套接字流
Spark Streaming可以通過(guò)Socket端口實(shí)時(shí)監(jiān)聽(tīng)并接收數(shù)據(jù)計(jì)算,步驟如下:
driver端創(chuàng)建StreamingContext對(duì)象,啟動(dòng)上下文時(shí)依次創(chuàng)建JobScheduler和ReceiverTracker,并調(diào)用他們的start方法。ReceiverTracker在start方法中發(fā)送啟動(dòng)接收器消息給遠(yuǎn)程Executor,消息內(nèi)部含有ServerSocket的地址信息。在executor一側(cè),由Receiver TrackerEndpoint終端接受消息,抽取消息內(nèi)容,利用sparkContext結(jié)合消息內(nèi)容創(chuàng)建ReceiverRDD對(duì)象,最后提交rdd給spark集群。在代碼實(shí)現(xiàn)上,使用nc –lk 9999 開(kāi)啟 地址172.22.241.184主機(jī)的9999監(jiān)聽(tīng)端口,并持續(xù)往里面寫(xiě)數(shù)據(jù);使用spark-shell實(shí)現(xiàn)監(jiān)聽(tīng)端口代碼如下,輸入源為socket源,進(jìn)行簡(jiǎn)單的詞頻統(tǒng)計(jì)后,統(tǒng)計(jì)結(jié)果輸出到HDFS文件系統(tǒng);
scala> import org.apache.spark._ scala> import org.apache.spark.streaming._ scala> import org.apache.spark.storage.StorageLevel scala> val ssc = new StreamingContext(sc, Seconds(5)) scala> val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER) scala> val wordCounts = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _) scala> wordCounts.saveAsTextFiles("hdfs://xxx/bendi-socket")) scala> ssc.start() scala> ssc.awaitTermination() scala> ssc.stop()
Kafka流(窗口)
Kafka和Flume等高級(jí)輸入源需要依賴獨(dú)立的庫(kù)(jar文件),如果使用spark-shell讀取kafka等高級(jí)輸入源,需要將對(duì)應(yīng)的依賴jar包放在spark的依賴文件夾lib下。
根據(jù)當(dāng)前使用的kafka版本,適配所需要的spark-streaming-kafka依賴的版本,在maven倉(cāng)庫(kù)下載,地址如下:https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.2.1
將對(duì)應(yīng)的依賴jar包放在CDH的spark的依賴文件夾lib下,通過(guò)引入包內(nèi)依賴驗(yàn)證是否成功:
scala> import org.apache.spark._ scala> import org.apache.spark.streaming._ scala> import org.apache.spark.streaming.kafka._ scala> val ssc = new StreamingContext(sc, Seconds(5)) scala> ssc.checkpoint("hdfs://usr/spark/kafka/checkpoint") scala> val zkQuorum = "172.22.241.186:2181" scala> val group = "test-consumer-group" scala> val topics = "yzg_spark" scala> val numThreads = 1 scala> val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap scala> val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) scala> val pair = lineMap.map(_._2).flatMap(_.split(" ")).map((_,1)) scala> val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ -_,Minutes(2),Seconds(10),2) scala> wordCounts.print scala> ssc.start
updateStateByKey操作
當(dāng)Spark Streaming需要跨批次間維護(hù)狀態(tài)時(shí),就必須使用updateStateByKey操作。以詞頻統(tǒng)計(jì)為例,對(duì)于有狀態(tài)轉(zhuǎn)換操作而言,當(dāng)前批次的詞頻統(tǒng)計(jì)是在之前批次的詞頻統(tǒng)計(jì)結(jié)果的基礎(chǔ)上進(jìn)行不斷累加,所以最終統(tǒng)計(jì)得到的詞頻是所有批次的單詞總的詞頻統(tǒng)計(jì)結(jié)果。
val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) }
實(shí)現(xiàn):
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel val ssc = new StreamingContext(sc, Seconds(5)) ssc.checkpoint("hdfs:172.22.241.184:8020//usr/spark/checkpoint") val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER) val wordCounts = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey[Int](updateFunc) wordCounts.saveAsTextFiles("hdfs:172.22.241.184:8020//user/spark/bendi-socket") ssc.start() ssc.awaitTermination() ssc.stop()
Streaming同Kafka交互
Dstream創(chuàng)建
關(guān)于SparkStreaming實(shí)時(shí)計(jì)算框架實(shí)時(shí)地讀取kafka中的數(shù)據(jù)然后進(jìn)行計(jì)算,在spark1.3版本后kafkaUtils提供兩種Dstream創(chuàng)建方法,一種為KafkaUtils.createDstream,另一種為KafkaUtils.createDirectStream。
KafkaUtils.createDstream方式
其構(gòu)造函數(shù)為KafkaUtils.createDstream(ssc,[zk], [consumer group id], [per-topic,partitions] ),使用receivers來(lái)接收數(shù)據(jù),利用的是Kafka高層次的消費(fèi)者api,對(duì)于所有的receivers接收到的數(shù)據(jù)將會(huì)保存在Spark executors中,然后通過(guò)Spark Streaming啟動(dòng)job來(lái)處理這些數(shù)據(jù),默認(rèn)會(huì)丟失,可啟用WAL日志,它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS。所以數(shù)據(jù)在出錯(cuò)的情況下可以恢復(fù)出來(lái)。
A、創(chuàng)建一個(gè)receiver來(lái)對(duì)kafka進(jìn)行定時(shí)拉取數(shù)據(jù),ssc的RDD分區(qū)和Kafka的topic分區(qū)不是一個(gè)概念,故如果增加特定主消費(fèi)的線程數(shù)僅僅是增加一個(gè)receiver中消費(fèi)topic的線程數(shù),并不增加spark的并行處理數(shù)據(jù)數(shù)量。
B、對(duì)于不同的group和topic可以使用多個(gè)receivers創(chuàng)建不同的DStream
C、如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)
同時(shí)需要設(shè)置存儲(chǔ)級(jí)別(默認(rèn)StorageLevel.MEMORY_AND_DISK_SER_2),即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
KafkaUtils.createDirectStream方式
在spark1.3之后,引入了Direct方式,不同于Receiver的方式,Direct方式?jīng)]有receiver這一層,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。如圖:
這種方法相較于Receiver方式的優(yōu)勢(shì)在于:
簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來(lái)合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù),這種映射關(guān)系也更利于理解和優(yōu)化。
高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問(wèn)題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。而第二種方式,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄,消除了這種不一致性。
此方法缺點(diǎn)是它不會(huì)更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka監(jiān)視工具將不會(huì)顯示進(jìn)度。但是您可以在每個(gè)批處理中訪問(wèn)此方法處理的偏移量,并自行更新Zookeeper。
位置策略
參照官方的API文檔地址:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html,位置策略是用來(lái)控制特定的主題分區(qū)在哪個(gè)執(zhí)行器上消費(fèi)的,在executor針對(duì)主題分區(qū)如何對(duì)消費(fèi)者進(jìn)行調(diào)度,并且位置的選擇是相對(duì)的,位置策略有三種方案:
1、PreferBrokers:首選kafka服務(wù)器,只有在kafka服務(wù)器和executor位于同一主機(jī)可以使用該策略。
2、PreferConsistent:首選一致性,多數(shù)時(shí)候采用該方式,在所有可用的執(zhí)行器上均勻分配kakfa的主題的所有分區(qū),能夠綜合利用集群的計(jì)算資源。
3、PreferFixed:首選固定模式,如果負(fù)載不均衡可以使用該策略放置在特定節(jié)點(diǎn)使用指定的主題分區(qū);該配置是手動(dòng)控制方案,若沒(méi)有顯式指定的分區(qū)仍然采用(2)方案。
消費(fèi)策略
消費(fèi)者策略是控制如何創(chuàng)建和配制消費(fèi)者對(duì)象或者如何對(duì)Kafka上的消息進(jìn)行消費(fèi)界定,比如t1主題的分區(qū)0和1,或者消費(fèi)特定分區(qū)上的特定消息段。該類(lèi)可擴(kuò)展,自行實(shí)現(xiàn)。
1、ConsumerStrategies.Assign:指定固定的分區(qū)集合;
def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long])
2、ConsumerStrategies.Subscribe:允許消費(fèi)訂閱固定的主題集合;
3、ConsumerStrategies.SubscribePattern:使用正則表達(dá)式指定感興趣的主題集合。
Spark Streaming開(kāi)發(fā)
IDEA作為常用的開(kāi)發(fā)工具使用maven進(jìn)行依賴包的統(tǒng)一管理,配置Scala的開(kāi)發(fā)環(huán)境,進(jìn)行Spark Streaming的API開(kāi)發(fā);
下載并破解IDEA,并加入漢化的包到lib,重啟生效;
在IDEA中導(dǎo)入離線的Scala插件:需要確保當(dāng)前win主機(jī)上已經(jīng)下載安裝Scala并設(shè)置環(huán)境變量,首先下載IDEA的Scala插件,無(wú)須解壓,然后將其添加到IDEA中,具體為new---setting--plugins--"輸入scala"--install plugin from disk;
Maven快捷鍵
shift鍵多次------查找類(lèi)和插件; shift+ctrl+enter-------結(jié)束當(dāng)前行,自動(dòng)補(bǔ)全分號(hào); shift+alter+s-----------setting設(shè)置 alter+enter-----------補(bǔ)全拋出的異常 alter+insert---------自動(dòng)生成get、set、構(gòu)造函數(shù)等; Ctrl+X --------------刪除當(dāng)前行 ctrl+r----------------替換 ctrl+/----------------多行代碼分行注釋?zhuān)啃幸粋€(gè)注釋符號(hào) ctrl+shift+/---------多行代碼注釋在一個(gè)塊里,只在開(kāi)頭和結(jié)尾有注釋符號(hào)
任務(wù)提交
新建maven工程:file--new--project--maven(選擇quickstart框架模型新建),groupId和ArtifactID用來(lái)區(qū)分該java工程;
maven自動(dòng)生成pom.xml配置文件,用于各種包的依賴和引入,如果使用maven打包,需要引入maven的打包插件:使用maven-compiler-plugin、maven-jar-plugin插件,并在prom.xml中增加指定程序入口的配置;具體可參照:https://blog.csdn.net/qq_17348297/article/details/79092383
將mainClass設(shè)置為HelloWorld(主類(lèi)),點(diǎn)擊右邊窗口maven -> package,生成jar包,打包完成后使用spark-submit指令提交jar包并執(zhí)行。
spark-submit --class "JSONRead" /usr/local/spark/mycode/json/target/scala-2.11/json-project_2.11-1.0.jar
若有cannot find main class錯(cuò)誤,需要?jiǎng)h除-class xx.jar選項(xiàng);若出現(xiàn)“Invalid signature file digest for Manifest main”錯(cuò)誤,則運(yùn)行zip -d xxx.jar 'META-INF/.SF' 'META-INF/.RSA' 'META-INF/*SF' 指令,刪除所屬jar包中.SF/.RSA/相關(guān)文件。任務(wù)yarn管理器查看任務(wù)運(yùn)行情況;
Structured Streaming
在Spark2.x中,spark新開(kāi)放了一個(gè)基于DataFrame的無(wú)下限的流式處理組件Structured Streaming,在過(guò)去使用streaming時(shí)一次處理是當(dāng)前batch的所有數(shù)據(jù),針對(duì)這波數(shù)據(jù)進(jìn)行各種處理,如果要做一些類(lèi)似pv,uv的統(tǒng)計(jì),需要借助有狀態(tài)的state的DStream,或者借助一些分布式緩存系統(tǒng),如Redis,做一些類(lèi)似Group by的操作Streaming是非常不便的,在面對(duì)復(fù)雜的流式處理場(chǎng)景時(shí)捉襟見(jiàn)肘,且無(wú)法支持基于event_time的時(shí)間窗口做聚合邏輯。
在Structured Streaming中,把源源不斷到來(lái)的數(shù)據(jù)通過(guò)固定的模式“追加”或者“更新”到無(wú)下限的DataFrame中。剩余的工作跟普通的DataFrame一樣,可以去map、filter,也可以去groupby().count(),甚至還可以把流處理的dataframe跟其他的“靜態(tài)”DataFrame進(jìn)行join。另外,還提供了基于window時(shí)間的流式處理??傊琒tructured Streaming提供了快速、可擴(kuò)展、高可用、高可靠的流式處理。
Structured Streaming構(gòu)建于sparksql引擎之上,可以用處理靜態(tài)數(shù)據(jù)的方式去處理你的流計(jì)算,隨著流數(shù)據(jù)的不斷流入,Sparksql引擎會(huì)增量的連續(xù)不斷的處理并且更新結(jié)果。可以使用DataSet/DataFrame的API進(jìn)行 streaming aggregations, event-time windows, stream-to-batch joins等,計(jì)算的執(zhí)行也是基于優(yōu)化后的sparksql引擎。通過(guò)checkpointing and Write Ahead Logs該系統(tǒng)可以保證點(diǎn)對(duì)點(diǎn),一次處理,容錯(cuò)擔(dān)保。
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!