A Look at the split Operation of Flink DataStream
Published: 2019-06-21


This article takes a look at the split operation of Flink's DataStream.

Example

```java
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
```
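  • This example splits the dataStream into two dataStreams. One has the outputName even and the other has the outputName odd. Each one is then obtained with split.select("even") or split.select("odd").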
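It helps to spell out what split/select does at runtime. The following is a minimal plain-Java sketch, not Flink code. It copies the `OutputSelector` signature shown in this post into a local interface. A hypothetical `route` helper then places each element under every name its selector returns. That grouping is roughly what `select("even")` and `select("odd")` choose from. The sketch needs no Flink dependency.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SplitRoutingSketch {

    // Local stand-in with the same shape as Flink's OutputSelector (not the real class).
    interface OutputSelector<OUT> extends Serializable {
        Iterable<String> select(OUT value);
    }

    // Hypothetical helper: adds every element to the bucket of each name its selector returns.
    static <T> Map<String, List<T>> route(List<T> input, OutputSelector<T> selector) {
        Map<String, List<T>> buckets = new LinkedHashMap<>();
        for (T value : input) {
            for (String name : selector.select(value)) {
                buckets.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            }
        }
        return buckets;
    }

    // Same even/odd logic as the split example above.
    static final OutputSelector<Integer> EVEN_ODD = value -> {
        List<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    };

    public static void main(String[] args) {
        Map<String, List<Integer>> buckets = route(List.of(1, 2, 3, 4, 5, 6), EVEN_ODD);
        // Roughly what split.select("even") and split.select("odd") would emit.
        System.out.println("even = " + buckets.get("even")); // even = [2, 4, 6]
        System.out.println("odd  = " + buckets.get("odd"));  // odd  = [1, 3, 5]
    }
}
```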

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

```java
@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
```
  • DataStream's split operation takes an OutputSelector argument, passes it through clean(), and creates and returns a SplitStream.
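`split` processes no data by itself. It only returns a new `SplitStream`, and Flink's DataStream API in general just builds a job description that runs once the environment's `execute()` is called. Below is a minimal sketch of that deferred style in plain Java. `SplitPlan` and `execute` are hypothetical names, and a `java.util.function.Function` stands in for the selector. The selector is called only when the plan is executed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class LazySplitSketch {

    // Hypothetical plan node: remembers its input and selector, evaluates nothing on construction.
    static final class SplitPlan<T> {
        final List<T> source;
        final Function<T, List<String>> selector;

        SplitPlan(List<T> source, Function<T, List<String>> selector) {
            this.source = source;
            this.selector = selector;
        }

        // Only here is the selector actually invoked, once per element.
        List<T> execute(String selectedName) {
            List<T> result = new ArrayList<>();
            for (T value : source) {
                if (selector.apply(value).contains(selectedName)) {
                    result.add(value);
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        SplitPlan<Integer> plan = new SplitPlan<>(List.of(1, 2, 3, 4), v -> {
            calls.incrementAndGet(); // count selector invocations
            return List.of(v % 2 == 0 ? "even" : "odd");
        });
        System.out.println("selector calls after building the plan: " + calls.get()); // 0
        System.out.println("even = " + plan.execute("even"));                         // even = [2, 4]
        System.out.println("selector calls after executing: " + calls.get());         // 4
    }
}
```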

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

```java
@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);
}
```
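  • OutputSelector defines the select method, which tags each element with its outputNames. The return type is an Iterable, so one element can carry several names.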
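`select` returns an `Iterable<String>` rather than a single name, so one element can be tagged with several outputNames. The interface also extends `Serializable`, which means the selector has to survive Java serialization, because Flink serializes user functions before shipping them to the workers. The plain-Java sketch below shows both points and again uses a local copy of the interface. The `small` name, the threshold of 10 and the `roundTrip` helper are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class MultiNameSelectorSketch {

    // Local stand-in with the same shape as Flink's OutputSelector (not the real class).
    interface OutputSelector<OUT> extends Serializable {
        Iterable<String> select(OUT value);
    }

    // Tags parity plus a hypothetical "small" name for values below 10.
    static final class ParityAndSizeSelector implements OutputSelector<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<>();
            output.add(value % 2 == 0 ? "even" : "odd");
            if (value < 10) {
                output.add("small");
            }
            return output;
        }
    }

    // Serialization round trip, roughly what happens when a function is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        OutputSelector<Integer> copy = roundTrip(new ParityAndSizeSelector());
        System.out.println(copy.select(4));  // [even, small]
        System.out.println(copy.select(15)); // [odd]
    }
}
```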

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

```java
@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(),
                new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform =
                new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }
}
```
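  • SplitStream extends DataStream and defines a select method that picks split-off dataStreams by their outputNames. select rejects null names and creates a SelectTransformation that wraps the SplitStream's own transformation, which is the SplitTransformation built in its constructor.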
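Taken together, split and select only build a small tree of transformation objects. `SplitTransformation` wraps the input transformation and the selector. Each `select` call first rejects `null` names and then wraps the split in a `SelectTransformation` that carries the chosen names. Below is a stripped-down plain-Java model of that structure. The class names mirror Flink's but are local, simplified stand-ins, not the real classes, which carry much more state (output type, parallelism, uid and so on). `Arrays.asList` replaces the Guava `Lists.newArrayList` call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class SelectChainSketch {

    // Simplified stand-ins for Flink's transformation classes (hypothetical, not the real API).
    static class Transformation {
        final String name;

        Transformation(String name) {
            this.name = name;
        }
    }

    static final class SplitTransformation<T> extends Transformation {
        final Transformation input;
        final Function<T, List<String>> selector;

        SplitTransformation(Transformation input, Function<T, List<String>> selector) {
            super("Split");
            this.input = input;
            this.selector = selector;
        }
    }

    static final class SelectTransformation extends Transformation {
        final Transformation input;
        final List<String> selectedNames;

        SelectTransformation(Transformation input, List<String> selectedNames) {
            super("Select");
            this.input = input;
            this.selectedNames = selectedNames;
        }
    }

    // Mirrors SplitStream.selectOutput: reject null names, then wrap the split transformation.
    static SelectTransformation select(Transformation split, String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        return new SelectTransformation(split, Arrays.asList(outputNames));
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("Source");
        SplitTransformation<Integer> split =
                new SplitTransformation<Integer>(source, v -> List.of(v % 2 == 0 ? "even" : "odd"));

        SelectTransformation even = select(split, "even");
        // prints: Select [even] <- Split <- Source
        System.out.println(even.name + " " + even.selectedNames + " <- "
                + even.input.name + " <- " + ((SplitTransformation<?>) even.input).input.name);

        try {
            select(split, "even", null);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```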

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

```java
@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();

        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }

        return resultIds;
    }

    //......
}
```
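  • The transform method in StreamGraphGenerator dispatches SplitTransformation to transformSplit and SelectTransformation to transformSelect.
  • transformSelect first transforms its input. For each resulting node id, it allocates a new virtual id and calls addVirtualSelectNode with select.getSelectedNames(). It returns the virtual ids.
  • transformSplit first transforms its input and then calls addOutputSelector with split.getOutputSelector() on each resulting node id. It creates no node of its own and returns the input's ids unchanged.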
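The two methods handle node ids differently. `transformSplit` creates no node of its own: it attaches the selector to the upstream node(s) and passes their ids through. `transformSelect` allocates one fresh virtual id per upstream node and records the names it selects. Below is a minimal plain-Java model of that bookkeeping. It assumes a single integer counter for node ids. `MiniStreamGraph` and its fields are hypothetical stand-ins for `StreamGraph.addOutputSelector` and `addVirtualSelectNode`, and a selector is represented only by a string label.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GraphGeneratorSketch {

    // Hypothetical stand-in for StreamGraph: only tracks what split/select register.
    static final class MiniStreamGraph {
        private int nextId = 1;
        final Map<Integer, List<String>> outputSelectors = new HashMap<>();  // node id -> selector labels
        final Map<Integer, Integer> virtualSelectInput = new HashMap<>();    // virtual id -> real node id
        final Map<Integer, List<String>> virtualSelectNames = new HashMap<>(); // virtual id -> selected names

        int newNodeId() {
            return nextId++;
        }

        // Like addOutputSelector: the selector is attached to an existing node, no new node is made.
        void addOutputSelector(int nodeId, String selectorLabel) {
            outputSelectors.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(selectorLabel);
        }

        // Like addVirtualSelectNode: a virtual id that points back at the real upstream node.
        void addVirtualSelectNode(int originalId, int virtualId, List<String> names) {
            virtualSelectInput.put(virtualId, originalId);
            virtualSelectNames.put(virtualId, names);
        }
    }

    // transformSplit: register the selector on every upstream id and return those ids unchanged.
    static List<Integer> transformSplit(MiniStreamGraph graph, List<Integer> inputIds, String selectorLabel) {
        for (int inputId : inputIds) {
            graph.addOutputSelector(inputId, selectorLabel);
        }
        return inputIds;
    }

    // transformSelect: one new virtual id per upstream id, carrying the selected names.
    static List<Integer> transformSelect(MiniStreamGraph graph, List<Integer> inputIds, List<String> names) {
        List<Integer> virtualIds = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = graph.newNodeId();
            graph.addVirtualSelectNode(inputId, virtualId, names);
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }

    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        List<Integer> sourceIds = List.of(graph.newNodeId());                      // [1]
        List<Integer> splitIds = transformSplit(graph, sourceIds, "evenOdd");      // still [1]
        List<Integer> evenIds = transformSelect(graph, splitIds, List.of("even")); // [2]
        List<Integer> oddIds = transformSelect(graph, splitIds, List.of("odd"));   // [3]
        System.out.println("split ids = " + splitIds + ", even ids = " + evenIds + ", odd ids = " + oddIds);
        System.out.println("virtual 2 -> node " + graph.virtualSelectInput.get(2)
                + " " + graph.virtualSelectNames.get(2));                          // virtual 2 -> node 1 [even]
    }
}
```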

Summary

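  • DataStream's split operation takes an OutputSelector argument, then creates and returns a SplitStream.
  • OutputSelector defines the select method, which tags each element with its outputNames.
  • SplitStream extends DataStream and defines a select method that picks split-off dataStreams by their outputNames.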


Reposted from: https://my.oschina.net/go4it/blog/3001050
