public class StormClusterStateImpl extends Object implements IStormClusterState
Constructor and Description |
---|
StormClusterStateImpl(IStateStorage StateStorage,
List<org.apache.zookeeper.data.ACL> acls,
ClusterStateContext context,
boolean solo) |
Modifier and Type | Method and Description |
---|---|
void |
activateStorm(String stormId,
StormBase stormBase,
Map<String,Object> topoConf) |
List<String> |
activeKeys() |
List<String> |
activeStorms() |
void |
addNimbusHost(String nimbusId,
NimbusSummary nimbusSummary) |
Assignment |
assignmentInfo(String stormId,
Runnable callback) |
VersionedData<Assignment> |
assignmentInfoWithVersion(String stormId,
Runnable callback) |
List<String> |
assignments(Runnable callback) |
Integer |
assignmentVersion(String stormId,
Runnable callback) |
List<String> |
backpressureTopologies() |
List<String> |
blobstore(Runnable callback) |
List<String> |
blobstoreInfo(String blobKey) |
Credentials |
credentials(String stormId,
Runnable callback) |
void |
deleteTopologyProfileRequests(String stormId,
ProfileRequest profileRequest) |
void |
disconnect() |
List<ErrorInfo> |
errors(String stormId,
String componentId) |
List<String> |
errorTopologies() |
Map<ExecutorInfo,ExecutorBeat> |
executorBeats(String stormId,
Map<List<Long>,NodeInfo> executorNodePort)
The executor->node+port mapping must be passed in explicitly so that we don’t run into a situation where a long-dead worker with a skewed clock overrides all the timestamps.
|
List<ProfileRequest> |
getTopologyProfileRequests(String stormId) |
ClusterWorkerHeartbeat |
getWorkerHeartbeat(String stormId,
String node,
Long port) |
List<ProfileRequest> |
getWorkerProfileRequests(String stormId,
NodeInfo nodeInfo) |
List<String> |
heartbeatStorms() |
protected void |
issueCallback(AtomicReference<Runnable> cb) |
protected void |
issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap,
String key) |
ErrorInfo |
lastError(String stormId,
String componentId) |
List |
nimbuses() |
void |
removeBackpressure(String stormId) |
void |
removeBlobstoreKey(String blobKey) |
void |
removeKeyVersion(String blobKey) |
void |
removeStorm(String stormId) |
void |
removeStormBase(String stormId) |
void |
removeWorkerBackpressure(String stormId,
String node,
Long port) |
void |
removeWorkerHeartbeat(String stormId,
String node,
Long port) |
void |
reportError(String stormId,
String componentId,
String node,
Long port,
Throwable error) |
void |
setAssignment(String stormId,
Assignment info,
Map<String,Object> topoConf) |
void |
setCredentials(String stormId,
Credentials creds,
Map topoConf) |
void |
setTopologyLogConfig(String stormId,
LogConfig logConfig,
Map<String,Object> topoConf) |
void |
setupBackpressure(String stormId,
Map<String,Object> topoConf) |
void |
setupBlobstore(String key,
NimbusInfo nimbusInfo,
Integer versionInfo) |
void |
setupErrors(String stormId,
Map<String,Object> topoConf) |
void |
setupHeatbeats(String stormId,
Map<String,Object> topoConf) |
void |
setWorkerProfileRequest(String stormId,
ProfileRequest profileRequest) |
StormBase |
stormBase(String stormId,
Runnable callback) |
void |
supervisorHeartbeat(String supervisorId,
SupervisorInfo info) |
SupervisorInfo |
supervisorInfo(String supervisorId) |
List<String> |
supervisors(Runnable callback) |
void |
teardownHeartbeats(String stormId) |
void |
teardownTopologyErrors(String stormId) |
boolean |
topologyBackpressure(String stormId,
Runnable callback)
Check whether a topology is in throttle-on status: if the backpressure/storm-id directory is not empty, the topology is throttle-on; otherwise it is throttle-off.
|
LogConfig |
topologyLogConfig(String stormId,
Runnable cb) |
void |
updateStorm(String stormId,
StormBase newElems)
This function needs to be updated because APersistentMap/APersistentSet are Clojure data structures.
|
void |
workerBackpressure(String stormId,
String node,
Long port,
boolean on)
If the znode exists and backpressure is to be off, delete it; if it exists and is to be on, do nothing; if it does not exist and is to be on, create it; if it does not exist and is to be off, do nothing.
|
void |
workerHeartbeat(String stormId,
String node,
Long port,
ClusterWorkerHeartbeat info) |
public StormClusterStateImpl(IStateStorage StateStorage, List<org.apache.zookeeper.data.ACL> acls, ClusterStateContext context, boolean solo) throws Exception
Throws: Exception
protected void issueCallback(AtomicReference<Runnable> cb)
protected void issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap, String key)
public List<String> assignments(Runnable callback)
assignments
in interface IStormClusterState
public Assignment assignmentInfo(String stormId, Runnable callback)
assignmentInfo
in interface IStormClusterState
public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback)
assignmentInfoWithVersion
in interface IStormClusterState
public Integer assignmentVersion(String stormId, Runnable callback) throws Exception
assignmentVersion
in interface IStormClusterState
Throws: Exception
public List<String> blobstoreInfo(String blobKey)
blobstoreInfo
in interface IStormClusterState
public List nimbuses()
nimbuses
in interface IStormClusterState
public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary)
addNimbusHost
in interface IStormClusterState
public List<String> activeStorms()
activeStorms
in interface IStormClusterState
public StormBase stormBase(String stormId, Runnable callback)
stormBase
in interface IStormClusterState
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port)
getWorkerHeartbeat
in interface IStormClusterState
public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo)
getWorkerProfileRequests
in interface IStormClusterState
public List<ProfileRequest> getTopologyProfileRequests(String stormId)
getTopologyProfileRequests
in interface IStormClusterState
public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest)
setWorkerProfileRequest
in interface IStormClusterState
public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest)
deleteTopologyProfileRequests
in interface IStormClusterState
public Map<ExecutorInfo,ExecutorBeat> executorBeats(String stormId, Map<List<Long>,NodeInfo> executorNodePort)
The executor->node+port mapping must be passed in explicitly so that we don’t run into a situation where a long-dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid situations like that.
executorBeats
in interface IStormClusterState
Parameters: stormId, executorNodePort
public List<String> supervisors(Runnable callback)
supervisors
in interface IStormClusterState
public SupervisorInfo supervisorInfo(String supervisorId)
supervisorInfo
in interface IStormClusterState
public void setupHeatbeats(String stormId, Map<String,Object> topoConf)
setupHeatbeats
in interface IStormClusterState
public void teardownHeartbeats(String stormId)
teardownHeartbeats
in interface IStormClusterState
public void teardownTopologyErrors(String stormId)
teardownTopologyErrors
in interface IStormClusterState
public List<String> heartbeatStorms()
heartbeatStorms
in interface IStormClusterState
public List<String> errorTopologies()
errorTopologies
in interface IStormClusterState
public List<String> backpressureTopologies()
backpressureTopologies
in interface IStormClusterState
public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String,Object> topoConf)
setTopologyLogConfig
in interface IStormClusterState
public LogConfig topologyLogConfig(String stormId, Runnable cb)
topologyLogConfig
in interface IStormClusterState
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info)
workerHeartbeat
in interface IStormClusterState
public void removeWorkerHeartbeat(String stormId, String node, Long port)
removeWorkerHeartbeat
in interface IStormClusterState
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info)
supervisorHeartbeat
in interface IStormClusterState
public void workerBackpressure(String stormId, String node, Long port, boolean on)
If the znode exists and backpressure is to be off, delete it; if it exists and is to be on, do nothing; if it does not exist and is to be on, create it; if it does not exist and is to be off, do nothing.
workerBackpressure
in interface IStormClusterState
Parameters: stormId, node, port, on
public boolean topologyBackpressure(String stormId, Runnable callback)
Check whether a topology is in throttle-on status: if the backpressure/storm-id directory is not empty, the topology is throttle-on; otherwise it is throttle-off.
topologyBackpressure
in interface IStormClusterState
Parameters: stormId, callback
public void setupBackpressure(String stormId, Map<String,Object> topoConf)
setupBackpressure
in interface IStormClusterState
public void removeBackpressure(String stormId)
removeBackpressure
in interface IStormClusterState
public void removeWorkerBackpressure(String stormId, String node, Long port)
removeWorkerBackpressure
in interface IStormClusterState
public void activateStorm(String stormId, StormBase stormBase, Map<String,Object> topoConf)
activateStorm
in interface IStormClusterState
public void updateStorm(String stormId, StormBase newElems)
This function needs to be updated because APersistentMap/APersistentSet are Clojure data structures.
updateStorm
in interface IStormClusterState
Parameters: stormId, newElems
public void removeStormBase(String stormId)
removeStormBase
in interface IStormClusterState
public void setAssignment(String stormId, Assignment info, Map<String,Object> topoConf)
setAssignment
in interface IStormClusterState
public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo)
setupBlobstore
in interface IStormClusterState
public List<String> activeKeys()
activeKeys
in interface IStormClusterState
public List<String> blobstore(Runnable callback)
blobstore
in interface IStormClusterState
public void removeStorm(String stormId)
removeStorm
in interface IStormClusterState
public void removeBlobstoreKey(String blobKey)
removeBlobstoreKey
in interface IStormClusterState
public void removeKeyVersion(String blobKey)
removeKeyVersion
in interface IStormClusterState
public void setupErrors(String stormId, Map<String,Object> topoConf)
setupErrors
in interface IStormClusterState
public void reportError(String stormId, String componentId, String node, Long port, Throwable error)
reportError
in interface IStormClusterState
public List<ErrorInfo> errors(String stormId, String componentId)
errors
in interface IStormClusterState
public ErrorInfo lastError(String stormId, String componentId)
lastError
in interface IStormClusterState
public void setCredentials(String stormId, Credentials creds, Map topoConf)
setCredentials
in interface IStormClusterState
public Credentials credentials(String stormId, Runnable callback)
credentials
in interface IStormClusterState
public void disconnect()
disconnect
in interface IStormClusterState
Copyright © 2019 The Apache Software Foundation. All Rights Reserved.