国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

廣州網(wǎng)站建設(shè)開發(fā)公司指數(shù)基金有哪些

廣州網(wǎng)站建設(shè)開發(fā)公司,指數(shù)基金有哪些,做貿(mào)易的網(wǎng)站有哪些,上海網(wǎng)站建設(shè)多少費(fèi)用背景 在Flink中有兩種基本的狀態(tài):Keyed State和Operator State,Operator State很好理解,一個(gè)特定的Operator算子共享同一個(gè)state,這是實(shí)現(xiàn)層面很好做到的。 但是 Keyed State 是怎么實(shí)現(xiàn)的?一般來說,正常的…

背景

在Flink中有兩種基本的狀態(tài):Keyed State和Operator StateOperator State很好理解,一個(gè)特定的Operator算子共享同一個(gè)state,這是實(shí)現(xiàn)層面很好做到的。
但是 Keyed State 是怎么實(shí)現(xiàn)的?一般來說,正常的人第一眼就會(huì)想到:一個(gè)task綁定一個(gè)Keyd State,從網(wǎng)上隨便查找資料就能發(fā)現(xiàn)正確的答案是:對(duì)于每一個(gè)Key會(huì)綁定一個(gè)State,但是這在Flink中是怎么實(shí)現(xiàn)的呢?
注意:這里我們只講Flink中是怎么實(shí)現(xiàn)一個(gè)Key對(duì)應(yīng)一個(gè)State的,其他細(xì)節(jié)并不細(xì)說,且state的backend為RocksDB

閑說雜談

我們以ValueState類型的Keyed State舉例:


ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =new ValueStateDescriptor<>("indexState",TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())
  • context.getKeyedStateStore().getState是獲取對(duì)應(yīng)keyState,最終的調(diào)用鏈如下:

     DefaultKeyedStateStore.getState -> getPartitionedState||\/RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation||\/RocksDBValueState.create(創(chuàng)建RocksDBValueState)                                                                             

    這里的 tryRegisterKvStateInformation會(huì)涉及到RocksDB ColumnFamily的創(chuàng)建:

    RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor 
    // createColumnFamilyDescriptor的部分代碼:
    ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
    if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
    }
    byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    ...
    return new ColumnFamilyDescriptor(nameBytes, options);

    其實(shí)最終會(huì)發(fā)現(xiàn)RocksDBColumnFamily是跟ValueStateDescriptor也就是描述符的名字有關(guān)的,這就是為什么描述符必須是唯一的,關(guān)于RocksDBColumnFamily,可以參考RocksDB 簡介
    注意此時(shí)返回是key對(duì)應(yīng)的一個(gè)State的ColumnFamily,該Family包括該task所有的key的value值

  • indexState.update 這里是更新indexState得值
    因?yàn)樯弦徊降玫街皇窃揟ask所對(duì)應(yīng)的ColumanFamily所對(duì)應(yīng)的所有的values,也就是* Flink中的Key-Groups*,(關(guān)于Key-Groups可以參考Apache-Flink深度解析-State)

      public void update(V value) {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (Exception e) {throw new FlinkRuntimeException("Error while adding data to RocksDB", e);}}
    

    最終的調(diào)用鏈如下:

    RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace||\/
    SerializedCompositeKeyBuilder.buildCompositeKeyNamespace||\/
    serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()   

    這里的keyOutView.getCopyOfBuffer是會(huì)獲得的record的key,所以在backend.db.put方法中才會(huì)更新對(duì)應(yīng)的Key值。
    但是什么時(shí)候Record的key信息會(huì)被寫入到keyOutView中去呢?

  • Record的key何時(shí)被寫到keyOutView

    AbstractStreamTaskNetworkInput.emitNext -> processElement||\/
    OneInputStreamTask.emitRecord||\/
    OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement||\/
    AbstractStreamOperator.setCurrentKey||\/
    StreamOperatorStateHandler.setCurrentKey||\/
    RocksDBKeyedStateBackend.setCurrentKey||\/
    SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey||\/
    keySerializer.serialize(key, keyOutView);    

    最后一步keySerializer.serialize(key, keyOutView)一個(gè)Record的key就被寫到keyOutView中,也就是說對(duì)應(yīng)的key是從每個(gè)record中獲取的,所以在backend.db.put方法中就能獲取到對(duì)應(yīng)的Key

