Background
Flink has two basic kinds of state: Keyed State and Operator State. Operator State is easy to understand. Each parallel instance of an operator holds one state that is shared by every record it processes, which is simple to implement.
But how is Keyed State implemented? Most people's first guess is that each task is bound to one Keyed State. A quick search online turns up the correct answer: a separate State is bound to each key. So how does Flink actually do that?
Note: this post only covers how Flink maps one key to one State. Other details are left out, and the state backend is assumed to be RocksDB.
Digging In
Take a Keyed State of type ValueState as an example:
ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
    new ValueStateDescriptor<>("indexState", TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState =
    context.getKeyedStateStore().getState(indexStateDesc);
// ....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
context.getKeyedStateStore().getState is what obtains the State. The full call chain is:
DefaultKeyedStateStore.getState -> getPartitionedState
    ↓
RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation
    ↓
RocksDBValueState.create (creates the RocksDBValueState)
Here, tryRegisterKvStateInformation involves creating a RocksDB ColumnFamily:
RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor

// Part of createColumnFamilyDescriptor:
ColumnFamilyOptions options =
    createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
if (ttlCompactFiltersManager != null) {
    ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
}
// The column family name is taken from the state's (descriptor's) name
byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
...
return new ColumnFamilyDescriptor(nameBytes, options);
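In the end you can see that the RocksDB ColumnFamily is named after the ValueStateDescriptor, that is, after the descriptor's name. This is why each state descriptor's name must be unique within an operator. For more on RocksDB ColumnFamilies, see "RocksDB 簡介" (an introduction to RocksDB).
Note that what is returned at this point is the ColumnFamily of the state as a whole, not a per-key entry: this one ColumnFamily holds the values of all the keys handled by this task.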
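To make the name-to-ColumnFamily relationship concrete, here is a toy Java model. It is a sketch under stated assumptions, not Flink code: ToyColumnFamilyRegistry and getOrCreate are hypothetical names, and a TreeMap stands in for a RocksDB column family. It only shows that looking families up by descriptor name yields one family per name, shared by every key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model (not Flink's API): a keyed backend that keeps one
// "column family" per state descriptor name, the way createColumnFamilyDescriptor
// derives the column family name from metaInfoBase.getName().
class ToyColumnFamilyRegistry {
    // Descriptor name -> stand-in for a RocksDB column family (key -> value).
    private final Map<String, TreeMap<String, String>> columnFamilies = new HashMap<>();

    // Roughly the role of getOrCreateKeyedState / tryRegisterKvStateInformation:
    // the first call with a name creates the family, later calls reuse it.
    TreeMap<String, String> getOrCreate(String descriptorName) {
        return columnFamilies.computeIfAbsent(descriptorName, name -> new TreeMap<>());
    }

    int size() {
        return columnFamilies.size();
    }
}
```

In this toy, two unrelated states that happened to share the name "indexState" would both receive the same map and overwrite each other's entries, which is the kind of collision unique descriptor names avoid.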
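indexState.update is where the value of indexState is updated.
The previous step only returned the ColumnFamily holding all the values of this task, that is, the data of all the Key-Groups assigned to it in Flink (for Key-Groups, see "Apache-Flink深度解析-State"). So before writing, update has to work out which entry belongs to the current key. That is what serializeCurrentKeyWithGroupAndNamespace() does in RocksDBValueState.update:
public void update(V value) {
    if (value == null) {
        // Writing null is treated as clearing the current key's value
        clear();
        return;
    }
    try {
        backend.db.put(
            columnFamily,
            writeOptions,
            serializeCurrentKeyWithGroupAndNamespace(), // RocksDB key: key group + current key + namespace
            serializeValue(value));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
    }
}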
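The shape of update can be mimicked in plain Java. This is a hypothetical sketch: ToyRocksValueState, setCurrentCompositeKey and the TreeMap "column family" are our own stand-ins, not Flink or RocksDB APIs. One sorted map plays the task's ColumnFamily, every write is addressed by the current composite key, and a null value clears that key's entry, as in the update method above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical toy model of RocksDBValueState.update (illustrative names only):
// one column family holds the entries of every key this task owns, and each
// write is addressed by the composite key of the *current* key.
class ToyRocksValueState {
    // RocksDB's default comparator orders keys bytewise (unsigned).
    private static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    private final TreeMap<byte[], byte[]> columnFamily = new TreeMap<>(BYTEWISE);
    // In Flink this is derived from the current record; here the caller sets it.
    private byte[] currentCompositeKey;

    void setCurrentCompositeKey(byte[] compositeKey) {
        this.currentCompositeKey = compositeKey.clone();
    }

    // null means "clear this key's value", otherwise put under the current key.
    void update(byte[] value) {
        if (value == null) {
            clear();
            return;
        }
        columnFamily.put(currentCompositeKey.clone(), value.clone());
    }

    byte[] value() {
        return columnFamily.get(currentCompositeKey);
    }

    void clear() {
        columnFamily.remove(currentCompositeKey);
    }

    int entryCount() {
        return columnFamily.size();
    }
}
```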
The call chain is:
RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace
    ↓
SerializedCompositeKeyBuilder.buildCompositeKeyNamespace
    ↓
serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()
Here keyOutView.getCopyOfBuffer() returns the serialized key of the current record (prefixed by its key group and followed by the namespace), which is why backend.db.put updates the value of exactly that key.
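The composite key can be sketched as follows. This is a simplified, hypothetical model of the idea behind SerializedCompositeKeyBuilder, under several assumptions: the key group is written as 2 bytes, keys and namespaces are Strings encoded as UTF-8, and Math.floorMod(key.hashCode(), maxParallelism) stands in for Flink's real key-group assignment (which applies a murmur hash to hashCode() first). Flink's builder also handles details this toy omits, such as variable-length keys. What it does show is the split of work: setCurrentKey writes key group + key once per record and remembers where the key ends, while buildCompositeKeyNamespace rewinds to that mark, appends the namespace and returns a copy, much like keyOutView.getCopyOfBuffer().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical toy model of the composite key layout [key group][key][namespace].
// Not Flink's SerializedCompositeKeyBuilder; names and layout are simplified.
class ToyCompositeKeyBuilder {
    private final int maxParallelism;
    private byte[] buffer = new byte[32];
    private int position;      // current write position in buffer
    private int afterKeyMark;  // where the "key group + key" prefix ends

    ToyCompositeKeyBuilder(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Stand-in for Flink's key-group assignment (assumption, see lead-in).
    int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    private void write(byte[] bytes) {
        if (position + bytes.length > buffer.length) {
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, position + bytes.length));
        }
        System.arraycopy(bytes, 0, buffer, position, bytes.length);
        position += bytes.length;
    }

    // Once per record: serialize key group + key and remember where the key ends.
    void setCurrentKey(String key) {
        position = 0;
        int keyGroup = keyGroupFor(key);
        write(new byte[] {(byte) (keyGroup >>> 8), (byte) keyGroup});
        write(key.getBytes(StandardCharsets.UTF_8));
        afterKeyMark = position;
    }

    // Per state access: drop any previous namespace, append the new one, return a copy.
    byte[] buildCompositeKeyNamespace(String namespace) {
        position = afterKeyMark;
        write(namespace.getBytes(StandardCharsets.UTF_8));
        return Arrays.copyOf(buffer, position);
    }
}
```

With maxParallelism 128, calling setCurrentKey("a") and then buildCompositeKeyNamespace("ns") yields the bytes 0, 97 (key group), 97 ("a"), 110, 115 ("ns").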
But when is a record's key written into keyOutView?
When a Record's Key Is Written into keyOutView
AbstractStreamTaskNetworkInput.emitNext -> processElement
    ↓
OneInputStreamTask.emitRecord
    ↓
OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement
    ↓
AbstractStreamOperator.setCurrentKey
    ↓
StreamOperatorStateHandler.setCurrentKey
    ↓
RocksDBKeyedStateBackend.setCurrentKey
    ↓
SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey
    ↓
keySerializer.serialize(key, keyOutView);
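In the last step, keySerializer.serialize(key, keyOutView) writes the record's key into keyOutView. In other words, the key is taken from each incoming record and set as the current key before the record is processed, so by the time backend.db.put runs it already has the right key.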
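Putting the pieces together, the per-record flow can be modeled in a few lines of Java. This is a hypothetical toy: ToyKeyedRuntime and its methods only mirror the roles of setKeyContextElement, setCurrentKey and processElement and are not Flink's API, and a HashMap stands in for the state's single ColumnFamily. The point is that user code holds one state handle, yet each value()/update() call lands on the entry of whichever key was set from the current record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy model of the per-record keyed-state flow (not Flink's API).
class ToyKeyedRuntime<R, K> {
    private final Function<R, K> keySelector;
    // One "ColumnFamily" for one state name: entries for every key this task sees.
    private final Map<K, Long> columnFamily = new HashMap<>();
    private K currentKey;

    ToyKeyedRuntime(Function<R, K> keySelector) {
        this.keySelector = keySelector;
    }

    // Plays the role of setKeyContextElement -> ... -> setCurrentKey.
    void setKeyContextElement(R record) {
        currentKey = keySelector.apply(record);
    }

    // The single state handle: reads and writes always go through currentKey.
    Long value() {
        return columnFamily.get(currentKey);
    }

    void update(Long newValue) {
        columnFamily.put(currentKey, newValue);
    }

    // Plays the role of emitNext -> processElement: set the key first, then run user logic.
    void processElement(R record) {
        setKeyContextElement(record);
        Long count = value(); // user logic: a per-key counter
        update(count == null ? 1L : count + 1);
    }
}
```

Feeding the records "a,1", "b,2", "a,3" with a key selector that takes the text before the comma leaves a count of 2 under key "a" and 1 under key "b", even though only one state object exists.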
Other Notes
As for where keyedStateStore is initialized, see the initializeState method of AbstractStreamOperator:
final StreamOperatorStateContext context =
    streamTaskStateManager.streamOperatorStateContext(
        getOperatorID(),
        getClass().getSimpleName(),
        getProcessingTimeService(),
        this,
        keySerializer,
        streamTaskCloseableRegistry,
        metrics,
        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
            ManagedMemoryUseCase.STATE_BACKEND,
            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
            runtimeContext.getUserCodeClassLoader()),
        isUsingCustomRawKeyedState());

stateHandler =
    new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
This method also initializes the keyedStateBackend, the operatorStateBackend and so on. The details will be covered in a later post.