Class JmsSpout

  • All Implemented Interfaces:
    Serializable, ISpout, IComponent, IRichSpout

    public class JmsSpout
    extends BaseRichSpout
    A Storm Spout implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.

    JmsSpout instances rely on JmsProducer implementations to obtain the JMS ConnectionFactory and Destination objects necessary to connect to a JMS topic/queue.

    When a JmsSpout receives a JMS message, it delegates to an internal JmsTupleProducer instance to create a Storm tuple from the incoming message.

    Typically, developers will supply a custom JmsTupleProducer implementation appropriate for the expected message content.

    See Also:
    Serialized Form
    • Constructor Detail

      • JmsSpout

        public JmsSpout()
    • Method Detail

      • setJmsAcknowledgeMode

        public void setJmsAcknowledgeMode​(int mode)
        Sets the JMS Session acknowledgement mode for the JMS session.

        Possible values:

        • javax.jms.Session.AUTO_ACKNOWLEDGE
        • javax.jms.Session.CLIENT_ACKNOWLEDGE
        • javax.jms.Session.DUPS_OK_ACKNOWLEDGE

        Any other vendor specific modes are not supported.

        Parameters:
        mode - JMS Session Acknowledgement mode
      • getJmsAcknowledgeMode

        public int getJmsAcknowledgeMode()
        Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be either of:
        • Session.AUTO_ACKNOWLEDGE
        • Session.CLIENT_ACKNOWLEDGE
        • Session.DUPS_OK_ACKNOWLEDGE
        • Session.SESSION_TRANSACTED
        Returns:
        the int value of the acknowledgment mode.
      • setJmsProvider

        public void setJmsProvider​(JmsProvider provider)
        Set jmsProvider.

        Set the JmsProvider implementation that this Spout will use to connect to a JMS javax.jms.Desination

        Parameters:
        provider - the provider to use
      • setJmsTupleProducer

        public void setJmsTupleProducer​(JmsTupleProducer producer)
        Set the JmsTupleProducer implementation that will convert javax.jms.Message object to org.apache.storm.tuple.Values objects to be emitted.
        Parameters:
        producer - the producer instance to use
      • setIndividualAcks

        public void setIndividualAcks()
        Set if JMS vendor supports ack-ing individual messages. The appropriate mode must be set via {setJmsAcknowledgeMode(int)}.
      • open

        public void open​(Map<String,​Object> conf,
                         TopologyContext context,
                         SpoutOutputCollector spoutOutputCollector)
        ISpout implementation.

        Connects the JMS spout to the configured JMS destination topic/queue.

        Parameters:
        conf - The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
        context - This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
        spoutOutputCollector - The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
      • close

        public void close()
        Close the session and connection.

        When overridden, should always call super to finalize the active connections.

        Specified by:
        close in interface ISpout
        Overrides:
        close in class BaseRichSpout
      • nextTuple

        public void nextTuple()
        Generate the next tuple from a message.

        This method polls the queue that's being filled asynchronously by the jms connection, every POLL_INTERVAL_MS seconds.

      • ack

        public void ack​(Object msgId)
        Ack a successfully handled message by the matching JmsMessageID.

        Acking means removing the message from the pending messages collections, and if it was the oldest pending message - ack it to the mq as well, so that it's the only one acked.

        Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.

        Specified by:
        ack in interface ISpout
        Overrides:
        ack in class BaseRichSpout
      • fail

        public void fail​(Object msgId)
        Fail an unsuccessfully handled message by its JmsMessageID.

        Failing means dropping all pending messages and queueing a recovery attempt.

        Will only be called if we're transactional or not AUTO_ACKNOWLEDGE

        Specified by:
        fail in interface ISpout
        Overrides:
        fail in class BaseRichSpout
      • declareOutputFields

        public void declareOutputFields​(OutputFieldsDeclarer declarer)
        Use the tupleProducer to determine which fields are about to be emitted.

        Note that nextTuple() always emits to the default stream, and thus only fields declared for this stream are used.

        Parameters:
        declarer - this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
      • isDistributed

        public boolean isDistributed()
        Returns if the spout is distributed.
        Returns:
        distributed.
      • setDistributed

        public void setDistributed​(boolean isDistributed)
        Sets the "distributed" mode of this spout.

        If true multiple instances of this spout may be created across the cluster (depending on the "parallelism_hint" in the topology configuration).

        Setting this value to false essentially means this spout will run as a singleton within the cluster ("parallelism_hint" will be ignored).

        In general, this should be set to false if the underlying JMS destination is a topic, and true if it is a JMS queue.

        Parameters:
        isDistributed - true if should be distributed, false otherwise.
      • getSession

        protected javax.jms.Session getSession()
        Returns the currently active session.
        Returns:
        The currently active session