其他

對(duì)于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperatorinitializeState方法:

final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);

這個(gè)方法里也包括了keyedStatedBackendoperatorStateBackend等初始化, 具體的細(xì)節(jié)后續(xù)再解析。

http://www.aloenet.com.cn/news/32026.html

相關(guān)文章:

  • 廈門企業(yè)網(wǎng)站開發(fā)公司2024年新冠第三波癥狀分析
  • 網(wǎng)頁設(shè)計(jì) 效果圖亞馬遜seo推廣
  • c語言做網(wǎng)站后臺(tái)服務(wù)百度關(guān)鍵詞排名銷售
  • 學(xué)習(xí)網(wǎng)站開發(fā)寧德市房價(jià)
  • 域名備案成功怎么做網(wǎng)站免費(fèi)的網(wǎng)絡(luò)推廣渠道
  • 無錫做網(wǎng)站優(yōu)化價(jià)格301313龍虎榜
  • 做網(wǎng)站組織架構(gòu)西安高端模板建站
  • 做網(wǎng)站端口無法清除十大經(jīng)典事件營銷案例分析
  • 頂尖網(wǎng)站設(shè)計(jì)東莞百度快照優(yōu)化排名
  • 小魚賺錢網(wǎng)站能重復(fù)做任務(wù)嗎電商網(wǎng)站對(duì)比表格
  • 大良營銷網(wǎng)站建設(shè)價(jià)位在線看crm系統(tǒng)
  • 房山成都網(wǎng)站建設(shè)肇慶seo按天收費(fèi)
  • 西安國內(nèi)做網(wǎng)站的公司有哪些排行榜前十名
  • html5flash設(shè)計(jì)開發(fā)|交互設(shè)計(jì)|網(wǎng)站建設(shè) 青島樂天seo培訓(xùn)中心
  • 網(wǎng)絡(luò)營銷策略內(nèi)容廈門seo俱樂部
  • 做網(wǎng)站需要什么技術(shù)搜索關(guān)鍵詞站長工具
  • 高端網(wǎng)站哪個(gè)比較好線上產(chǎn)品推廣方案
  • 針對(duì)人群不同 網(wǎng)站做細(xì)分全球疫情最新數(shù)據(jù)
  • 免費(fèi)空間申請(qǐng)網(wǎng)站網(wǎng)絡(luò)營銷在哪里學(xué)比較靠譜
  • 網(wǎng)站備案流程圖上海自動(dòng)seo
  • 2015年做哪個(gè)網(wǎng)站能致富建網(wǎng)站用什么工具
  • 旅游搜索網(wǎng)站開發(fā)百度網(wǎng)站推廣怎么做
  • 鎮(zhèn)江做網(wǎng)站哪家公司好靠網(wǎng)絡(luò)營銷火起來的企業(yè)
  • 方城網(wǎng)站制作網(wǎng)絡(luò)營銷專業(yè)課程
  • 重慶的做網(wǎng)站公司百度風(fēng)云榜小說榜排名
  • 專業(yè)做網(wǎng)站價(jià)格廈門百度關(guān)鍵詞優(yōu)化
  • 網(wǎng)站推廣的技巧和方法企業(yè)網(wǎng)站的網(wǎng)絡(luò)營銷功能
  • 在什么網(wǎng)站做貿(mào)易好最簡短的培訓(xùn)心得
  • 江蘇建設(shè)廳官方網(wǎng)站安全員北京專門做seo
  • 佛山市網(wǎng)站建設(shè)保定網(wǎng)站建設(shè)方案優(yōu)化