Package org.apache.storm.cluster
Class StormClusterStateImpl
- java.lang.Object
-
- org.apache.storm.cluster.StormClusterStateImpl
-
- All Implemented Interfaces:
IStormClusterState
public class StormClusterStateImpl extends Object implements IStormClusterState
-
-
Constructor Summary
Constructors
Constructor  Description
StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect)
-
Method Summary
Modifier and Type  Method  Description
void
activateStorm(String stormId, StormBase stormBase, Map<String,Object> topoConf)
List<String>
activeKeys()
List<String>
activeStorms()
void
addNimbusHost(String nimbusId, NimbusSummary nimbusSummary)
void
addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key)
Store a new version of a private key.
Assignment
assignmentInfo(String stormId, Runnable callback)
Get the assignment based on storm id from local backend.
VersionedData<Assignment>
assignmentInfoWithVersion(String stormId, Runnable callback)
List<String>
assignments(Runnable callback)
Map<String,Assignment>
assignmentsInfo()
Get all topology assignments, as a stormId -> Assignment mapping, from the local backend.
Integer
assignmentVersion(String stormId, Runnable callback)
List<String>
backpressureTopologies()
Get backpressure topologies.
List<String>
blobstore(Runnable callback)
List<String>
blobstoreInfo(String blobKey)
Credentials
credentials(String stormId, Runnable callback)
void
deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest)
void
disconnect()
List<ErrorInfo>
errors(String stormId, String componentId)
List<String>
errorTopologies()
Map<ExecutorInfo,ExecutorBeat>
executorBeats(String stormId, Map<List<Long>,NodeInfo> executorNodePort)
Takes the executor -> node+port mapping explicitly so that a long-dead worker with a skewed clock cannot override all the timestamps.
NimbusInfo
getLeader(Runnable callback)
Get leader info from state store, which was written when a master gains leadership.
long
getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId)
Get the next key version number that should be used for this topology id.
PrivateWorkerKey
getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion)
Get a private key used to validate a token is correct.
List<ProfileRequest>
getTopologyProfileRequests(String stormId)
ClusterWorkerHeartbeat
getWorkerHeartbeat(String stormId, String node, Long port)
List<ProfileRequest>
getWorkerProfileRequests(String stormId, NodeInfo nodeInfo)
List<String>
heartbeatStorms()
Set<String>
idsOfTopologiesWithPrivateWorkerKeys()
Get a list of all topologyIds that currently have private worker keys stored, of any kind.
boolean
isAssignmentsBackendSynchronized()
Flag to indicate if the assignments synced successfully, see IStormClusterState.syncRemoteAssignments(Map).
boolean
isPacemakerStateStore()
Flag to indicate whether Pacemaker is the backend store.
protected void
issueCallback(AtomicReference<Runnable> cb)
protected void
issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap, String key)
ErrorInfo
lastError(String stormId, String componentId)
List<NimbusSummary>
nimbuses()
Assignment
remoteAssignmentInfo(String stormId, Runnable callback)
Get the assignment based on storm id from the remote state store, e.g. ZK.
void
removeAllPrivateWorkerKeys(String topologyId)
Remove all of the worker keys for a given topology.
void
removeBackpressure(String stormId)
Remove backpressure.
void
removeBlobstoreKey(String blobKey)
void
removeExpiredPrivateWorkerKeys(String topologyId)
Remove all keys for the given topology that have expired.
void
removeKeyVersion(String blobKey)
void
removeStorm(String stormId)
void
removeStormBase(String stormId)
void
removeWorkerBackpressure(String stormId, String node, Long port)
Remove worker backpressure.
void
removeWorkerHeartbeat(String stormId, String node, Long port)
void
reportError(String stormId, String componentId, String node, Long port, Throwable error)
void
setAssignment(String stormId, Assignment info, Map<String,Object> topoConf)
void
setAssignmentsBackendSynchronized()
Mark the assignments as synced successfully, see IStormClusterState.isAssignmentsBackendSynchronized().
void
setCredentials(String stormId, Credentials creds, Map<String,Object> topoConf)
void
setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String,Object> topoConf)
void
setupBackpressure(String stormId, Map<String,Object> topoConf)
Setup backpressure.
void
setupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo)
void
setupErrors(String stormId, Map<String,Object> topoConf)
void
setupHeatbeats(String stormId, Map<String,Object> topoConf)
void
setWorkerProfileRequest(String stormId, ProfileRequest profileRequest)
StormBase
stormBase(String stormId, Runnable callback)
Get a storm base for a topology.
String
stormId(String stormName)
Get the storm id for the given name, or null if the name doesn't exist on the cluster.
void
supervisorHeartbeat(String supervisorId, SupervisorInfo info)
SupervisorInfo
supervisorInfo(String supervisorId)
List<String>
supervisors(Runnable callback)
void
syncRemoteAssignments(Map<String,byte[]> remote)
Sync the remote state store assignments to local backend, used when master gains leadership, see org.apache.storm.nimbus.LeaderListenerCallback.
void
syncRemoteIds(Map<String,String> remote)
Sync all the active storm ids of the cluster, used now when master gains leadership.
void
teardownHeartbeats(String stormId)
void
teardownTopologyErrors(String stormId)
boolean
topologyBackpressure(String stormId, long timeoutMs, Runnable callback)
Check whether a topology is in throttle-on status or not: if the backpressure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
LogConfig
topologyLogConfig(String stormId, Runnable cb)
void
updateStorm(String stormId, StormBase newElems)
To do: update this function, because APersistentMap/APersistentSet are Clojure structures.
void
workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.storm.cluster.IStormClusterState
allSupervisorInfo, allSupervisorInfo, getTopoId, topologyBases
-
-
-
-
Constructor Detail
-
StormClusterStateImpl
public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception
- Throws:
Exception
-
-
Method Detail
-
issueCallback
protected void issueCallback(AtomicReference<Runnable> cb)
-
issueMapCallback
protected void issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap, String key)
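The two signatures above suggest one-shot watch callbacks. The following is a minimal sketch of that pattern, not the Storm source: it assumes a registered callback is cleared before it runs, so it fires at most once. The class name OneShotCallbacks and the boolean return values are hypothetical and exist only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not part of Storm): fires a stored callback at most once.
final class OneShotCallbacks {
    // Atomically empty the slot, then run whatever was registered, if anything.
    static boolean issueCallback(AtomicReference<Runnable> cb) {
        Runnable callback = cb.getAndSet(null);
        if (callback == null) {
            return false; // nothing registered, or already fired
        }
        callback.run();
        return true;
    }

    // Same idea for per-key callbacks: remove() hands the callback to exactly one caller.
    static boolean issueMapCallback(ConcurrentHashMap<String, Runnable> callbacks, String key) {
        Runnable callback = callbacks.remove(key);
        if (callback == null) {
            return false;
        }
        callback.run();
        return true;
    }

    public static void main(String[] args) {
        AtomicReference<Runnable> slot =
            new AtomicReference<Runnable>(() -> System.out.println("state changed"));
        issueCallback(slot); // runs the callback
        issueCallback(slot); // slot is now empty, so nothing runs
    }
}
```

Clearing the slot before running the callback means a callback that re-registers itself is not lost, and two concurrent notifications cannot both run the same callback.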
-
assignments
public List<String> assignments(Runnable callback)
- Specified by:
assignments
in interface IStormClusterState
-
assignmentInfo
public Assignment assignmentInfo(String stormId, Runnable callback)
Description copied from interface: IStormClusterState
Get the assignment based on storm id from local backend.
- Specified by:
assignmentInfo
in interface IStormClusterState
- Parameters:
stormId
- topology id
callback
- callback function
- Returns:
Assignment
-
remoteAssignmentInfo
public Assignment remoteAssignmentInfo(String stormId, Runnable callback)
Description copied from interface: IStormClusterState
Get the assignment based on storm id from the remote state store, e.g. ZK.
- Specified by:
remoteAssignmentInfo
in interface IStormClusterState
- Parameters:
stormId
- topology id
callback
- callback function
- Returns:
Assignment
-
assignmentsInfo
public Map<String,Assignment> assignmentsInfo()
Description copied from interface: IStormClusterState
Get all topology assignments, as a stormId -> Assignment mapping, from the local backend.
- Specified by:
assignmentsInfo
in interface IStormClusterState
- Returns:
- stormId -> Assignment mapping
-
syncRemoteAssignments
public void syncRemoteAssignments(Map<String,byte[]> remote)
Description copied from interface: IStormClusterState
Sync the remote state store assignments to local backend, used when master gains leadership, see org.apache.storm.nimbus.LeaderListenerCallback.
- Specified by:
syncRemoteAssignments
in interface IStormClusterState
- Parameters:
remote
- assigned assignments for a specific IStormClusterState instance, usually a supervisor/node.
-
isAssignmentsBackendSynchronized
public boolean isAssignmentsBackendSynchronized()
Description copied from interface: IStormClusterState
Flag to indicate if the assignments synced successfully, see IStormClusterState.syncRemoteAssignments(Map).
- Specified by:
isAssignmentsBackendSynchronized
in interface IStormClusterState
- Returns:
- true if the assignments synced successfully
-
isPacemakerStateStore
public boolean isPacemakerStateStore()
Description copied from interface: IStormClusterState
Flag to indicate whether Pacemaker is the backend store.
- Specified by:
isPacemakerStateStore
in interface IStormClusterState
- Returns:
- true if Pacemaker is being used as StateStore
-
setAssignmentsBackendSynchronized
public void setAssignmentsBackendSynchronized()
Description copied from interface: IStormClusterState
Mark the assignments as synced successfully, see IStormClusterState.isAssignmentsBackendSynchronized().
- Specified by:
setAssignmentsBackendSynchronized
in interface IStormClusterState
-
assignmentInfoWithVersion
public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback)
- Specified by:
assignmentInfoWithVersion
in interface IStormClusterState
-
assignmentVersion
public Integer assignmentVersion(String stormId, Runnable callback) throws Exception
- Specified by:
assignmentVersion
in interface IStormClusterState
- Throws:
Exception
-
blobstoreInfo
public List<String> blobstoreInfo(String blobKey)
- Specified by:
blobstoreInfo
in interface IStormClusterState
-
nimbuses
public List<NimbusSummary> nimbuses()
- Specified by:
nimbuses
in interface IStormClusterState
-
addNimbusHost
public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary)
- Specified by:
addNimbusHost
in interface IStormClusterState
-
activeStorms
public List<String> activeStorms()
- Specified by:
activeStorms
in interface IStormClusterState
-
stormBase
public StormBase stormBase(String stormId, Runnable callback)
Description copied from interface: IStormClusterState
Get a storm base for a topology.
- Specified by:
stormBase
in interface IStormClusterState
- Parameters:
stormId
- the id of the topology
callback
- something to call if the data changes (best effort)
- Returns:
- the StormBase or null if it is not alive.
-
stormId
public String stormId(String stormName)
Description copied from interface: IStormClusterState
Get the storm id for the given name, or null if the name doesn't exist on the cluster.
- Specified by:
stormId
in interface IStormClusterState
- Parameters:
stormName
- storm name
- Returns:
- storm id
-
syncRemoteIds
public void syncRemoteIds(Map<String,String> remote)
Description copied from interface: IStormClusterState
Sync all the active storm ids of the cluster, used now when master gains leadership.
- Specified by:
syncRemoteIds
in interface IStormClusterState
- Parameters:
remote
- stormName -> stormId mapping
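As an illustration of how stormId(String) and syncRemoteIds(Map) relate, here is a minimal sketch assuming a local name-to-id index that is replaced wholesale on sync. The class TopologyIdIndex is hypothetical; this page does not say where the real implementation keeps the mapping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a local stormName -> stormId index (not Storm code).
final class TopologyIdIndex {
    private final Map<String, String> nameToId = new ConcurrentHashMap<>();

    // Make the local view match the remote stormName -> stormId mapping.
    void syncRemoteIds(Map<String, String> remote) {
        nameToId.keySet().retainAll(remote.keySet()); // drop names that no longer exist remotely
        nameToId.putAll(remote);                      // add or refresh the rest
    }

    // Returns the storm id, or null if the name is unknown.
    String stormId(String stormName) {
        return nameToId.get(stormName);
    }

    public static void main(String[] args) {
        TopologyIdIndex index = new TopologyIdIndex();
        index.syncRemoteIds(java.util.Collections.singletonMap("wordcount", "wordcount-1-1700000000"));
        System.out.println(index.stormId("wordcount"));
        System.out.println(index.stormId("missing"));
    }
}
```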
-
getWorkerHeartbeat
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port)
- Specified by:
getWorkerHeartbeat
in interface IStormClusterState
-
getWorkerProfileRequests
public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo)
- Specified by:
getWorkerProfileRequests
in interface IStormClusterState
-
getTopologyProfileRequests
public List<ProfileRequest> getTopologyProfileRequests(String stormId)
- Specified by:
getTopologyProfileRequests
in interface IStormClusterState
-
setWorkerProfileRequest
public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest)
- Specified by:
setWorkerProfileRequest
in interface IStormClusterState
-
deleteTopologyProfileRequests
public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest)
- Specified by:
deleteTopologyProfileRequests
in interface IStormClusterState
-
executorBeats
public Map<ExecutorInfo,ExecutorBeat> executorBeats(String stormId, Map<List<Long>,NodeInfo> executorNodePort)
Takes the executor -> node+port mapping explicitly so that a long-dead worker with a skewed clock cannot override all the timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid situations like that.
- Specified by:
executorBeats
in interface IStormClusterState
- Parameters:
stormId
- topology id
executorNodePort
- executor id -> node + port
- Returns:
- mapping of executorInfo -> executor beat
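The filtering described above can be sketched as follows. This is an illustration under simplifying assumptions, not the Storm implementation: worker slots are plain "node:port" strings, a heartbeat is reduced to a timestamp per executor id, and the class AssignedBeatFilter is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: only read heartbeats from the worker each executor is assigned to.
final class AssignedBeatFilter {
    // workerBeats:      "node:port" -> (executor id -> heartbeat timestamp), i.e. what workers reported
    // executorNodePort: executor id -> "node:port" the executor is currently assigned to
    static Map<List<Long>, Long> executorBeats(Map<String, Map<List<Long>, Long>> workerBeats,
                                               Map<List<Long>, String> executorNodePort) {
        Map<List<Long>, Long> result = new HashMap<>();
        for (Map.Entry<List<Long>, String> assigned : executorNodePort.entrySet()) {
            Map<List<Long>, Long> beat = workerBeats.get(assigned.getValue());
            if (beat == null) {
                continue; // the assigned worker has not reported yet
            }
            Long timestamp = beat.get(assigned.getKey());
            if (timestamp != null) {
                result.put(assigned.getKey(), timestamp);
            }
        }
        return result;
    }
}
```

Because the loop is driven by the assignment rather than by whatever heartbeats exist, a stale heartbeat left behind by a dead worker on another slot is never consulted, whatever timestamp it carries.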
-
supervisors
public List<String> supervisors(Runnable callback)
- Specified by:
supervisors
in interface IStormClusterState
-
supervisorInfo
public SupervisorInfo supervisorInfo(String supervisorId)
- Specified by:
supervisorInfo
in interface IStormClusterState
-
setupHeatbeats
public void setupHeatbeats(String stormId, Map<String,Object> topoConf)
- Specified by:
setupHeatbeats
in interface IStormClusterState
-
teardownHeartbeats
public void teardownHeartbeats(String stormId)
- Specified by:
teardownHeartbeats
in interface IStormClusterState
-
teardownTopologyErrors
public void teardownTopologyErrors(String stormId)
- Specified by:
teardownTopologyErrors
in interface IStormClusterState
-
getLeader
public NimbusInfo getLeader(Runnable callback)
Description copied from interface: IStormClusterState
Get leader info from state store, which was written when a master gains leadership.
Caution: it cannot be used for fencing and is for informational purposes only, because ZK is currently the backend and its node information may be stale.
- Specified by:
getLeader
in interface IStormClusterState
- Parameters:
callback
- callback function
- Returns:
NimbusInfo
-
backpressureTopologies
public List<String> backpressureTopologies()
Description copied from interface: IStormClusterState
Get backpressure topologies. Note: in Storm 2.0 this is retained only to enable the transition from 1.x. It will be removed soon.
- Specified by:
backpressureTopologies
in interface IStormClusterState
-
heartbeatStorms
public List<String> heartbeatStorms()
- Specified by:
heartbeatStorms
in interface IStormClusterState
-
errorTopologies
public List<String> errorTopologies()
- Specified by:
errorTopologies
in interface IStormClusterState
-
setTopologyLogConfig
public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String,Object> topoConf)
- Specified by:
setTopologyLogConfig
in interface IStormClusterState
-
topologyLogConfig
public LogConfig topologyLogConfig(String stormId, Runnable cb)
- Specified by:
topologyLogConfig
in interface IStormClusterState
-
workerHeartbeat
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info)
- Specified by:
workerHeartbeat
in interface IStormClusterState
-
removeWorkerHeartbeat
public void removeWorkerHeartbeat(String stormId, String node, Long port)
- Specified by:
removeWorkerHeartbeat
in interface IStormClusterState
-
supervisorHeartbeat
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info)
- Specified by:
supervisorHeartbeat
in interface IStormClusterState
-
topologyBackpressure
public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback)
Check whether a topology is in throttle-on status or not: if the backpressure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off. But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off. This prevents the spouts from getting stuck indefinitely if something goes wrong.
- Specified by:
topologyBackpressure
in interface IStormClusterState
- Parameters:
stormId
- The topology id
timeoutMs
- How long until the backpressure znode is invalid.
callback
- The callback function
- Returns:
- True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
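The throttle decision described above can be sketched as a small pure function. This is a sketch under stated assumptions rather than the Storm code: the children of the topology's backpressure dir are modeled as a map from znode name to last-update time in milliseconds, and the class BackpressureCheck and the parameter nowMs are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of the throttle-on / throttle-off rule (not Storm code).
final class BackpressureCheck {
    // znodeUpdateTimes: child znode name -> last update time in ms (no null values expected).
    static boolean isThrottleOn(Map<String, Long> znodeUpdateTimes, long nowMs, long timeoutMs) {
        for (long updatedMs : znodeUpdateTimes.values()) {
            if (nowMs - updatedMs <= timeoutMs) {
                return true; // at least one backpressure znode is still fresh
            }
        }
        return false; // dir is empty, or every znode has timed out
    }
}
```

An empty directory and a directory holding only timed-out znodes both yield throttle-off, which is what keeps spouts from waiting forever on a worker that died without clearing its znode.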
-
setupBackpressure
public void setupBackpressure(String stormId, Map<String,Object> topoConf)
Description copied from interface: IStormClusterState
Setup backpressure. Note: in Storm 2.0 this is retained only to enable the transition from 1.x. It will be removed soon.
- Specified by:
setupBackpressure
in interface IStormClusterState
-
removeBackpressure
public void removeBackpressure(String stormId)
Description copied from interface: IStormClusterState
Remove backpressure. Note: in Storm 2.0 this is retained only to enable the transition from 1.x. It will be removed soon.
- Specified by:
removeBackpressure
in interface IStormClusterState
-
removeWorkerBackpressure
public void removeWorkerBackpressure(String stormId, String node, Long port)
Description copied from interface: IStormClusterState
Remove worker backpressure. Note: in Storm 2.0 this is retained only to enable the transition from 1.x. It will be removed soon.
- Specified by:
removeWorkerBackpressure
in interface IStormClusterState
-
activateStorm
public void activateStorm(String stormId, StormBase stormBase, Map<String,Object> topoConf)
- Specified by:
activateStorm
in interface IStormClusterState
-
updateStorm
public void updateStorm(String stormId, StormBase newElems)
To do: update this function, because APersistentMap/APersistentSet are Clojure structures.
- Specified by:
updateStorm
in interface IStormClusterState
-
removeStormBase
public void removeStormBase(String stormId)
- Specified by:
removeStormBase
in interface IStormClusterState
-
setAssignment
public void setAssignment(String stormId, Assignment info, Map<String,Object> topoConf)
- Specified by:
setAssignment
in interface IStormClusterState
-
setupBlob
public void setupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo)
- Specified by:
setupBlob
in interface IStormClusterState
-
activeKeys
public List<String> activeKeys()
- Specified by:
activeKeys
in interface IStormClusterState
-
blobstore
public List<String> blobstore(Runnable callback)
- Specified by:
blobstore
in interface IStormClusterState
-
removeStorm
public void removeStorm(String stormId)
- Specified by:
removeStorm
in interface IStormClusterState
-
removeBlobstoreKey
public void removeBlobstoreKey(String blobKey)
- Specified by:
removeBlobstoreKey
in interface IStormClusterState
-
removeKeyVersion
public void removeKeyVersion(String blobKey)
- Specified by:
removeKeyVersion
in interface IStormClusterState
-
setupErrors
public void setupErrors(String stormId, Map<String,Object> topoConf)
- Specified by:
setupErrors
in interface IStormClusterState
-
reportError
public void reportError(String stormId, String componentId, String node, Long port, Throwable error)
- Specified by:
reportError
in interface IStormClusterState
-
errors
public List<ErrorInfo> errors(String stormId, String componentId)
- Specified by:
errors
in interface IStormClusterState
-
lastError
public ErrorInfo lastError(String stormId, String componentId)
- Specified by:
lastError
in interface IStormClusterState
-
setCredentials
public void setCredentials(String stormId, Credentials creds, Map<String,Object> topoConf)
- Specified by:
setCredentials
in interface IStormClusterState
-
credentials
public Credentials credentials(String stormId, Runnable callback)
- Specified by:
credentials
in interface IStormClusterState
-
disconnect
public void disconnect()
- Specified by:
disconnect
in interface IStormClusterState
-
getPrivateWorkerKey
public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion)
Description copied from interface: IStormClusterState
Get a private key used to validate a token is correct. This is expected to be called from a privileged daemon, and the ACLs should be set up to only allow nimbus and these privileged daemons access to these private keys.
- Specified by:
getPrivateWorkerKey
in interface IStormClusterState
- Parameters:
type
- the type of service the key is for.
topologyId
- the topology id the key is for.
keyVersion
- the version of the key this is for.
- Returns:
- the private key or null if it could not be found.
-
addPrivateWorkerKey
public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key)
Description copied from interface: IStormClusterState
Store a new version of a private key. This is expected to only ever be called from nimbus. All ACLs, however, need to be set up to allow the given services access to the stored information.
- Specified by:
addPrivateWorkerKey
in interface IStormClusterState
- Parameters:
type
- the type of service this key is for.
topologyId
- the topology this key is for.
keyVersion
- the version of the key this is for.
key
- the key to store.
-
getNextPrivateWorkerKeyVersion
public long getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId)
Description copied from interface: IStormClusterState
Get the next key version number that should be used for this topology id. This is expected to only ever be called from nimbus, but it is acceptable if the ACLs are set up so that it can work from a privileged daemon for the given service.
- Specified by:
getNextPrivateWorkerKeyVersion
in interface IStormClusterState
- Parameters:
type
- the type of service this is for.
topologyId
- the topology id this is for.
- Returns:
- the next version number. It should be 0 for a new topology id/service combination.
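One way to satisfy the contract above is to return one more than the highest version already stored, which naturally gives 0 when nothing is stored. The sketch below illustrates that rule only; the page does not state how the real implementation derives the number, and the class KeyVersions is hypothetical.

```java
import java.util.Collection;

// Hypothetical sketch: derive the next key version from the versions already stored.
final class KeyVersions {
    // existingVersions: key versions already stored for one topology id / service type pair.
    static long nextVersion(Collection<Long> existingVersions) {
        long next = 0L; // a new topology id / service combination starts at 0
        for (long version : existingVersions) {
            next = Math.max(next, version + 1);
        }
        return next;
    }
}
```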
-
removeExpiredPrivateWorkerKeys
public void removeExpiredPrivateWorkerKeys(String topologyId)
Description copied from interface: IStormClusterState
Remove all keys for the given topology that have expired. The number of keys should be small enough that an exhaustive scan of them all is acceptable, as there is no guarantee that expiration time and version number are related. This should cover all service types. This is expected to only ever be called from nimbus, and some ACLs may be set up so that calls from other daemons fail.
- Specified by:
removeExpiredPrivateWorkerKeys
in interface IStormClusterState
- Parameters:
topologyId
- the id of the topology to scan.
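The exhaustive scan described above can be sketched like this. It is an illustration, not the Storm implementation: stored keys are modeled as nested maps (service type -> key version -> expiration time in ms), "expired" is assumed to mean an expiration time at or before nowMs, and the class ExpiredKeySweep is hypothetical.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch: scan every stored key for a topology and drop the expired ones.
final class ExpiredKeySweep {
    // keysByService: service type -> (key version -> expiration time in ms), for one topology.
    // Returns how many keys were removed.
    static int removeExpired(Map<String, Map<Long, Long>> keysByService, long nowMs) {
        int removed = 0;
        for (Map<Long, Long> versions : keysByService.values()) { // all service types
            Iterator<Map.Entry<Long, Long>> it = versions.entrySet().iterator();
            while (it.hasNext()) {
                // Check each key's own expiration; version order says nothing about expiry order.
                if (it.next().getValue() <= nowMs) {
                    it.remove();
                    removed++;
                }
            }
        }
        return removed;
    }
}
```

Every key is checked individually because a higher version number does not imply a later expiration time, so stopping at the first unexpired version could leave expired keys behind.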
-
removeAllPrivateWorkerKeys
public void removeAllPrivateWorkerKeys(String topologyId)
Description copied from interface: IStormClusterState
Remove all of the worker keys for a given topology. Used to clean up after a topology finishes. This is expected to only ever be called from nimbus and ideally should only ever work from nimbus.
- Specified by:
removeAllPrivateWorkerKeys
in interface IStormClusterState
- Parameters:
topologyId
- the topology to clean up after.
-
idsOfTopologiesWithPrivateWorkerKeys
public Set<String> idsOfTopologiesWithPrivateWorkerKeys()
Description copied from interface: IStormClusterState
Get a list of all topologyIds that currently have private worker keys stored, of any kind. This is expected to only ever be called from nimbus.
- Specified by:
idsOfTopologiesWithPrivateWorkerKeys
in interface IStormClusterState
- Returns:
- the list of topology ids with any kind of private worker key stored.
-
-