Class JmsSpout
- java.lang.Object
-
- org.apache.storm.topology.base.BaseComponent
-
- org.apache.storm.topology.base.BaseRichSpout
-
- org.apache.storm.jms.spout.JmsSpout
-
- All Implemented Interfaces:
Serializable, ISpout, IComponent, IRichSpout
public class JmsSpout extends BaseRichSpout
A Storm Spout implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
JmsSpout instances rely on JmsProvider implementations to obtain the JMS ConnectionFactory and Destination objects necessary to connect to a JMS topic/queue.
When a JmsSpout receives a JMS message, it delegates to an internal JmsTupleProducer instance to create a Storm tuple from the incoming message.
Typically, developers will supply a custom JmsTupleProducer implementation appropriate for the expected message content.
- See Also:
- Serialized Form
-
-
Constructor Summary
JmsSpout()
-
Method Summary
Modifier and Type, Method, Description
- void ack(Object msgId)
Ack a successfully handled message by the matching JmsMessageID.
- void close()
Close the session and connection.
- void declareOutputFields(OutputFieldsDeclarer declarer)
Use the tupleProducer to determine which fields are about to be emitted.
- void fail(Object msgId)
Fail an unsuccessfully handled message by its JmsMessageID.
- Map<String,Object> getComponentConfiguration()
Declare configuration specific to this component.
- int getJmsAcknowledgeMode()
Returns the JMS Session acknowledgement mode for the JMS session associated with this spout.
- protected javax.jms.Session getSession()
Returns the currently active session.
- boolean isDistributed()
Returns whether the spout is distributed.
- void nextTuple()
Generate the next tuple from a message.
- void open(Map<String,Object> conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector)
ISpout implementation.
- void setDistributed(boolean isDistributed)
Sets the "distributed" mode of this spout.
- void setIndividualAcks()
Set this if the JMS vendor supports ack-ing individual messages.
- void setJmsAcknowledgeMode(int mode)
Sets the JMS Session acknowledgement mode for the JMS session.
- void setJmsProvider(JmsProvider provider)
Set jmsProvider.
- void setJmsTupleProducer(JmsTupleProducer producer)
Set the JmsTupleProducer implementation that will convert javax.jms.Message objects to org.apache.storm.tuple.Values objects to be emitted.
Methods inherited from class org.apache.storm.topology.base.BaseRichSpout
activate, deactivate
-
-
-
-
Method Detail
-
setJmsAcknowledgeMode
public void setJmsAcknowledgeMode(int mode)
Sets the JMS Session acknowledgement mode for the JMS session.
Possible values:
- javax.jms.Session.AUTO_ACKNOWLEDGE
- javax.jms.Session.CLIENT_ACKNOWLEDGE
- javax.jms.Session.DUPS_OK_ACKNOWLEDGE
Any other vendor-specific modes are not supported.
- Parameters:
mode - JMS Session acknowledgement mode
-
getJmsAcknowledgeMode
public int getJmsAcknowledgeMode()
Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be one of:
- Session.AUTO_ACKNOWLEDGE
- Session.CLIENT_ACKNOWLEDGE
- Session.DUPS_OK_ACKNOWLEDGE
- Session.SESSION_TRANSACTED
- Returns:
the int value of the acknowledgement mode.
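Because the getter returns a bare int, a small lookup can make log output easier to read. The sketch below is illustrative only: the class AckModeNames and the method nameOf are hypothetical helpers, not part of JmsSpout. The numeric values are those of the standard javax.jms.Session constants, copied into the sketch so that it compiles without a JMS jar.

```java
// Hypothetical helper, not part of JmsSpout: names the int returned by
// getJmsAcknowledgeMode(). Values mirror the javax.jms.Session constants.
final class AckModeNames {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    /** Maps an acknowledgement mode to its constant name, or "UNKNOWN(n)". */
    static String nameOf(int mode) {
        switch (mode) {
            case SESSION_TRANSACTED:
                return "SESSION_TRANSACTED";
            case AUTO_ACKNOWLEDGE:
                return "AUTO_ACKNOWLEDGE";
            case CLIENT_ACKNOWLEDGE:
                return "CLIENT_ACKNOWLEDGE";
            case DUPS_OK_ACKNOWLEDGE:
                return "DUPS_OK_ACKNOWLEDGE";
            default:
                // Vendor-specific or otherwise unrecognized value.
                return "UNKNOWN(" + mode + ")";
        }
    }
}
```

With a configured spout, this could be used as AckModeNames.nameOf(spout.getJmsAcknowledgeMode()).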
-
setJmsProvider
public void setJmsProvider(JmsProvider provider)
Set jmsProvider.
Set the JmsProvider implementation that this Spout will use to connect to a JMS javax.jms.Destination.
- Parameters:
provider - the provider to use
-
setJmsTupleProducer
public void setJmsTupleProducer(JmsTupleProducer producer)
Set the JmsTupleProducer implementation that will convert javax.jms.Message objects to org.apache.storm.tuple.Values objects to be emitted.
- Parameters:
producer - the producer instance to use
-
setIndividualAcks
public void setIndividualAcks()
Set this if the JMS vendor supports ack-ing individual messages. The appropriate mode must be set via setJmsAcknowledgeMode(int).
-
open
public void open(Map<String,Object> conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector)
ISpout implementation.
Connects the JMS spout to the configured JMS destination topic/queue.
- Parameters:
conf - The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
context - This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
spoutOutputCollector - The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
-
close
public void close()
Close the session and connection.
When overridden, should always call super to finalize the active connections.
- Specified by:
close in interface ISpout
- Overrides:
close in class BaseRichSpout
-
nextTuple
public void nextTuple()
Generate the next tuple from a message.
This method polls the queue that's being filled asynchronously by the JMS connection, every POLL_INTERVAL_MS milliseconds.
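The polling pattern described for nextTuple() can be modelled with a standard blocking queue. This is a minimal sketch under stated assumptions, not the JmsSpout source: the class PollingModel, its method names, the String payload type, and the value 50 for POLL_INTERVAL_MS are all hypothetical, and a timed poll is only one way to implement the interval.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of a spout polling a queue filled by a listener thread.
final class PollingModel {
    // Hypothetical value; the actual interval is not given on this page.
    static final long POLL_INTERVAL_MS = 50;

    // Stands in for the queue that the JMS connection fills asynchronously.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called by the (simulated) JMS listener when a message arrives. */
    void onMessage(String payload) {
        queue.offer(payload);
    }

    /** Waits up to POLL_INTERVAL_MS for a message; returns null if none arrived. */
    String nextOrNull() {
        try {
            return queue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Returning null when nothing arrives lets the caller return promptly instead of blocking indefinitely.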
-
ack
public void ack(Object msgId)
Ack a successfully handled message by the matching JmsMessageID.
Acking means removing the message from the pending messages collection and, if it was the oldest pending message, acking it to the message queue as well, so that it's the only one acked.
Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
- Specified by:
ack in interface ISpout
- Overrides:
ack in class BaseRichSpout
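The bookkeeping described for ack can be illustrated with an ordered set. The sketch below is a simplified model, not the JmsSpout source: the class PendingAckModel and its methods are hypothetical, and messages are identified by an assumed increasing sequence number rather than a JmsMessageID.

```java
import java.util.TreeSet;

// Simplified model of the pending-message bookkeeping; not the JmsSpout source.
final class PendingAckModel {
    // Sequence numbers emitted but not yet acked, ordered oldest first.
    private final TreeSet<Long> pending = new TreeSet<>();

    /** Records that a message was emitted and now awaits an ack. */
    void emitted(long seq) {
        pending.add(seq);
    }

    /**
     * Removes seq from the pending set.
     * Returns true only if seq was pending and was the oldest entry, which is
     * the case where the message would also be acked to the message queue.
     */
    boolean ack(long seq) {
        if (!pending.contains(seq)) {
            return false;
        }
        boolean wasOldest = pending.first() == seq;
        pending.remove(seq);
        return wasOldest;
    }

    /** Number of messages still awaiting an ack. */
    int pendingCount() {
        return pending.size();
    }
}
```

In this model, acking messages out of order only removes them locally; the broker-facing acknowledgement happens when the oldest pending entry is acked.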
-
fail
public void fail(Object msgId)
Fail an unsuccessfully handled message by its JmsMessageID.
Failing means dropping all pending messages and queueing a recovery attempt.
Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
- Specified by:
fail in interface ISpout
- Overrides:
fail in class BaseRichSpout
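The failure path, where one failed message discards all pending state and schedules a recovery, can be sketched as follows. This is a simplified model, not the JmsSpout source: the class FailureModel, its methods, and the String identifiers are hypothetical, and the recovery step itself is left out because this page does not describe it.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the failure path; not the JmsSpout source.
final class FailureModel {
    // Message id -> payload for messages emitted but not yet acked.
    private final Map<String, String> pending = new HashMap<>();
    // Set by fail(); consumed by whichever loop performs the recovery.
    private boolean recoveryQueued = false;

    /** Records that a message was emitted and now awaits an ack. */
    void emitted(String id, String payload) {
        pending.put(id, payload);
    }

    /** Drops every pending message, not just the failed one, and queues a recovery. */
    void fail(String id) {
        pending.clear();
        recoveryQueued = true;
    }

    /**
     * Returns true once per queued recovery request. A real spout would
     * presumably ask the broker to redeliver at this point (for example via
     * javax.jms.Session.recover()); that step is an assumption and is omitted.
     */
    boolean takeRecoveryRequest() {
        boolean requested = recoveryQueued;
        recoveryQueued = false;
        return requested;
    }

    /** Number of messages still awaiting an ack. */
    int pendingCount() {
        return pending.size();
    }
}
```

Deferring the recovery behind a flag keeps fail cheap and lets the recovery run from a single place.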
-
declareOutputFields
public void declareOutputFields(OutputFieldsDeclarer declarer)
Use the tupleProducer to determine which fields are about to be emitted.
Note that nextTuple() always emits to the default stream, and thus only fields declared for this stream are used.
- Parameters:
declarer - this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
-
isDistributed
public boolean isDistributed()
Returns whether the spout is distributed.
- Returns:
distributed.
-
setDistributed
public void setDistributed(boolean isDistributed)
Sets the "distributed" mode of this spout.
If true, multiple instances of this spout may be created across the cluster (depending on the "parallelism_hint" in the topology configuration).
Setting this value to false essentially means this spout will run as a singleton within the cluster ("parallelism_hint" will be ignored).
In general, this should be set to false if the underlying JMS destination is a topic, and true if it is a JMS queue.
- Parameters:
isDistributed - true if the spout should be distributed, false otherwise.
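One plausible way for a component to run as a singleton is to cap its own task parallelism through its component configuration. The sketch below only illustrates that idea. Whether JmsSpout does exactly this is not stated on this page; the class DistributedConfigModel is hypothetical, and the key "topology.max.task.parallelism" is assumed to be the string behind Storm's Config.TOPOLOGY_MAX_TASK_PARALLELISM.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only; not the JmsSpout source.
final class DistributedConfigModel {
    // Assumed key: the string behind Storm's Config.TOPOLOGY_MAX_TASK_PARALLELISM.
    static final String MAX_TASK_PARALLELISM = "topology.max.task.parallelism";

    /** Builds a component configuration that caps parallelism when not distributed. */
    static Map<String, Object> componentConfiguration(boolean distributed) {
        Map<String, Object> conf = new HashMap<>();
        if (!distributed) {
            // A cap of 1 makes any parallelism hint irrelevant for this component.
            conf.put(MAX_TASK_PARALLELISM, 1);
        }
        return conf;
    }
}
```

A topic delivers every message to every subscriber, so several spout instances would each emit the same message; a queue hands each message to one consumer, so parallel instances share the load.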
-
getSession
protected javax.jms.Session getSession()
Returns the currently active session.
- Returns:
The currently active session
-
getComponentConfiguration
public Map<String,Object> getComponentConfiguration()
Description copied from interface: IComponent
Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component configuration can be further overridden when constructing the topology using TopologyBuilder.
- Specified by:
getComponentConfiguration in interface IComponent
- Overrides:
getComponentConfiguration in class BaseComponent
-
-