Please indicate the source when reprinting: http://blog.csdn.net/lonelytrooper/article/details/12434915

Chapter 8: Transactional Topologies

As mentioned earlier in this book, in Storm you can guarantee message processing by using the ack and fail strategies. But what happens if tuples are replayed? How do you make sure you don't count things more than once?

Transactional topologies are a new feature included in Storm 0.7.0. They enable messaging semantics that ensure tuples are replayed in a safe way and processed exactly once. Without support for transactional topologies, you could not count in a fully accurate, scalable, and fault-tolerant way.

Transactional topologies are an abstraction built on top of standard Storm spouts and bolts.

Design

In a transactional topology, Storm uses a mix of parallel and sequential tuple processing. The spout generates batches of tuples that are processed by the bolts in parallel. Some of those bolts are known as committers, and they commit the processed batches in a strictly ordered fashion. This means that if you have two batches with five tuples each, the tuples of both batches will be processed in parallel by the bolts, but the committer bolts will not commit the second batch until the first one has been committed successfully.

When dealing with transactional topologies, it is important to be able to replay a batch of tuples from the source, sometimes even several times. So make sure your data source (the one your spout will connect to) has that capability.

This process can be described as two distinct steps, or phases:

The processing phase

A fully parallel phase, in which many batches are executed at the same time.

The commit phase

A strictly ordered phase, in which the second batch is not committed until the first batch has been committed successfully.

Together, these two phases are called a Storm transaction.

Storm uses ZooKeeper to store transaction metadata. By default, the same ZooKeeper that serves the topology is used to store the metadata. You can change this by overriding the configuration keys transactional.zookeeper.servers and transactional.zookeeper.port.
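As a minimal sketch of what that override might look like (the host names are hypothetical, and the snippet assumes the Config constants available in backtype.storm.Config since Storm 0.7):

// Requires backtype.storm.Config and java.util.Arrays.
Config conf = new Config();
// Hypothetical ZooKeeper ensemble dedicated to transactional state.
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
        Arrays.asList("zk1.example.com", "zk2.example.com"));
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);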

Transactional Topologies in Action

To see how transactional topologies work, you will build a Twitter analytics tool. You will read tweets stored in Redis, process them through a few bolts, and store, in another Redis database, a list of all hashtags and their frequency among the tweets, a list of all users and how many tweets they appear in, and a list of users with their hashtags and frequencies.

The topology you will build for this tool is shown in Figure 8-1.

Figure 8-1. Topology overview

As you can see, the TweetsTransactionalSpout connects to your tweets database and emits batches of tuples across the topology. Two different bolts, UserSplitterBolt and HashtagSplitterBolt, receive tuples from the spout. The UserSplitterBolt parses the tweet, looks for users (words preceded by @), and emits those words on a custom stream called users. The HashtagSplitterBolt also parses the tweet, looks for words preceded by #, and emits them on a custom stream called hashtags. A third bolt, the UserHashtagJoinBolt, receives both streams and counts how many times a hashtag appears in the tweets of a named user. In order to count and emit the results, this bolt is a BaseBatchBolt (more on that later).
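The splitter bolts are implemented later in the book. As a rough sketch of the idea only (the helper name emitUsers, the use of BasicOutputCollector, and the field names are assumptions, not the book's code), the core of the user-splitting logic could look like this:

// Sketch: emit every word starting with '@' to the custom "users" stream,
// keyed by the tweet id.
void emitUsers(String tweetId, String tweet, BasicOutputCollector collector) {
    for (String word : tweet.split(" ")) {
        if (word.startsWith("@") && word.length() > 1) {
            collector.emit("users", new Values(tweetId, word));
        }
    }
}

The HashtagSplitterBolt would do the same for words starting with #, emitting to the hashtags stream.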

Finally, a last bolt, called RedisCommitterBolt, receives the three streams generated by UserSplitterBolt, HashtagSplitterBolt, and UserHashtagJoinBolt. Within the same transaction it performs all the counting, and once it has finished processing the batch of tuples it sends the results to Redis. This bolt is a special kind of bolt known as a committer bolt, which will be explained later in this chapter.

To build this topology, use the TransactionalTopologyBuilder, as in the following code block:

TransactionalTopologyBuilder builder =
    new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());
builder.setBolt("users-splitter", new UserSplitterBolt(), 4).shuffleGrouping("spout");
builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4).shuffleGrouping("spout");
builder.setBolt("user-hashtag-merger", new UserHashtagJoinBolt(), 4)
    .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
    .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));
builder.setBolt("redis-committer", new RedisCommiterCommiterBolt())
    .globalGrouping("users-splitter", "users")
    .globalGrouping("hashtag-splitter", "hashtags")
    .globalGrouping("user-hashtag-merger");

Let's take a look at how to implement the spout for this transactional topology.

The Spout

The spout of a transactional topology is completely different from a standard spout.

public class TweetsTransactionalSpout extends
        BaseTransactionalSpout<TransactionMetadata> {

As you can see in the class definition, TweetsTransactionalSpout extends BaseTransactionalSpout with a type parameter. The type you set there is known as the transaction metadata, and it will be used later to emit batches of tuples from the source.

In this example, TransactionMetadata is defined as:

public class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;
    long from;
    int quantity;

    public TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}

Here you store from and quantity, which will tell you exactly how to generate the batch of tuples.

To finish the implementation of the spout, you need to implement the following methods:

@Override
public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
        Map conf, TopologyContext context) {
    return new TweetsTransactionalSpoutCoordinator();
}

@Override
public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
        Map conf, TopologyContext context) {
    return new TweetsTransactionalSpoutEmitter();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("txid", "tweet_id", "tweet"));
}

In the getCoordinator method, you tell Storm which class will coordinate the generation of batches of tuples. In getEmitter, you tell Storm which class is responsible for reading batches of tuples from the source and emitting them into a stream in the topology. Finally, as you did before, you need to declare which fields are emitted.

The RQ class

To make the example simpler, we decided to encapsulate all the Redis-related operations in a single class.

public class RQ {
    public static final String NEXT_READ = "NEXT_READ";
    public static final String NEXT_WRITE = "NEXT_WRITE";

    Jedis jedis;

    public RQ() {
        jedis = new Jedis("localhost");
    }

    public long getAvailableToRead(long current) {
        return getNextWrite() - current;
    }

    public long getNextRead() {
        String sNextRead = jedis.get(NEXT_READ);
        if (sNextRead == null)
            return 1;
        return Long.valueOf(sNextRead);
    }

    public long getNextWrite() {
        return Long.valueOf(jedis.get(NEXT_WRITE));
    }

    public void close() {
        jedis.disconnect();
    }

    public void setNextRead(long nextRead) {
        jedis.set(NEXT_READ, "" + nextRead);
    }

    public List<String> getMessages(long from, int quantity) {
        String[] keys = new String[quantity];
        for (int i = 0; i < quantity; i++)
            keys[i] = "" + (i + from);
        return jedis.mget(keys);
    }
}

Carefully read the implementation of each method and make sure you understand what it does.
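Note that RQ expects the tweets to be stored in Redis under sequential numeric keys ("1", "2", and so on), with NEXT_WRITE pointing one past the last stored tweet. The book does not show the producer side; as a purely hypothetical loader that populates Redis in that layout (the tweets list is an assumed List<String>):

// Hypothetical loader: store each tweet under the next sequential key
// and keep NEXT_WRITE pointing to the next free slot.
Jedis jedis = new Jedis("localhost");
String sNextWrite = jedis.get("NEXT_WRITE");
long nextWrite = (sNextWrite == null) ? 1 : Long.valueOf(sNextWrite);
for (String tweet : tweets) {
    jedis.set("" + nextWrite, tweet);
    nextWrite++;
}
jedis.set("NEXT_WRITE", "" + nextWrite);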

The coordinator

Let's take a look at the coordinator implementation for this example.

public static class TweetsTransactionalSpoutCoordinator implements
        ITransactionalSpout.Coordinator<TransactionMetadata> {
    TransactionMetadata lastTransactionMetadata;
    RQ rq = new RQ();
    long nextRead = 0;

    public TweetsTransactionalSpoutCoordinator() {
        nextRead = rq.getNextRead();
    }

    @Override
    public TransactionMetadata initializeTransaction(BigInteger txid,
            TransactionMetadata prevMetadata) {
        long quantity = rq.getAvailableToRead(nextRead);
        // MAX_TRANSACTION_SIZE caps how many tweets go into a single batch.
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
        TransactionMetadata ret = new TransactionMetadata(nextRead, (int) quantity);
        nextRead += quantity;
        return ret;
    }

    @Override
    public boolean isReady() {
        return rq.getAvailableToRead(nextRead) > 0;
    }

    @Override
    public void close() {
        rq.close();
    }
}

It is important to note that there will be only one coordinator instance across the whole topology. When the coordinator is instantiated, it retrieves from Redis a sequence that tells it which tweet to read next. The first time, this value will be 1, which means that the next tweet to read is the first one.

The first method called is isReady. It is always called before initializeTransaction to ensure that the source is ready to be read from. You must return true or false accordingly. In this example, retrieve the total number of tweets and compare it with the number of tweets you have already read. The difference between them is the number of readable tweets. If it is greater than 0, it means you still have tweets to read.

Finally, initializeTransaction is executed. As you can see, it receives txid and prevMetadata as parameters. The first is a unique transaction ID generated by Storm, which identifies the batch of tuples to be generated. prevMetadata is the metadata generated by the coordinator of the previous transaction.

In this example, first find out how many tweets are available to read. Once you have that sorted out, create a new TransactionMetadata indicating which tweet to read first and how many to read.

As soon as you return the metadata, Storm stores it together with the txid in ZooKeeper. This ensures that if something goes wrong, Storm will be able to have the emitter resend the batch of tuples.

The emitter

The final step in creating a transactional spout is implementing the emitter.

Let's start with the implementation below:

public static class TweetsTransactionalSpoutEmitter implements
        ITransactionalSpout.Emitter<TransactionMetadata> {
    RQ rq = new RQ();

    public TweetsTransactionalSpoutEmitter() {
    }

    @Override
    public void emitBatch(TransactionAttempt tx,
            TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {
        rq.setNextRead(coordinatorMeta.from + coordinatorMeta.quantity);
        List<String> messages = rq.getMessages(coordinatorMeta.from,
                coordinatorMeta.quantity);
        long tweetId = coordinatorMeta.from;
        for (String message : messages) {
            collector.emit(new Values(tx, "" + tweetId, message));
            tweetId++;
        }
    }

    @Override
    public void cleanupBefore(BigInteger txid) {
    }

    @Override
    public void close() {
        rq.close();
    }
}

The emitter reads the source and emits tuples into a stream. It is very important that the emitter always be able to emit the same batch of tuples for the same transaction id and transaction metadata. That way, if something goes wrong while processing a batch, Storm can replay the same transaction id and transaction metadata and make sure the batch is replayed. Storm will increment the attempt id in the TransactionAttempt, so you can tell that the batch has been replayed.

Here, emitBatch is the important method. In this method, use the metadata given as a parameter to read the tweets from Redis. Also increment in Redis the sequence that keeps track of how many tweets you have read so far. And, of course, emit the tweets to the topology.
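Since each tuple emitted above carries the TransactionAttempt as its first value, a downstream bolt can inspect it to distinguish a replay from a first attempt. A minimal sketch (assuming the accessor names on backtype.storm.transactional.TransactionAttempt, and an arbitrary input tuple):

// Sketch: the first value of each emitted tuple is the TransactionAttempt.
TransactionAttempt tx = (TransactionAttempt) input.getValue(0);
BigInteger txId = tx.getTransactionId(); // identifies the batch
long attemptId = tx.getAttemptId();      // grows each time the same batch is replayed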
