HEXH's Blog

Facing the sea, with spring flowers in bloom



Chapter 5: Basic Types and Operations

Posted on 2015-12-06 | Category: Scala

Literals

  • String literals

Scala includes a special syntax for raw strings. You start and end a raw string with three double quotation marks in a row ("""). The interior of a raw string may contain any characters whatsoever, including newlines, quotation marks, and special characters, except of course three quotation marks in a row.

Example:
println("""Welcome to Ultamix 3000.
           Type "HELP" for help.""")
//Welcome to Ultamix 3000.
//           Type "HELP" for help.
The leading spaces before the second line are part of the string. To strip them, put a pipe character (|) at the front of each line, then call stripMargin on the whole string:
println("""|Welcome to Ultamix 3000.
           |Type "HELP" for help.""".stripMargin)
//Welcome to Ultamix 3000.
//Type "HELP" for help.
  • Symbol literals

A symbol literal is written 'ident, where ident can be any alphanumeric identifier. Such literals are mapped to instances of the predefined class scala.Symbol. Specifically, the literal 'cymbal will be expanded by the compiler to a factory method invocation: Symbol("cymbal").

NOTE:
Another thing that's noteworthy is that symbols are interned. If you write the same symbol literal twice, both expressions will refer to the exact same Symbol object.

Operators and methods

Scala provides a rich set of operators for its basic types. As mentioned in previous chapters, these operators are actually just a nice syntax for ordinary method calls.

Any method can be an operator
In Scala operators are not special language syntax: any method can be an operator. What makes a method an operator is how you use it. When you write “s.indexOf('o')”, indexOf is not an operator. But when you write “s indexOf 'o'”, indexOf is an operator, because you’re using it in operator notation.

  • infix operator
//single argument
s indexOf 'o'
//multiple arguments
s indexOf ('o', 5)
  • prefix operator

Scala will transform the expression -2.0 into the method invocation "(2.0).unary_-".

NOTE: The only identifiers that can be used as prefix operators are +, -, !, and ~. Thus, if you define a method named unary_!, you could invoke that method on a value or variable of the appropriate type using prefix operator notation, such as !p. But if you define a method named unary_*, you wouldn't be able to use prefix operator notation, because * isn't one of the four identifiers that can be used as prefix operators.

  • postfix operator

Postfix operators are methods that take no arguments, when they are invoked without a dot or parentheses.

Relational and logical operations

Note: You may be wondering how short-circuiting can work given operators are just methods. Normally, all arguments are evaluated before entering a method, so how can a method avoid evaluating its second argument? The answer is that all Scala methods have a facility for delaying the evaluation of their arguments, or even declining to evaluate them at all. The facility is called by-name parameters and is discussed in Section 9.5.
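The idea of delaying an argument's evaluation can be illustrated outside Scala as well. The sketch below uses Java, the language of the other posts on this page, and passes the right-hand operand as a Supplier so that the method decides whether to evaluate it. This is only an analogy for by-name parameters, not how Scala implements them; the class and method names (ShortCircuitDemo, and, expensiveCheck) are made up for illustration.

```java
import java.util.function.Supplier;

// Sketch: a hand-written "and" that may decline to evaluate its second operand.
class ShortCircuitDemo {
    // Counts how often the right-hand operand was actually evaluated.
    static int evaluations = 0;

    static boolean and(boolean left, Supplier<Boolean> right) {
        if (!left) {
            return false; // right.get() is never called on this path
        }
        return right.get();
    }

    static boolean expensiveCheck() {
        evaluations++;
        return true;
    }

    public static void main(String[] args) {
        boolean result = and(false, ShortCircuitDemo::expensiveCheck);
        System.out.println(result + " after " + evaluations + " evaluations");
    }
}
```

Because the left operand is false, expensiveCheck is never invoked, which is the behaviour a short-circuiting operator needs.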

Object equality

If you want to compare two objects for equality, you can use either ==, or its inverse !=.

How Scala's == differs from Java's
In Java, you can use == to compare both primitive and reference types. On primitive types, Java's == compares value equality, as in Scala. On reference types, however, Java's == compares reference equality, which means the two variables point to the same object on the JVM's heap. Scala provides a facility for comparing reference equality, as well, under the name eq. However, eq and its opposite, ne, only apply to objects that directly map to Java objects. The full details about eq and ne are given in Sections 11.1 and 11.2. Also, see Chapter 30 on how to write a good equals method.
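The Java side of this comparison can be seen in a few lines. The sketch below is a minimal illustration; EqualityDemo and its two helper methods are names invented here. sameReference does what Java's == does on reference types (and what Scala calls eq), while sameValue uses Objects.equals, a null-safe value comparison that is roughly what Scala's == does for reference types.

```java
import java.util.Objects;

// Sketch: reference equality versus value equality in Java.
class EqualityDemo {
    // Java's == on references: true only if both point to the same object.
    static boolean sameReference(Object a, Object b) {
        return a == b;
    }

    // Null-safe value equality, delegating to equals().
    static boolean sameValue(Object a, Object b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        // new String(...) forces two distinct objects with the same contents.
        String x = new String("hello");
        String y = new String("hello");
        System.out.println(sameReference(x, y)); // false
        System.out.println(sameValue(x, y));     // true
        System.out.println(1 + 1 == 2);          // true: primitives compare by value
    }
}
```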

Operator precedence and associativity

Operator precedence (highest first; operators on the same line share a precedence level)
(all other special characters)
* / %
+ -
:
= !
< >
&
^
|
(all letters)
(all assignment operators)
  • Scala decides precedence based on the first character of the methods used in operator notation
a +++ b *** c //a +++ (b *** c)
  • The one exception to the precedence rule

The one exception to the precedence rule, alluded to above, concerns assignment operators, which end in an equals character. If an operator ends in an equals character (=), and the operator is not one of the comparison operators <=, >=, ==, or !=, then the precedence of the operator is the same as that of simple assignment (=).

For example, x *= y + 1 means the same as x *= (y + 1), because *= is classified as an assignment operator, whose precedence is lower than that of +, even though the operator's first character is *, which would suggest a precedence higher than +.
x *= y + 1 //x *= (y + 1)
  • associativity

The associativity of an operator is determined by its last character. If the method name ends in ':', calls are grouped right to left; otherwise, they are grouped left to right. For example, a ::: b ::: c is treated as a ::: (b ::: c). But a * b * c, by contrast, is treated as (a * b) * c.
No matter what associativity an operator has, however, its operands are always evaluated left to right.

Rich wrappers

Some rich operations

Code                    Result
0 max 5                 5
0 min 5                 0
-2.7 abs                2.7
-2.7 round              -3L
1.5 isInfinity          false
(1.0 / 0) isInfinity    true
4 to 6                  Range(4, 5, 6)
"bob" capitalize        "Bob"
"robert" drop 2         "bert"

Rich wrapper classes

Basic type    Rich wrapper
Byte          scala.runtime.RichByte
Short         scala.runtime.RichShort
Int           scala.runtime.RichInt
Char          scala.runtime.RichChar
Float         scala.runtime.RichFloat
Double        scala.runtime.RichDouble
Boolean       scala.runtime.RichBoolean
String        scala.collection.immutable.StringOps

MasterBatchCoordinator source code

Posted on 2015-12-02 | Category: jstorm

CoordinatorState Zk

${spout_id}
  - coordinator
    - currtx          current txid
    - currattempts    txid => attempt
    - meta
      - txid1         stored metadata
      - txid2
  - user

AttemptStatus
private static enum AttemptStatus {
    PROCESSING, PROCESSED, COMMITTING
}
// state of the managed spouts, stored in ZooKeeper
private List<TransactionalState> _states = new ArrayList();
// active transactions: txid => {status, attempt}
// PROCESSING --[first ack]--> PROCESSED --[second ack]--> COMMITTING --[third ack]--> {removed, _currTransaction++}
// any status --[fail]--> {remove every transaction whose txid is greater than or equal to the failed one}
TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
TreeMap<Long, Integer> _attemptIds;

private SpoutOutputCollector _collector;
Long _currTransaction;
int _maxTransactionActive;

List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();

List<String> _managedSpoutIds;
List<ITridentSpout> _spouts;
WindowedTimeThrottler _throttler;
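
The transitions described in the comments on _activeTx can be sketched as a small state machine. This is a simplified model written for this note, not the actual MasterBatchCoordinator code: the class TxStateMachineSketch and its methods start, ack and fail are hypothetical, attempts and emitted tuples are left out, and the real class may advance states under additional conditions.

```java
import java.util.TreeMap;

// Sketch of the txid => status bookkeeping, following the transition comments above.
class TxStateMachineSketch {
    enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

    // Active transactions, ordered by txid.
    final TreeMap<Long, AttemptStatus> activeTx = new TreeMap<>();
    long currTransaction = 1L;

    void start(long txid) {
        activeTx.put(txid, AttemptStatus.PROCESSING);
    }

    void ack(long txid) {
        AttemptStatus status = activeTx.get(txid);
        if (status == null) {
            return; // unknown or already removed transaction
        }
        switch (status) {
            case PROCESSING:
                activeTx.put(txid, AttemptStatus.PROCESSED);
                break;
            case PROCESSED:
                activeTx.put(txid, AttemptStatus.COMMITTING);
                break;
            case COMMITTING:
                activeTx.remove(txid);
                currTransaction++;
                break;
        }
    }

    void fail(long txid) {
        // tailMap(txid, true) is a live view of all entries with key >= txid,
        // so clearing it removes the failed transaction and every later one.
        activeTx.tailMap(txid, true).clear();
    }
}
```

A TreeMap fits here because a failure has to discard a contiguous tail of txids, which tailMap exposes directly.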

TridentTopology source code

Posted on 2015-11-30 | Category: jstorm

References:

  • http://www.cnblogs.com/hseagle/p/3756862.html
  • http://www.flyne.org/article/216

There are three layers between a TridentTopology and a basic Topology; the figure below gives a global view. The figure is taken from 徽沪一郎's blog.

TridentTopology fields

// a simple directed graph
DefaultDirectedGraph<Node, IndexedEdge> _graph;
// state.node.id => [self, stateQueryNode]
Map<String, List<Node>> _colocate = new HashMap();
// generates unique IDs for streams and states
UniqueIdGen _gen;

Node Type

  • Node
  • SpoutNode
  • ProcessorNode
  • PartitionNode
public static enum SpoutType {
    DRPC, BATCH
}

TridentTopology#newStream creates a BATCH SpoutNode from an ITridentSpout; a spout that is not an ITridentSpout ends up wrapped into one:

  • IRichSpout : RichSpoutBatchExecutor
  • IBatchSpout : BatchSpoutExecutor
  • IPartitionedTridentSpout : PartitionedTridentSpoutExecutor
  • IOpaquePartitionedTridentSpout : OpaquePartitionedTridentSpoutExecutor

TridentTopology#newDRPCStream creates a DRPC SpoutNode from a DRPCSpout, with no txid.

TridentTopology#newStaticState creates a Node with NodeStateInfo.
Odd: I had always assumed that only subclasses of Node would be instantiated.

Node name,Stream name,StreamId

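Nodes created directly by TridentTopology have an empty name, and so do the corresponding Streams.
A Stream's name can be changed with Stream#name(String name), but the name of the Node it is attached to does not change with it.
When an operation on a Stream creates a new Node, the Node's name is the name of the Stream that created it.

A Stream operation creates a new Node and, in general, a new stream id. The exception is a partition operation, whose new node reuses its parent's streamId.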

build

TridentTopology#build
// mergedGroups contains both the spout groups and the bolt groups
// spoutNodes is the collection of SpoutNode instances

// Omitted here: closing the DRPC loops, merging nodes into groups, and so on.

// node => batch group name, e.g. bg0, bg1
// There can be more than one batch group; I tested that a topology made of several disconnected graphs works.
Map<Node, String> batchGroupMap = new HashMap();
List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
for (int i = 0; i < connectedComponents.size(); i++) {
    String groupId = "bg" + i;
    for (Node n : connectedComponents.get(i)) {
        batchGroupMap.put(n, groupId);
    }
}

// compute the parallelism of each Group
Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

TridentTopologyBuilder builder = new TridentTopologyBuilder();

// spoutNode => spoutId, e.g. spout0, spout1; the String is the componentId
Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
// group of operation nodes => boltId, e.g. b-0, b-1, b-2-kkk; the String is the componentId
Map<Group, String> boltIds = genBoltIds(mergedGroups);
for (SpoutNode sn : spoutNodes) {
    Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
    if (sn.type == SpoutNode.SpoutType.DRPC) {
        // spout0, s0, IRichSpout, parallelism, bg0
        builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId, (IRichSpout) sn.spout, parallelism,
                batchGroupMap.get(sn));
    } else {
        ITridentSpout s;
        if (sn.spout instanceof IBatchSpout) {
            s = new BatchSpoutExecutor((IBatchSpout) sn.spout);
        } else if (sn.spout instanceof ITridentSpout) {
            s = (ITridentSpout) sn.spout;
        } else {
            throw new RuntimeException(
                    "Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
            // TODO: handle regular rich spout without batches (need
            // lots of updates to support this throughout)
        }
        // spout0, s0, txStateId, ITridentSpout, parallelism, bg0
        builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
    }
}

for (Group g : mergedGroups) {
    if (!isSpoutGroup(g)) {
        Integer p = parallelisms.get(g);
        // stream => batch group, e.g. {s0 => bg0}
        Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
        // b-0, SubtopologyBolt(graph, g.nodes, batchGroupMap), parallelism, {bg0}, {s0 => bg0}
        BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                committerBatches(g, batchGroupMap), streamToGroup);
        Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
        for (PartitionNode n : inputs) {
            Node parent = TridentUtils.getParent(graph, n);
            String componentId;
            if (parent instanceof SpoutNode) {
                componentId = spoutIds.get(parent);
            } else {
                componentId = boltIds.get(grouper.nodeGroup(parent));
            }
            d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
        }
    }
}

return builder.buildTopology();
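
The batch-group step at the start of build can be reproduced without JGraphT. The sketch below assumes that the connected sets are computed while ignoring edge direction (weak connectivity), and it models nodes as plain strings; BatchGroupSketch and batchGroups are hypothetical names, not part of Trident.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: assign every node of a graph to a batch group "bg<i>",
// one group per connected component (edge direction is ignored).
class BatchGroupSketch {
    static Map<String, String> batchGroups(Map<String, List<String>> edges) {
        // Build an undirected adjacency map from the directed edge lists.
        Map<String, Set<String>> adjacency = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            adjacency.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>());
            for (String to : e.getValue()) {
                adjacency.get(e.getKey()).add(to);
                adjacency.computeIfAbsent(to, k -> new LinkedHashSet<>()).add(e.getKey());
            }
        }

        // Breadth-first search from every node that has no group yet.
        Map<String, String> groupOf = new LinkedHashMap<>();
        int nextGroup = 0;
        for (String start : adjacency.keySet()) {
            if (groupOf.containsKey(start)) {
                continue;
            }
            String groupId = "bg" + nextGroup++;
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            groupOf.put(start, groupId);
            while (!queue.isEmpty()) {
                String node = queue.poll();
                for (String neighbour : adjacency.get(node)) {
                    if (!groupOf.containsKey(neighbour)) {
                        groupOf.put(neighbour, groupId);
                        queue.add(neighbour);
                    }
                }
            }
        }
        return groupOf;
    }
}
```

With two disconnected sub-graphs, for example spout0 -> b0 -> b1 and spout1 -> b2, the first three nodes land in bg0 and the other two in bg1, which matches the observation above that several batch groups can coexist.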
TridentTopologyBuilder#setBatchPerTupleSpout
public SpoutDeclarer setBatchPerTupleSpout(String id, String streamName, IRichSpout spout, Integer parallelism,
        String batchGroup) {
    Map<String, String> batchGroups = new HashMap();
    batchGroups.put(streamName, batchGroup);
    // _batchIds.put(new GlobalStreamId(id, streamName), batchGroup); GlobalStreamId => batchGroup
    markBatchGroups(id, batchGroups);
    SpoutComponent c = new SpoutComponent(spout, streamName, parallelism, batchGroup);
    // componentId => SpoutComponent
    _batchPerTupleSpouts.put(id, c);
    return new SpoutDeclarerImpl(c);
}
TridentTopologyBuilder#setSpout
public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout,
        Integer parallelism, String batchGroup) {
    Map<String, String> batchGroups = new HashMap();
    batchGroups.put(streamName, batchGroup);
    markBatchGroups(id, batchGroups);

    TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId,
            batchGroup);
    _spouts.put(id, c);
    return new SpoutDeclarerImpl(c);
}

TopologyBuilder source code

Posted on 2015-11-30 | Category: jstorm

What TopologyBuilder does:

It weaves ISpout and IBolt instances into a topology graph and generates a StormTopology instance for Thrift to use.

Interfaces that users program against: IBolt, ISpout, IStateSpout

IRichBolt, IBasicBolt, IRichSpout, IRichStateSpout

  • IBasicBolt does not extend IBolt; when the topology is wired up, it is wrapped into an IRichBolt by BasicBoltExecutor
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
    return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
  • IRichStateSpout is not implemented yet
public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) {
    validateUnusedId(id);
    // TODO: finish
}
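
The wrapping done by the setBolt overload is an instance of the adapter pattern. The sketch below shows only the shape of that pattern with heavily simplified, hypothetical stand-ins: RichBolt, BasicBolt, BasicBoltAdapter and the two setBolt methods are invented for illustration and do not reproduce the signatures or behaviour of the Storm interfaces or of BasicBoltExecutor.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: one overload accepts the "rich" interface, the other wraps a
// "basic" implementation in an adapter and forwards to the first.
class AdapterSketch {
    // Hypothetical stand-ins for the real bolt interfaces.
    interface RichBolt {
        void execute(String tuple);
    }

    interface BasicBolt {
        void execute(String tuple, List<String> out);
    }

    // Plays the role that BasicBoltExecutor plays in the builder.
    static class BasicBoltAdapter implements RichBolt {
        private final BasicBolt delegate;
        final List<String> emitted = new ArrayList<>();

        BasicBoltAdapter(BasicBolt delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(String tuple) {
            delegate.execute(tuple, emitted);
        }
    }

    static final List<RichBolt> registered = new ArrayList<>();

    static void setBolt(RichBolt bolt) {
        registered.add(bolt);
    }

    // Mirrors the overload above: wrap, then delegate to the rich version.
    static void setBolt(BasicBolt bolt) {
        setBolt(new BasicBoltAdapter(bolt));
    }
}
```

The benefit of this arrangement is that the rest of the builder only ever has to deal with one bolt type.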

NimbusData source code

Posted on 2015-11-25 | Category: jstorm

Fields

  • downloaders,uploaders
// TODO two kinds of value: Channel/BufferFileInputStream
private TimeCacheMap<Object, Object> downloaders;
private TimeCacheMap<Object, Object> uploaders;
/**
 * During upload/download with the master, how long an upload or download connection is idle before nimbus considers it dead and drops the connection.
 */
public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
  • submittedCount
  • stormClusterState ==> com.alibaba.jstorm.cluster.StormClusterState
  • statusTransition

StormClusterState

public void disconnect() throws Exception;
public void remove_storm(String topology_id) throws Exception;
public void try_remove_storm(String topology_id);
public List<String> assignments(RunnableCallback callback) throws Exception;
public Assignment assignment_info(String topology_id, RunnableCallback callback) throws Exception;
public void set_assignment(String topology_id, Assignment info) throws Exception;
public AssignmentBak assignment_bak(String topologyName) throws Exception;
public void backup_assignment(String topology_id, AssignmentBak info) throws Exception;
public List<String> active_storms() throws Exception;
public StormBase storm_base(String topology_id, RunnableCallback callback) throws Exception;
public void activate_storm(String topology_id, StormBase storm_base) throws Exception;
public void update_storm(String topology_id, StormStatus new_elems) throws Exception;
public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception;
public void remove_storm_base(String topology_id) throws Exception;
public List<String> task_storms() throws Exception;
public Set<Integer> task_ids(String topology_id) throws Exception;
public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception;
public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception;
public void add_task(String topology_id, Map<Integer, TaskInfo> taskInfoMap) throws Exception;
public void remove_task(String topologyId, Set<Integer> taskIds) throws Exception;
public Map<Integer, TaskInfo> task_all_info(String topology_id) throws Exception;
public List<String> heartbeat_storms() throws Exception;
public void topology_heartbeat(String topology_id, TopologyTaskHbInfo info) throws Exception;
public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception;
public void teardown_heartbeats(String topology_id) throws Exception;
public List<String> task_error_storms() throws Exception;
public List<String> task_error_ids(String topologyId) throws Exception;
public void report_task_error(String topology_id, int task_id, Throwable error) throws Exception;
public void report_task_error(String topology_id, int task_id, String error, String tag) throws Exception;
public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception;
public void remove_lastErr_time(String topologyId) throws Exception;
public List<TaskError> task_errors(String topology_id, int task_id) throws Exception;
public void remove_task_error(String topologyId, int taskId) throws Exception;
public List<String> task_error_time(String topologyId, int taskId) throws Exception;
public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception;
public void teardown_task_errors(String topology_id) throws Exception;
public List<String> supervisors(RunnableCallback callback) throws Exception;
public SupervisorInfo supervisor_info(String supervisor_id) throws Exception;
public void supervisor_heartbeat(String supervisor_id, SupervisorInfo info) throws Exception;
public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception;
public String get_leader_host() throws Exception;
public boolean leader_existed() throws Exception;
public List<String> get_nimbus_slaves() throws Exception;
public void update_nimbus_slave(String host, int time) throws Exception;
public String get_nimbus_slave_time(String host) throws Exception;
public void unregister_nimbus_host(String host) throws Exception;
public void update_nimbus_detail(String hostPort, Map map) throws Exception;
public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception;
public void unregister_nimbus_detail(String hostPort) throws Exception;
public void set_topology_metric(String topologyId, Object metric) throws Exception;
public Object get_topology_metric(String topologyId) throws Exception;
public List<String> get_metrics() throws Exception;
public List<String> list_dirs(String path, boolean watch) throws Exception;
public List<String> backpressureInfos() throws Exception;
public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception;
public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception;
public void teardown_backpressure(String topologyId) throws Exception;

JStormCache source code

Posted on 2015-11-25 | Category: jstorm

Interface

void init(Map<Object, Object> conf) throws Exception;
void cleanup();
Object get(String key);
void getBatch(Map<String, Object> map);
void remove(String key);
void removeBatch(Collection<String> keys);
void put(String key, Object value, int timeoutSecond);
void put(String key, Object value);
void putBatch(Map<String, Object> map);
void putBatch(Map<String, Object> map, int timeoutSeconds);

JStormCache implementations

  • com.alibaba.jstorm.cache.TimeoutMemCache
  • com.alibaba.jstorm.cache.RocksDBCache
  • com.alibaba.jstorm.cache.RocksTTLDBCache
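
To make the two put variants of the interface concrete, here is a minimal in-memory sketch of a cache with optional per-entry timeouts. It covers only get, put and remove, it is not how TimeoutMemCache or the RocksDB-backed classes are implemented, and everything in it (TimeoutCacheSketch, the injected clock, lazy expiry on read) is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Sketch: key/value cache where entries may carry an expiry time.
class TimeoutCacheSketch {
    private static final class Entry {
        final Object value;
        final long expiresAtMillis;

        Entry(Object value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    // The clock is injected so that expiry can be tested without sleeping.
    private final LongSupplier clockMillis;

    TimeoutCacheSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    // Entry without a timeout: it never expires.
    void put(String key, Object value) {
        entries.put(key, new Entry(value, Long.MAX_VALUE));
    }

    // Entry that expires timeoutSecond seconds after insertion.
    void put(String key, Object value, int timeoutSecond) {
        entries.put(key, new Entry(value, clockMillis.getAsLong() + timeoutSecond * 1000L));
    }

    // Expired entries are dropped lazily, on the first read after expiry.
    Object get(String key) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clockMillis.getAsLong() >= e.expiresAtMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    void remove(String key) {
        entries.remove(key);
    }
}
```

In normal use the clock would be System::currentTimeMillis, for example new TimeoutCacheSketch(System::currentTimeMillis).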

links

Posted on 2015-11-25 | Category: trivial

2015-11-25

  • http://stackoverflow.com/questions/7676022/html5-flexible-box-model-height-calculation/15388247#15388247
  • https://commons.apache.org/proper/commons-exec/
  • http://mathworld.wolfram.com/DirectedGraph.html
  • http://zh.wordow.com/english/dictionary/?t=bidirected
  • https://zh.wikipedia.org/wiki/%E5%AE%8C%E5%85%A8%E5%9C%96
  • http://mathworld.wolfram.com/Indegree.html
  • https://backlogtool.com/git-guide/tw/stepup/stepup7_4.html
  • http://www.cnblogs.com/fxjwind/p/3182351.html
  • http://ifeve.com/disruptor/
  • http://www.yworks.com/
  • http://www.cnblogs.com/xishuai/p/iddd-cqrs-and-eda.html
  • http://www.umlchina.com/book/softmeth.htm
  • http://www.cnblogs.com/hseagle/p/3756862.html
  • http://www.flyne.org/article/216
  • http://www.sqlparty.com/mapreduce%E5%BA%94%E7%94%A8%E4%B8%ADcombinefileinputformat%E5%8E%9F%E7%90%86%E4%B8%8E%E7%94%A8%E6%B3%95/
  • http://segmentfault.com/a/1190000002680804
  • http://blog.csdn.net/dizzthxl/article/details/8498833
  • http://www.cnblogs.com/basecn/p/4288456.html
  • http://blog.jpush.cn/redis-twemproxy-benchmark/
  • http://technology.inmobi.com/blog/real-time-stream-processing-at-inmobi-part-1
  • http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
  • https://github.com/adam-p/markdown-here/wiki/Markdown-Cheatsheet
  • http://www.unjeep.com/hexomarkdownbiaoge/

JStorm Local Dir

Posted on 2015-11-25 | Category: jstorm

storm.local.dir

/**
 * A directory on the local filesystem used by Storm for any local filesystem usage it needs. The directory must exist and the Storm daemons must have
 * permission to read/write from this location.
 */
public static final String STORM_LOCAL_DIR = "storm.local.dir";
  • nimbus [nimbus local dir]
    • pids
      • {$nimbus_pid}
    • rocksdb: data directory of NimbusCache
  • metrics
    • rocksdb: data directory of JStormMetricCache

Get the JVM Pid

Posted on 2015-11-25 | Category: java
// requires: import java.lang.management.ManagementFactory;
/**
 * Gets the pid of this JVM, because Java doesn't provide a real way to do this.
 *
 * @return the pid, taken from the part of the runtime MXBean name before the '@'
 */
public static String process_pid() {
    String name = ManagementFactory.getRuntimeMXBean().getName();
    String[] split = name.split("@");
    if (split.length != 2) {
        throw new RuntimeException("Got unexpected process name: " + name);
    }

    return split[0];
}
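
For comparison, the sketch below puts the MXBean approach next to ProcessHandle, which the JDK has offered since Java 9. The "pid@hostname" shape of the MXBean name is a convention of common JVMs rather than a documented guarantee, so the first method should be read under that assumption; PidDemo and its method names are made up for this example.

```java
import java.lang.management.ManagementFactory;

// Sketch: two ways to obtain the pid of the running JVM (Java 9 or later).
class PidDemo {
    // Same idea as process_pid(): parse the "pid@hostname" MXBean name.
    static String pidFromMxBean() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        String[] split = name.split("@");
        if (split.length != 2) {
            throw new RuntimeException("Got unexpected process name: " + name);
        }
        return split[0];
    }

    // Direct API available since Java 9.
    static long pidFromProcessHandle() {
        return ProcessHandle.current().pid();
    }

    public static void main(String[] args) {
        System.out.println("MXBean name prefix: " + pidFromMxBean());
        System.out.println("ProcessHandle pid:  " + pidFromProcessHandle());
    }
}
```

On Java 9 or later, the ProcessHandle variant avoids the string parsing and the dependence on the name format.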

English-Words-20151123

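Posted on 2015-11-23 | Category: english
  • notation /nəʊ'teɪʃ(ə)n/ n. symbol; musical notation; annotation; system of notation
  • infix /ɪn'fɪks/ vt. to imprint; to insert n. [computing] infix; inserted element
  • graduation /grædʒʊ'eɪʃ(ə)n/ n. graduation; graduation ceremony; scale mark, gradation; grading
  • congratulation /kəngrætjʊ'leɪʃ(ə)n/ n. congratulation; congratulatory message
  • soccer /'sɒkə/ n. association football, soccer
  • score /skɔː/ n. score, points; twenty; film score; notch
  • translate /træns'leɪt/ vt. to translate; to convert; to interpret; to turn into; to transfer
  • transform /træns'fɔːm/ vt. to change, to alter the shape of; to convert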
Xuehui He

© 2013 - 2016 Xuehui He