更新時間:2021-01-15 來源:黑馬程序員 瀏覽量:
問題分析
假如我們自己寫一個流式框架。我們該如何處理消息。正常情況下,我們看到消息按照順序一個個發(fā)送,接受后按照順序處理,這是沒有什么問題的。然而也要考慮到一些特殊情況下,消息不在是按照順序發(fā)送,產(chǎn)生了亂序,這時候該怎么處理?
核心問題講解
(1)watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。但是對于late element,我們又不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時間后,必須觸發(fā)window去進(jìn)行計算。這個特別的機(jī)制,就是watermark。
(2)通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在接收source后,應(yīng)用簡單的map或者filter操作,然后再生成watermark。
(3)如果延遲的數(shù)據(jù)有業(yè)務(wù)需要,則設(shè)置好允許延遲的時間,因?yàn)槲覀儾荒軣o限期的等下去。每個窗口都有屬于自己的最大等待延遲數(shù)據(jù)的時間限制,窗口結(jié)束時間+延遲時間=最大waterMark值,即當(dāng)waterMark值大于的上述計算出的最大waterMark值,該窗口內(nèi)的數(shù)據(jù)就屬于遲到的數(shù)據(jù),無法參與window計算。
問題擴(kuò)展
Window:Window是處理無界流的關(guān)鍵,Windows將流拆分為一個個有限大小的buckets,可以在每一個buckets中進(jìn)行計算。start_time,end_time:當(dāng)Window時時間窗口的時候,每個window都會有一個開始時間和結(jié)束時間(前開后閉),這個時間是系統(tǒng)時間。event-time: 事件發(fā)生時間,是事件發(fā)生所在設(shè)備的當(dāng)?shù)貢r間,比如一個點(diǎn)擊事件的時間發(fā)生時間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時間。Watermarks:可以把他理解為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發(fā)此window的計算,Watermarks就是用來觸發(fā)window計算的。結(jié)合項(xiàng)目中使用
watermark如何處理亂序數(shù)據(jù)?
假如我們設(shè)置10s的時間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0位start-time,10為end-time。假如有4個數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒。
當(dāng)A到達(dá)的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發(fā)計算;
當(dāng)B到達(dá)的時候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計算;
當(dāng)C到達(dá)的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計算;
當(dāng)D到達(dá)的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計算;
觸發(fā)計算的時候,會將AC(因?yàn)樗麄兌夹∮?0)都計算進(jìn)去。
通過上面這種方式,我們就將遲到的C計算進(jìn)去了。這里的延遲3.5s是我們假設(shè)一個數(shù)據(jù)到達(dá)的時候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了,這個是需要根據(jù)經(jīng)驗(yàn)推算的,加入D到達(dá)以后有到達(dá)了一個E,event-time=6,但是由于0~10的時間窗口已經(jīng)開始計算了,所以E就丟了。
猜你喜歡: