優(yōu)步的任務(wù)是提供“對(duì)每個(gè)人來(lái)說(shuō),在任何地方都可以獲得像自來(lái)水一樣可靠的出行服務(wù)”。為了履行這一承諾,優(yōu)步依賴于在每個(gè)層面做出數(shù)據(jù)驅(qū)動(dòng)的決策。大部分的決策都得益于更快的數(shù)據(jù)處理。例如,使用數(shù)據(jù)來(lái)理解一個(gè)地區(qū)以便于增加業(yè)務(wù),或城市運(yùn)營(yíng)團(tuán)隊(duì)對(duì)新數(shù)據(jù)的訪問(wèn)來(lái)運(yùn)營(yíng)每個(gè)城市。不用說(shuō),數(shù)據(jù)處理系統(tǒng)的選擇和必要的服務(wù)水平協(xié)議是數(shù)據(jù)團(tuán)隊(duì)與優(yōu)步用戶之間日常交互的主題。 在本文中,我想基于在優(yōu)步建立數(shù)據(jù)基礎(chǔ)設(shè)施的經(jīng)驗(yàn)和經(jīng)歷,討論準(zhǔn)實(shí)時(shí)案例中數(shù)據(jù)處理系統(tǒng)的選擇。在本文中,我認(rèn)為通過(guò)增加新的增量處理原語(yǔ)到現(xiàn)有的Hadoop技術(shù)中,將能以更少的開(kāi)銷和統(tǒng)一的方式解決很多問(wèn)題。在優(yōu)步,我們正在構(gòu)建相應(yīng)的系統(tǒng)來(lái)解決這里總結(jié)的問(wèn)題,并且以開(kāi)放的態(tài)度,希望與對(duì)這一領(lǐng)域有興趣的、志同道合的組織進(jìn)行合作。 準(zhǔn)實(shí)時(shí)案例 首先,讓我們定義這類案例。在一些場(chǎng)景里,長(zhǎng)達(dá)一小時(shí)的延遲是可容忍的,他們?cè)诖蟛糠智闆r下都可以通過(guò)在MapReduce/Spark上用傳統(tǒng)的批處理來(lái)執(zhí)行,同時(shí)數(shù)據(jù)一般會(huì)向Hadoop/S3上增量添加。與之相反的另一個(gè)極端的案例是:需要小于一到兩秒的延遲,通常涉及到將你的數(shù)據(jù)輸送到一個(gè)可擴(kuò)展的鍵值存儲(chǔ)(已經(jīng)在這個(gè)上工作)并且進(jìn)行查詢。諸如Storm、Spark流處理以及Flink之類的流式處理系統(tǒng)已經(jīng)相當(dāng)好地構(gòu)建了實(shí)際可以支持約一到五分鐘延遲的應(yīng)用。流式系統(tǒng)對(duì)于像欺詐檢測(cè)、異常檢測(cè)或者系統(tǒng)監(jiān)控這些需要機(jī)器快速響應(yīng)做出的決策或者以盯著計(jì)算機(jī)屏幕為日常工作的人來(lái)說(shuō)是非常有用的。 這兩個(gè)極端之間給我們留下了一個(gè)大鴻溝,即從五分鐘到一小時(shí)的端到端的處理延遲。在本文里我將這種情況稱為準(zhǔn)實(shí)時(shí)。大部分準(zhǔn)實(shí)時(shí)案例商業(yè)儀表板,或是一些輔助人工決策的應(yīng)用。下面是一些準(zhǔn)實(shí)時(shí)可能的應(yīng)用場(chǎng)景: 1.觀察過(guò)去X分鐘內(nèi)儀表板上是否有任何異常; 2.測(cè)量過(guò)去X分鐘內(nèi)在網(wǎng)站上執(zhí)行的試驗(yàn)進(jìn)行的如何; 3.商業(yè)指標(biāo)在X分鐘間隔內(nèi)的更新; 4.對(duì)一個(gè)機(jī)器學(xué)習(xí)管道在過(guò)去X分鐘內(nèi)進(jìn)行特征抽。 圖一:處理延遲的不同著色圖以及相關(guān)的典型技術(shù)。由Vinoth Chandar提供 通過(guò)“迷你”批次進(jìn)行增量處理 解決準(zhǔn)實(shí)時(shí)案例的選擇是相當(dāng)開(kāi)放的。流式處理能夠提供低延遲,并有較為基本的SQL支持能力,但是需要預(yù)先定義查詢來(lái)達(dá)到較好的效果。專有的數(shù)據(jù)倉(cāng)庫(kù)有許多特性(例如,事務(wù)、索引),并且能支持隨機(jī)和預(yù)定義的查詢,但是這種專有數(shù)據(jù)倉(cāng)庫(kù)在規(guī)模上有限制而且價(jià)格昂貴。批處理可以解決大規(guī)模數(shù)據(jù)的場(chǎng)景,并通過(guò)Spark SQL/Hive提供成熟的SQL支持。但是這種處理的方式通常會(huì)有比較高的延遲。由于各有利弊,最后用戶通;诳捎玫挠布退麄兘M織內(nèi)部的運(yùn)維支持的方式來(lái)做出選擇。我們將在本文的結(jié)論處在回頭來(lái)看這些挑戰(zhàn)。 下面我會(huì)介紹通過(guò)使用Spark/MapReduce而不是運(yùn)行流式處理任務(wù),以每X分鐘執(zhí)行迷你批任務(wù)的方式來(lái)解決準(zhǔn)實(shí)時(shí)場(chǎng)景的一些技術(shù)優(yōu)點(diǎn)。類似于Spark流處理中的微批次(以秒粒度執(zhí)行操作),迷你批次以分鐘粒度來(lái)運(yùn)行。在本文中,我將通篇使用“增量處理”這一術(shù)語(yǔ)來(lái)指代這種處理方式。 增加效率 增量的處理迷你批次中的新數(shù)據(jù)能更加有效地使用組織中的資源。讓我們來(lái)舉個(gè)具體的例子,我們有一個(gè)Kafka事件流以每秒一萬(wàn)條的速度涌入,我們想要計(jì)算過(guò)去15分鐘在一些維度上的消息的數(shù)量。大部分流式處理管道使用一個(gè)外部結(jié)果存儲(chǔ)系統(tǒng)(例如Cassandra, ElasticSearch)來(lái)保存聚合的計(jì)數(shù),并讓在YARN/Mesos等資源管理里的容器持續(xù)運(yùn)行。這在小于五分鐘的延遲窗口的場(chǎng)景下是說(shuō)得通的。實(shí)際上,典型的YARN容器的啟動(dòng)開(kāi)銷大約是一分鐘。此外,為了提升寫(xiě)操作到結(jié)果存儲(chǔ)系統(tǒng)上的性能,我們通常進(jìn)行緩存并進(jìn)行批量更新,這種協(xié)議都需要容器持續(xù)地運(yùn)行。 圖二: 流式處理引擎和增量迷你批次任務(wù)處理的對(duì)比。由Vinoth Chandar提供 然而在準(zhǔn)實(shí)時(shí)處理的場(chǎng)景里,這些選擇可能不是最佳的。為了達(dá)到同樣的效果,你可以使用短生命周期的容器并且優(yōu)化整體的資源利用。在圖二中,流式處理器在15分鐘內(nèi)執(zhí)行了六百萬(wàn)次更新到結(jié)果存儲(chǔ)系統(tǒng)上。但是在增量更新模型里,我們執(zhí)行一次內(nèi)存中的合并同時(shí)僅進(jìn)行一次更新到結(jié)果存儲(chǔ)系統(tǒng)中,這時(shí)只會(huì)使用資源容器五分鐘。相比實(shí)時(shí)模式,增量處理模型有三倍的CPU效率提升,在更新到結(jié)果存儲(chǔ)的方面有幾個(gè)數(shù)量級(jí)的效率提升;旧希@種處理方式按需獲取資源,喚醒的間隔足以完成等待的任務(wù),而不用長(zhǎng)時(shí)間運(yùn)行,一邊等待任務(wù),一邊吞食CPU和內(nèi)存。 建立在已有的SQL引擎之上 隨著時(shí)間的推移,大量SQL引擎在Hadoop/big data領(lǐng)域演進(jìn)并發(fā)展(例如,Hive, Presto, SparkSQL)。它們提供了更好的針對(duì)大數(shù)據(jù)的復(fù)雜問(wèn)題的表達(dá)能力。這些系統(tǒng)已經(jīng)被大規(guī)模地部署,并在查詢計(jì)劃、執(zhí)行等方面得到逐步增強(qiáng)。另一方面,流式處理的SQL仍然處于早期階段。通過(guò)使用在Hadoop生態(tài)圈內(nèi)已有的、更加成熟的SQL引擎來(lái)執(zhí)行增量處理,我們可以利用他們自身發(fā)展過(guò)程中形成的堅(jiān)實(shí)基礎(chǔ)。 例如,連接操作在流式處理中是非常棘手的,因?yàn)橐诖翱陂g對(duì)齊流。在增量處理模型中,這一問(wèn)題變得更簡(jiǎn)單,因?yàn)橛兄鄬?duì)更長(zhǎng)的窗口,這使得有更多的空間讓流在處理窗口中對(duì)齊。另一方面,如果正確性更為重要,SQL提供了一個(gè)更加簡(jiǎn)單的方式來(lái)選擇性地?cái)U(kuò)展連接的窗口并且重新處理。 這類SQL引擎的另一個(gè)重要進(jìn)步是對(duì)諸如ORC/Parquet等列式文件格式的支持,這對(duì)于分析工作是有著顯著好處的。例如,連接兩個(gè)有Avro記錄的Kafka主題將比連接兩個(gè)通過(guò)ORC/Parquet文件格式存儲(chǔ)的Hive/Spark的表的開(kāi)銷大得多。這是因?yàn),?duì)于Avro記錄來(lái)說(shuō),你最終要反序列化整個(gè)記錄,而列式文件中只需要讀取在記錄中會(huì)被查詢所用到的列。如果我們簡(jiǎn)單地從一條編碼的Kafka Avro事件中的1000個(gè)字段中投影出10個(gè)字段,我們?nèi)匀恍枰獮樗凶侄位ㄙM(fèi)CPU和I/O的開(kāi)銷。列式文件格式通常可以更為“聰明”地投影到存儲(chǔ)層。 圖三:Kafka事件和HDFS上列式文件,將10個(gè)字段從1000個(gè)字段中投影出來(lái)的CPU和I/O開(kāi)銷的對(duì)比。由Vinoth Chandar提供 較少的運(yùn)動(dòng)部件 現(xiàn)在被廣泛實(shí)現(xiàn)的Lambda架構(gòu)(一個(gè)基于MapReduce 和 Storm 構(gòu)建的流式處理的應(yīng)用架構(gòu))有兩個(gè)模塊:速度層和批處理層。它們通常由兩個(gè)獨(dú)立的實(shí)現(xiàn)(從代碼到基礎(chǔ)設(shè)施)來(lái)管理。例如,Storm是速度層上的一個(gè)熱門(mén)選項(xiàng),而MapReduce可以作為批處理層來(lái)提供服務(wù)。實(shí)際上,人們經(jīng)常依賴速度層來(lái)提供更新的結(jié)果(可能并不準(zhǔn)確),而一旦數(shù)據(jù)被認(rèn)為是完整了之后,通過(guò)批處理層在稍后的時(shí)候里來(lái)糾正速度層的結(jié)果。隨著增量處理的使用,我們有機(jī)會(huì)以統(tǒng)一的方式在代碼層面和基礎(chǔ)設(shè)施層面來(lái)實(shí)現(xiàn)Lambda架構(gòu)。 圖四:結(jié)果表的計(jì)算,背后是一個(gè)經(jīng)由增量處理得到的快速視圖和一個(gè)經(jīng)由批處理得到的更完整的視圖。由Vinoth Chandar提供 上圖中描述的思想相當(dāng)簡(jiǎn)潔。正如我們所說(shuō)的,你可以使用SQL或者類似Spark這樣的批處理框架來(lái)一致地實(shí)現(xiàn)你的處理邏輯。結(jié)果表增量地被建立,像流式處理那樣在“新數(shù)據(jù)”上執(zhí)行SQL來(lái)產(chǎn)生一個(gè)結(jié)果的快速視圖。同樣的SQL可以周期性的被執(zhí)行在全數(shù)據(jù)上,來(lái)糾正任何不準(zhǔn)確的結(jié)果(記住,連接操作總是棘手的。,并產(chǎn)生一個(gè)更加“完整”的結(jié)果的視圖。在這兩種情況下,我們都將使用同樣的Hadoop基礎(chǔ)設(shè)施來(lái)執(zhí)行計(jì)算,這可以降低總體運(yùn)營(yíng)成本和復(fù)雜度。 增量處理的挑戰(zhàn) 在羅列了增量處理架構(gòu)的優(yōu)點(diǎn)之后,讓我們來(lái)討論一下在現(xiàn)在的Hadoop生態(tài)系統(tǒng)中實(shí)現(xiàn)這一架構(gòu)時(shí)會(huì)面臨的挑戰(zhàn)。 完整性和延遲之間的權(quán)衡 在計(jì)算時(shí),隨著我們?cè)诹魇教幚、增量處理和批處理之間變換,我們面臨著相同的根本權(quán)衡。一些應(yīng)用需要所有的數(shù)據(jù),并產(chǎn)生更為完整和準(zhǔn)確的結(jié)果,而一些則只需要低延遲的數(shù)據(jù)來(lái)產(chǎn)生相對(duì)可接受的結(jié)果即可。讓我們來(lái)看幾個(gè)例子。 圖五:展示了不同的Hadoop應(yīng)用對(duì)延遲和數(shù)據(jù)完整性的容忍度。由Vinoth Chandar提供 圖五描繪了一些應(yīng)用案例,根據(jù)它們對(duì)延遲和(不)完整性的容忍度來(lái)定位。商務(wù)儀表盤(pán)可以展示不同的粒度的各項(xiàng)指標(biāo)。它們通常較為靈活,可展示最近時(shí)間內(nèi)不完整但是有較低延遲的數(shù)據(jù),并隨著時(shí)間變得完整(這也使得它們成為L(zhǎng)ambda架構(gòu)的代表)。對(duì)于數(shù)據(jù)科學(xué)或機(jī)器學(xué)習(xí)的案例而言,從輸入的數(shù)據(jù)中抽取特征的過(guò)程通常延遲較低,而模型用更完整的數(shù)據(jù)進(jìn)行自我訓(xùn)練的延遲較高。其他的例子中,欺詐檢測(cè)要求低延遲地處理可獲取的最新數(shù)據(jù)。而實(shí)驗(yàn)性平臺(tái)需要相當(dāng)?shù)臄?shù)據(jù)量,并以一個(gè)相對(duì)較低的延遲來(lái)保證實(shí)驗(yàn)結(jié)果比較新。 最常見(jiàn)的導(dǎo)致不完整的原因是遲到的數(shù)據(jù)(正如在這篇谷歌云數(shù)據(jù)流的演示文稿中詳細(xì)解釋的)。在真實(shí)的環(huán)境中,遲到的數(shù)據(jù)可以是基礎(chǔ)設(shè)施層存在問(wèn)題,例如數(shù)據(jù)中心的連接斷開(kāi)了15分鐘;或是用戶層面的問(wèn)題,例如移動(dòng)應(yīng)用由于在飛行中不良的連接質(zhì)量而導(dǎo)致事件的延遲發(fā)送。在優(yōu)步,我們面臨著十分相似的挑戰(zhàn),正如我們今年早些時(shí)候在Strata + Hadoop World大會(huì)上所闡述的。 為了有效地支持如此多樣的應(yīng)用集合,編程模型需要以一等公民的方式來(lái)對(duì)待遲到的數(shù)據(jù)。然而,Hadoop的處理通常是基于在完整數(shù)據(jù)(例如Hive中的分區(qū))上的批處理,有保證完整性的職責(zé),也要完全依賴數(shù)據(jù)產(chǎn)生者。在如今復(fù)雜的數(shù)據(jù)生態(tài)系統(tǒng)里,這對(duì)于單個(gè)數(shù)據(jù)產(chǎn)生者來(lái)說(shuō)職責(zé)簡(jiǎn)直太多了。大部分產(chǎn)生者最終通過(guò)在一個(gè)諸如Kafka這樣的存儲(chǔ)系統(tǒng)上使用流式處理來(lái)達(dá)到較低的延遲,而依賴Hadoop存儲(chǔ)來(lái)達(dá)到更加“完整”的(重)處理。我們將在下一節(jié)對(duì)此展開(kāi)來(lái)講。 缺乏用于增量處理的原語(yǔ) 正如在這篇關(guān)于流式處理的文章中詳細(xì)描述的,事件時(shí)間以及其相對(duì)的到達(dá)時(shí)間的定義和遲到數(shù)據(jù)的處理是低延遲計(jì)算中很重要的方面。遲到的數(shù)據(jù)要求重新計(jì)算時(shí)間窗口(通常就是Hadoop中的Hive分區(qū)),盡管這些時(shí)間窗口的結(jié)果可能已經(jīng)被計(jì)算完成甚至是已經(jīng)與終端用戶進(jìn)行過(guò)了交互。通常來(lái)說(shuō),在流式處理世界中這類重新計(jì)算是通過(guò)使用可擴(kuò)展的鍵值存儲(chǔ),在記錄/事件層面增量發(fā)生的,并針對(duì)點(diǎn)查詢和更新進(jìn)行優(yōu)化。然而,在Hadoop中,重新計(jì)算通常意味著重寫(xiě)整個(gè)(不可變)的Hive分區(qū)(或者簡(jiǎn)而言之是一個(gè)HDFS中的文件夾),并且重新計(jì)算所有在那個(gè)Hive分區(qū)上已經(jīng)被消費(fèi)過(guò)的任務(wù)。 從延遲和資源利用角度來(lái)看,這些操作都是開(kāi)銷昂貴的。這一開(kāi)銷通常會(huì)級(jí)聯(lián)地傳導(dǎo)到整個(gè)Hadoop的數(shù)據(jù)流中,最終導(dǎo)致延遲增加了數(shù)小時(shí)。因此,增量處理需要使得這兩種操作更加得快速,從而使我們可以有效地將改變包含到已有的Hive分區(qū)中,并且為下游的表數(shù)據(jù)消費(fèi)者提供一個(gè)僅獲取新改變的數(shù)據(jù)的方式。 有效地支持增量處理可以分解為以下幾個(gè)原語(yǔ)操作: 更新插入:從概念上講,重寫(xiě)整個(gè)分區(qū)可以被視作一個(gè)非常低效的更新插入操作,最終會(huì)寫(xiě)入比進(jìn)來(lái)的數(shù)據(jù)多得多的數(shù)據(jù)。因此,對(duì)(批量)更新插入的首要支持成為非常的重要工具。事實(shí)上,像Kudu和Hive事務(wù)等最近的趨勢(shì)的確是朝著這一方向發(fā)展的。谷歌的Mesa(谷歌的數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng))論文也談?wù)摿藥醉?xiàng)技術(shù),可以被應(yīng)用到快速數(shù)據(jù)注入的場(chǎng)景里。 增量消費(fèi):盡管更新插入可以解決快速地向一個(gè)分區(qū)發(fā)布新數(shù)據(jù)的問(wèn)題,下游的數(shù)據(jù)消費(fèi)者并不知道從過(guò)去的哪一個(gè)時(shí)刻開(kāi)始哪些數(shù)據(jù)被改變了。通常,消費(fèi)者通過(guò)掃描整個(gè)分區(qū)/數(shù)據(jù)表并重新計(jì)算所有數(shù)據(jù)來(lái)得知改變的數(shù)據(jù),這需要花費(fèi)相當(dāng)多的時(shí)間和資源。因此,我們也需要一種機(jī)制來(lái)更加高效地獲取從上次分區(qū)被消費(fèi)的時(shí)間點(diǎn)開(kāi)始改變過(guò)的數(shù)據(jù)記錄。有了上面兩種原語(yǔ)操作,你可以通過(guò)更新插入一個(gè)數(shù)據(jù)集,然后從中增量消費(fèi),并建立(也是增量的)另外一個(gè)數(shù)據(jù)集來(lái)支持很多常見(jiàn)的案例。數(shù)據(jù)投影就是最好理解的案例(如圖六所示): 圖六:一個(gè)簡(jiǎn)單的例子,通過(guò)更新插入新的改變到表1(table_1),并通過(guò)增量消費(fèi)建立一個(gè)簡(jiǎn)單的投影表(projected_table)。由Vinoth Chandar提供 借用Spark流式處理的說(shuō)法(如,流-數(shù)據(jù)集連接,流-流連接),我們可以更高效地以較低的延遲來(lái)操作簡(jiǎn)單的投影和流-數(shù)據(jù)集連接。甚至是流-流連接也可以增量計(jì)算,只不過(guò)需要增加一些額外的邏輯來(lái)做窗口對(duì)齊。 圖七:一個(gè)更為復(fù)雜的例子,將一個(gè)事實(shí)表連接到多個(gè)維度表,從而建立一個(gè)連接過(guò)的表。由Vinoth Chandar提供 這個(gè)案例是我們可以節(jié)省硬件花費(fèi)的同時(shí)顯著地降低延遲的不多見(jiàn)的場(chǎng)景之一。 思維模式的轉(zhuǎn)變 最后的挑戰(zhàn)嚴(yán)格來(lái)說(shuō)并不是技術(shù)上的。在選擇技術(shù)以應(yīng)對(duì)不同的場(chǎng)景時(shí),組織生態(tài)扮演著核心角色。在很多組織中,團(tuán)隊(duì)挑選那些在行業(yè)流行的模板化解決方案,并逐步習(xí)慣以特定的方式來(lái)使用這些系統(tǒng)。例如,典型的數(shù)據(jù)倉(cāng)庫(kù)的延遲需求是以小時(shí)計(jì)的。因此,即使底層技術(shù)可以在更低的延遲下解決不少問(wèn)題,但是還是需要花費(fèi)大量的功夫去實(shí)現(xiàn)數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)的最小化停機(jī)時(shí)間或者避免在維護(hù)過(guò)程中服務(wù)中斷。如果你是在建立滿足更低延遲的服務(wù)水平協(xié)議的系統(tǒng),這些運(yùn)維特點(diǎn)是很重要的。另一方面,能解決低延遲問(wèn)題的團(tuán)隊(duì)也非常擅長(zhǎng)運(yùn)維那些有嚴(yán)格服務(wù)水平協(xié)議要求的系統(tǒng),這就導(dǎo)致組織機(jī)構(gòu)最后總是會(huì)為批處理和流式處理分別創(chuàng)建數(shù)據(jù)貯藏庫(kù)。這就阻礙了在諸如Hadoop的系統(tǒng)上實(shí)現(xiàn)增量處理,從而無(wú)法獲得上述的好處。 這絕不是要嘗試來(lái)泛化組織生態(tài)的挑戰(zhàn)。作為一個(gè)經(jīng)歷了推動(dòng)領(lǐng)英的在線服務(wù),以及推動(dòng)了優(yōu)步數(shù)據(jù)生態(tài)系統(tǒng)的人,這些僅僅是我自己的觀察。 可帶走的經(jīng)驗(yàn) 我想要留給你以下可帶走的經(jīng)驗(yàn)教訓(xùn): 1.對(duì)實(shí)際延遲需求有清晰的定義可以幫你節(jié)省很多錢(qián)。 2.Hadoop可以通過(guò)應(yīng)用支持增量處理的原語(yǔ)來(lái)解決很多問(wèn)題。 3.統(tǒng)一的架構(gòu)(代碼和基礎(chǔ)設(shè)施)是未來(lái)的方向。 在優(yōu)步,我們有非常直接和可測(cè)量的商業(yè)目標(biāo)/動(dòng)機(jī)去解決這些問(wèn)題。我們正在著手構(gòu)建一個(gè)可以解決這些需求的系統(tǒng)。如果你對(duì)項(xiàng)目合作感興趣,請(qǐng)務(wù)必聯(lián)系我們。 Vinoth Chandar Vinoth Chandar目前致力于將Hadoop和Spark帶到優(yōu)步。在領(lǐng)英,Vinoth Chandar曾領(lǐng)導(dǎo)Voldemort項(xiàng)目,也曾效力于甲骨文,參與服務(wù)器的復(fù)制引擎、高性能計(jì)算和流式計(jì)算。 |