Storm/Trident integration for MongoDB. This package includes the core bolts and Trident states that allow a Storm topology to either insert Storm tuples into a database collection or execute update queries against a database collection.

Insert into Database

This package includes a bolt and a Trident state for inserting data into a database collection.

MongoMapper

The main API for inserting data into a MongoDB collection is the org.apache.storm.mongodb.common.mapper.MongoMapper interface:

public interface MongoMapper extends Serializable {
    Document toDocument(ITuple tuple);
}

SimpleMongoMapper

storm-mongodb includes a general-purpose MongoMapper implementation called SimpleMongoMapper that maps a Storm tuple to a database document. SimpleMongoMapper assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.

public class SimpleMongoMapper implements MongoMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for(String field : fields){
            document.append(field, tuple.getValueByField(field));
        }
        return document;
    }

    public SimpleMongoMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}

MongoInsertBolt

To use the MongoInsertBolt, construct an instance by specifying the url, the collectionName, and a MongoMapper implementation that converts a Storm tuple to a DB document. The url follows the standard MongoDB URI connection scheme:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

For more information about Mongo URI options (e.g. Write Concern Options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options

String url = "mongodb://127.0.0.1:27017/test";
String collectionName = "wordcount";

MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");

MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
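
The url passed to the bolt follows the URI connection scheme described for MongoInsertBolt. As a minimal sketch using only the JDK, the helper below assembles such a string from its parts. The class MongoUrlSketch and its buildUrl method are hypothetical and not part of storm-mongodb; credentials are left out, values are not URL-encoded, and w=majority is used only as an example of a write concern option.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of storm-mongodb. It assembles a string of the form
// mongodb://host1[:port1][,hostN[:portN]]/database[?options]
final class MongoUrlSketch {

    static String buildUrl(List<String> hosts, String database, Map<String, String> options) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        StringBuilder url = new StringBuilder("mongodb://");
        url.append(String.join(",", hosts));   // host1:port1,host2:port2,...
        url.append('/').append(database);
        if (!options.isEmpty()) {
            // Options are appended as ?key=value&key=value
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            url.append(query.toString());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("w", "majority"); // example write concern option
        System.out.println(buildUrl(List.of("127.0.0.1:27017"), "test", options));
        // prints mongodb://127.0.0.1:27017/test?w=majority
    }
}
```

The resulting string can be passed as the url argument wherever this document uses one.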

MongoTridentState

We also support a Trident persistent state that can be used with Trident topologies. To create a Mongo persistent Trident state, initialize it with the url, the collectionName, and a MongoMapper instance. See the example below:

        MongoMapper mapper = new SimpleMongoMapper()
                .withFields("word", "count");

        MongoState.Options options = new MongoState.Options()
                .withUrl(url)
                .withCollectionName(collectionName)
                .withMapper(mapper);

        StateFactory factory = new MongoStateFactory(options);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout1", spout);

        stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());

NOTE:

If no unique index is defined on the collection, Trident state inserts that are repeated after a failure may result in duplicate documents.
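
To illustrate the note, here is a toy model using only the JDK, in which the same batch is inserted twice, as could happen after a failure. It is a sketch under stated assumptions and does not show how MongoState is implemented: the class ReplayDuplicatesSketch and its methods are hypothetical, a List stands in for a collection without a unique index, and a Map keyed on one field stands in for a collection with a unique index on that field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-ins only; no MongoDB or Storm classes are used here.
final class ReplayDuplicatesSketch {

    // Builds a stand-in document with the fields used in this README's examples.
    static Map<String, Object> doc(String word, int count) {
        Map<String, Object> d = new LinkedHashMap<>();
        d.put("word", word);
        d.put("count", count);
        return d;
    }

    // Models a collection without a unique index: every insert is stored.
    static void insertAll(List<Map<String, Object>> collection, List<Map<String, Object>> batch) {
        collection.addAll(batch);
    }

    // Models a unique index on keyField: an insert with an existing key is rejected
    // (MongoDB itself would report a duplicate key error for such an insert).
    static void insertAllUnique(Map<Object, Map<String, Object>> collection, String keyField,
                                List<Map<String, Object>> batch) {
        for (Map<String, Object> d : batch) {
            collection.putIfAbsent(d.get(keyField), d);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = new ArrayList<>();
        batch.add(doc("storm", 1));
        batch.add(doc("mongo", 2));

        List<Map<String, Object>> plain = new ArrayList<>();
        Map<Object, Map<String, Object>> unique = new LinkedHashMap<>();
        for (int attempt = 0; attempt < 2; attempt++) { // the second pass models the repeated batch
            insertAll(plain, batch);
            insertAllUnique(unique, "word", batch);
        }
        System.out.println(plain.size());  // prints 4
        System.out.println(unique.size()); // prints 2
    }
}
```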

Update from Database

This package includes a bolt for updating data in a database collection.

SimpleMongoUpdateMapper

storm-mongodb includes a general-purpose MongoMapper implementation called SimpleMongoUpdateMapper that maps a Storm tuple to a database document. SimpleMongoUpdateMapper assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to. SimpleMongoUpdateMapper uses the $set operator to set the value of a field in a document. For more information about update operators, see https://docs.mongodb.org/manual/reference/operator/update/

public class SimpleMongoUpdateMapper implements MongoMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for(String field : fields){
            document.append(field, tuple.getValueByField(field));
        }
        return new Document("$set", document);
    }

    public SimpleMongoUpdateMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
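
To make the shape of the resulting update document concrete, the following sketch mirrors the mapping logic above using only the JDK. It rests on assumptions: java.util.Map stands in for both the Storm tuple and the BSON Document, and the class SetUpdateShapeSketch and its toUpdateDocument method are hypothetical names that are not part of storm-mongodb.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: java.util.Map plays the role of both the tuple and the
// Document, so this runs without Storm or the MongoDB driver on the classpath.
final class SetUpdateShapeSketch {

    // Copies only the listed fields and wraps them under the "$set" key.
    static Map<String, Object> toUpdateDocument(Map<String, Object> tuple, String... fields) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (String field : fields) {
            values.put(field, tuple.get(field));
        }
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("$set", values);
        return update;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 3);
        tuple.put("source", "spout1"); // not listed below, so it is not copied
        System.out.println(toUpdateDocument(tuple, "word", "count"));
        // prints {$set={word=storm, count=3}}
    }
}
```

Only the fields passed to the mapper end up in the update, so fields that are not listed stay untouched in the stored document.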

QueryFilterCreator

The main API for creating a MongoDB query filter is the org.apache.storm.mongodb.common.QueryFilterCreator interface:

public interface QueryFilterCreator extends Serializable {
    Bson createFilter(ITuple tuple);
}

SimpleQueryFilterCreator

storm-mongodb includes a general-purpose QueryFilterCreator implementation called SimpleQueryFilterCreator that creates a MongoDB query filter from a given tuple. SimpleQueryFilterCreator uses the $eq operator to match documents whose field value equals the value of the same field in the tuple. For more information about query operators, see https://docs.mongodb.org/manual/reference/operator/query/

public class SimpleQueryFilterCreator implements QueryFilterCreator {
    private String field;
    
    @Override
    public Bson createFilter(ITuple tuple) {
        return Filters.eq(field, tuple.getValueByField(field));
    }

    public SimpleQueryFilterCreator withField(String field) {
        this.field = field;
        return this;
    }

}

MongoUpdateBolt

To use the MongoUpdateBolt, construct an instance by specifying the Mongo url, the collectionName, a QueryFilterCreator implementation, and a MongoMapper implementation that converts a Storm tuple to a DB document.

        MongoMapper mapper = new SimpleMongoUpdateMapper()
                .withFields("word", "count");

        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
                .withField("word");
        
        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);

        // Uncomment to insert a new document when no document matches the query filter
        //updateBolt.withUpsert(true);

Alternatively, use an anonymous inner class implementation of QueryFilterCreator:

        MongoMapper mapper = new SimpleMongoUpdateMapper()
                .withFields("word", "count");

        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
            @Override
            public Bson createFilter(ITuple tuple) {
                return Filters.gt("count", 3);
            }
        };
        
        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);

        // Uncomment to insert a new document when no document matches the query filter
        //updateBolt.withUpsert(true);
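
The way an equality filter, a $set update, and the upsert flag combine can be sketched with an in-memory model that uses only the JDK. This is an illustration under stated assumptions, not the bolt's implementation: the class UpsertSketch and its update method are hypothetical, a List of Maps stands in for the collection, and the sketch updates every matching document, whereas this document does not state whether the bolt updates one or all matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// In-memory stand-in for a collection; no MongoDB or Storm classes are used.
final class UpsertSketch {

    // Applies {$set: values} to every document whose filterField equals filterValue.
    // With upsert enabled and no match, a new document is inserted that combines the
    // equality condition with the $set values. Returns the number of matched documents.
    static int update(List<Map<String, Object>> collection, String filterField, Object filterValue,
                      Map<String, Object> values, boolean upsert) {
        int matched = 0;
        for (Map<String, Object> doc : collection) {
            if (Objects.equals(doc.get(filterField), filterValue)) {
                doc.putAll(values);
                matched++;
            }
        }
        if (matched == 0 && upsert) {
            Map<String, Object> inserted = new LinkedHashMap<>();
            inserted.put(filterField, filterValue);
            inserted.putAll(values);
            collection.add(inserted);
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("word", "storm");
        values.put("count", 1);

        update(collection, "word", "storm", values, false);
        System.out.println(collection); // prints [] because nothing matched and upsert is off

        update(collection, "word", "storm", values, true);
        System.out.println(collection); // prints [{word=storm, count=1}]
    }
}
```

With upsert left disabled, tuples whose filter matches nothing change nothing in the collection, which is why the withUpsert(true) call matters for word-count style topologies that see a word for the first time.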