Preface
This article looks at the split operation on Flink's DataStream.
Example
    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            // Tag each element with the name of the output it belongs to
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
- This example splits someDataStream into two logical outputs, one with the output name even and the other with the output name odd
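The tagging step can be tried without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: MiniOutputSelector and groupByName are hypothetical names that only imitate the shape of OutputSelector, and the grouping into a map is an illustration of "elements carry output names", not how Flink routes records internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch with no Flink dependency. All names here are hypothetical.
class EvenOddSplitSketch {

    /** Stand-in for OutputSelector<OUT>: returns the output names for one element. */
    interface MiniOutputSelector<OUT> {
        Iterable<String> select(OUT value);
    }

    /** Same logic as the selector in the example: tag each integer as "even" or "odd". */
    static final MiniOutputSelector<Integer> EVEN_ODD = new MiniOutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    };

    /** Groups the elements under every output name the selector assigns to them. */
    static <T> Map<String, List<T>> groupByName(List<T> elements, MiniOutputSelector<T> selector) {
        Map<String, List<T>> byName = new LinkedHashMap<>();
        for (T element : elements) {
            for (String name : selector.select(element)) {
                byName.computeIfAbsent(name, k -> new ArrayList<>()).add(element);
            }
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> result = groupByName(Arrays.asList(1, 2, 3, 4, 5), EVEN_ODD);
        System.out.println(result); // prints {odd=[1, 3, 5], even=[2, 4]}
    }
}
```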
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
    @Public
    public class DataStream<T> {
        //......

        public SplitStream<T> split(OutputSelector<T> outputSelector) {
            return new SplitStream<>(this, clean(outputSelector));
        }

        //......
    }
- DataStream's split operation takes an OutputSelector, passes it through clean(), and returns a new SplitStream
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
    @PublicEvolving
    public interface OutputSelector<OUT> extends Serializable {

        Iterable<String> select(OUT value);
    }
- OutputSelector defines a single select method, which tags each element with its output names
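Because select returns an Iterable of names rather than a single name, one element can carry several output names. The sketch below illustrates that with a plain-Java stand-in. MiniOutputSelector, PARITY_AND_SIZE, and the "large" name are hypothetical and exist only for this illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch with no Flink dependency. All names here are hypothetical.
class MultiNameSelectorSketch {

    /** Same shape as the interface shown above: one abstract method, Serializable. */
    interface MiniOutputSelector<OUT> extends Serializable {
        Iterable<String> select(OUT value);
    }

    /** A single abstract method means the selector can be written as a lambda. */
    static final MiniOutputSelector<Integer> PARITY_AND_SIZE = value -> {
        List<String> names = new ArrayList<>();
        names.add(value % 2 == 0 ? "even" : "odd");
        if (value >= 100) {
            names.add("large"); // a second name for the same element
        }
        return names;
    };

    /** Collects the names assigned to one value into a list for easy printing. */
    static List<String> namesOf(int value) {
        List<String> out = new ArrayList<>();
        for (String name : PARITY_AND_SIZE.select(value)) {
            out.add(name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(namesOf(7));   // prints [odd]
        System.out.println(namesOf(100)); // prints [even, large]
    }
}
```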
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
    @PublicEvolving
    public class SplitStream<OUT> extends DataStream<OUT> {

        protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
            super(dataStream.getExecutionEnvironment(),
                new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
        }

        public DataStream<OUT> select(String... outputNames) {
            return selectOutput(outputNames);
        }

        private DataStream<OUT> selectOutput(String[] outputNames) {
            for (String outName : outputNames) {
                if (outName == null) {
                    throw new RuntimeException("Selected names must not be null");
                }
            }

            SelectTransformation<OUT> selectTransform =
                new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
            return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
        }
    }
- SplitStream extends DataStream and adds a select method that picks the split-off streams by output name; select rejects null names and wraps the stream's transformation in a SelectTransformation
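What select does to the data can be imitated in plain Java. The sketch below is not Flink's implementation: it keeps the null check from selectOutput and assumes that an element is forwarded when at least one of its output names is among the selected names. That matching happens at runtime, outside the code quoted here, so treat it as an assumption. SelectSketch and evenOdd are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java sketch with no Flink dependency. All names here are hypothetical.
class SelectSketch {

    /** Stand-in for the even/odd OutputSelector from the example. */
    static Iterable<String> evenOdd(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    /**
     * Keeps the elements whose output names overlap with the selected names.
     * The null check mirrors the one in SplitStream.selectOutput; the
     * "any name matches" rule is an assumption made for this sketch.
     */
    static <T> List<T> select(List<T> elements,
                              Function<T, Iterable<String>> selector,
                              String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        Set<String> selected = new HashSet<>(Arrays.asList(outputNames));
        List<T> result = new ArrayList<>();
        for (T element : elements) {
            for (String name : selector.apply(element)) {
                if (selected.contains(name)) {
                    result.add(element);
                    break; // forward each element at most once
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println(select(data, SelectSketch::evenOdd, "even"));        // prints [2, 4]
        System.out.println(select(data, SelectSketch::evenOdd, "even", "odd")); // prints [1, 2, 3, 4]
    }
}
```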
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
    @Internal
    public class StreamGraphGenerator {
        //......

        private Collection<Integer> transform(StreamTransformation<?> transform) {

            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }

            LOG.debug("Transforming " + transform);

            if (transform.getMaxParallelism() <= 0) {

                // if the max parallelism hasn't been set, then first use the job wide max parallelism
                // from theExecutionConfig.
                int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }

            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();

            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }

            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }

            if (transform.getBufferTimeout() >= 0) {
                streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            }
            if (transform.getUid() != null) {
                streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }

            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }

            return transformedIds;
        }

        private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
            StreamTransformation<T> input = select.getInput();
            Collection<Integer> resultIds = transform(input);

            // the recursive transform might have already transformed this
            if (alreadyTransformed.containsKey(select)) {
                return alreadyTransformed.get(select);
            }

            List<Integer> virtualResultIds = new ArrayList<>();

            for (int inputId : resultIds) {
                int virtualId = StreamTransformation.getNewNodeId();
                streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
                virtualResultIds.add(virtualId);
            }
            return virtualResultIds;
        }

        private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

            StreamTransformation<T> input = split.getInput();
            Collection<Integer> resultIds = transform(input);

            // the recursive transform call might have transformed this already
            if (alreadyTransformed.containsKey(split)) {
                return alreadyTransformed.get(split);
            }

            for (int inputId : resultIds) {
                streamGraph.addOutputSelector(inputId, split.getOutputSelector());
            }

            return resultIds;
        }

        //......
    }
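- The transform method in StreamGraphGenerator dispatches SelectTransformation and SplitTransformation to transformSelect and transformSplit respectively
- transformSelect first transforms its input, then allocates a new virtual node id for each input id and calls addVirtualSelectNode with select.getSelectedNames()
- transformSplit first transforms its input, then calls addOutputSelector on each input id with split.getOutputSelector() and returns the input ids unchanged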
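The bookkeeping described above can be modeled in a few lines of plain Java. The sketch below is a heavily simplified model, not the real StreamGraphGenerator: the Node class, the string-based dispatch, and the three maps and lists are hypothetical stand-ins. It only illustrates three points from the quoted source: results are memoized in alreadyTransformed, a split adds no node of its own and registers its selector on the input ids, and each select creates a fresh virtual id that points back at the input id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model with no Flink dependency. All names here are hypothetical.
class SplitSelectGraphSketch {

    /** Simplified transformation: kind is "source", "split" or "select". */
    static class Node {
        final String kind;
        final Node input;          // null for a source
        final List<String> names;  // selected names, only meaningful for "select"

        Node(String kind, Node input, String... names) {
            this.kind = kind;
            this.input = input;
            this.names = Arrays.asList(names);
        }
    }

    private int nextId = 1;
    final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<Integer> nodesWithSelector = new ArrayList<>();           // models addOutputSelector
    final Map<Integer, Integer> virtualToOriginal = new LinkedHashMap<>(); // models addVirtualSelectNode
    final Map<Integer, List<String>> virtualSelectedNames = new LinkedHashMap<>();

    Collection<Integer> transform(Node node) {
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node); // memoized: each node is processed once
        }
        Collection<Integer> ids;
        switch (node.kind) {
            case "source":
                ids = Collections.singletonList(nextId++);
                break;
            case "split":
                // No new node: register the selector on the input ids and pass them through.
                ids = transform(node.input);
                for (int inputId : ids) {
                    nodesWithSelector.add(inputId);
                }
                break;
            case "select":
                // One fresh virtual id per input id, remembering the selected names.
                List<Integer> virtualIds = new ArrayList<>();
                for (int inputId : transform(node.input)) {
                    int virtualId = nextId++;
                    virtualToOriginal.put(virtualId, inputId);
                    virtualSelectedNames.put(virtualId, node.names);
                    virtualIds.add(virtualId);
                }
                ids = virtualIds;
                break;
            default:
                throw new IllegalStateException("Unknown transformation: " + node.kind);
        }
        alreadyTransformed.put(node, ids);
        return ids;
    }

    public static void main(String[] args) {
        Node source = new Node("source", null);
        Node split = new Node("split", source);
        Node even = new Node("select", split, "even");
        Node odd = new Node("select", split, "odd");

        SplitSelectGraphSketch graph = new SplitSelectGraphSketch();
        System.out.println(graph.transform(even));    // prints [2]
        System.out.println(graph.transform(odd));     // prints [3]
        System.out.println(graph.nodesWithSelector);  // prints [1]
        System.out.println(graph.virtualToOriginal);  // prints {2=1, 3=1}
    }
}
```

In this model both select nodes resolve to the same underlying id 1, and the selector is registered only once even though the split is reached through two select nodes.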
Summary
- DataStream's split operation takes an OutputSelector and returns a new SplitStream
- OutputSelector defines a select method that tags each element with its output names
- SplitStream extends DataStream and adds a select method that picks the split-off streams by output name