Real-time Analytics with Storm and Cassandra

Executing the topology in the distributed mode

To set up Storm in distributed mode, we will need to perform the following steps.

Set up Zookeeper (V 3.3.5) for Storm

The coordination of a Storm topology is maintained by a Zookeeper cluster. The utilization of Zookeeper is not very high, as it just maintains the runnable state of the Storm cluster. In most cases, a single Zookeeper node should suffice, but in production scenarios, at least a three-node Zookeeper cluster is recommended so that a single node doesn't become a single point of failure.

For reliable Zookeeper service, deploy Zookeeper in a cluster known as an ensemble. As long as the majority of the ensemble is up, the service will be available. One of the nodes in the ensemble is automatically selected as a leader and others as followers. If the leader goes down, one of the follower nodes becomes the leader.

Perform the following steps on all the machines that will be part of the Zookeeper ensemble to set up the Zookeeper cluster:

  1. Download the most recent stable release (version 3.3.5) from the Apache Zookeeper site.
  2. Create a zookeeper directory under /usr/local:
    sudo mkdir /usr/local/zookeeper
    
  3. Extract the downloaded TAR file to the /usr/local location. Use the following command:
    sudo tar -xvf zookeeper-3.3.5.tar.gz -C /usr/local/zookeeper
    
  4. Zookeeper needs a directory to store its data. Create /usr/local/zookeeper/tmp to store this data:
    sudo mkdir -p /usr/local/zookeeper/tmp
    
  5. Create a configuration file, zoo.cfg, under /usr/local/zookeeper/zookeeper-3.3.5/conf. The following properties will go in it:
    • tickTime: This is the number of milliseconds of each tick (for example, 2000).
    • initLimit: This is the number of ticks that the initial synchronization phase can take (for example, 5).
    • syncLimit: This is the number of ticks that can pass between sending a request and getting an acknowledgement (for example, 2).
    • dataDir: This is the directory where the snapshot is stored (for example, /usr/local/zookeeper/tmp).
    • clientPort: This is the port on which Zookeeper clients connect to the Zookeeper server (for example, 2182).
    • server.id=host:port:port: Every machine that is part of the Zookeeper ensemble should know about every other machine in the ensemble. This is accomplished with a series of lines of the form server.id=host:port:port (for example, server.1=<IP_ADDRESS_OF_ZOOKEEPER_NODE_1>:2888:3888).
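    Putting these properties together, a zoo.cfg for a three-node ensemble in this setup might look like the following sketch (the IP address placeholders stand for your own ensemble nodes):
      tickTime=2000
      initLimit=5
      syncLimit=2
      dataDir=/usr/local/zookeeper/tmp
      clientPort=2182
      server.1=<IP_ADDRESS_OF_ZOOKEEPER_NODE_1>:2888:3888
      server.2=<IP_ADDRESS_OF_ZOOKEEPER_NODE_2>:2888:3888
      server.3=<IP_ADDRESS_OF_ZOOKEEPER_NODE_3>:2888:3888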
  6. Repeat the preceding steps or copy the distribution to other machines that will be part of the Zookeeper cluster.
  7. Create a file with the name myid in the directory specified by the dataDir property. The myid file consists of a single line containing only that machine's ID (1 in the case of server.1 in zoo.cfg). So, the myid file of server 1 will contain the text 1 and nothing else. The ID must be unique within the ensemble and should have a value between 1 and 255. In this setup, the path of the myid file is /usr/local/zookeeper/tmp/myid.
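    For example, on server 1 the file can be created with a command like the following (using the dataDir path from this setup):
    echo "1" | sudo tee /usr/local/zookeeper/tmp/myid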
  8. Edit the ~/.bashrc file, add an environment variable for the Zookeeper home, and add its bin directory to the PATH environment variable:
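    A minimal sketch of the entries, assuming Zookeeper was extracted to /usr/local/zookeeper/zookeeper-3.3.5 as in step 3 (ZOOKEEPER_HOME is the variable referenced in the later steps):
    export ZOOKEEPER_HOME=/usr/local/zookeeper/zookeeper-3.3.5
    export PATH=$PATH:$ZOOKEEPER_HOME/bin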
  9. Source the ~/.bashrc file after making the changes. This ensures that the changes made to bashrc are applied to the current terminal session:
    source ~/.bashrc
    
  10. Start the Zookeeper daemon on each node by executing the following command from $ZOOKEEPER_HOME:
    sudo -E bin/zkServer.sh start
    
  11. Stop the Zookeeper daemon on each node by executing the following command from $ZOOKEEPER_HOME:
    sudo -E bin/zkServer.sh stop
    
  12. The Zookeeper status can be checked by running the following command from $ZOOKEEPER_HOME:
    sudo -E bin/zkServer.sh status
    

The output for the different modes is as follows:

  • If running in the standalone mode (only a single machine is part of the Zookeeper ensemble cluster), the status output reports that the node is running in the standalone mode.
  • If running in the clustered mode, the status output on the leader node reports that the node is running as the leader.
  • If running in the clustered mode, the status output on the follower nodes reports that the node is running as a follower.

By default, the Zookeeper log (zookeeper.out) is created in the directory from which the instance is started. This completes the Zookeeper cluster setup.

Setting up Storm in the distributed mode

Perform the following steps to set up Storm in distributed mode:

  1. Download the storm-0.9.2-incubating.zip package from the GitHub Storm site.
  2. Create the directories storm and storm/tmp under /usr/local:
    sudo mkdir -p /usr/local/storm/tmp
    
  3. Create the following directories for logs:
    sudo mkdir -p /mnt/abc_logs/storm/storm_logs
    
  4. On the Nimbus and the worker machines, extract the ZIP file into /usr/local/storm using the following command:
    sudo unzip -d /usr/local/storm/ storm-0.9.2-incubating.zip
    
  5. Make the following changes at /usr/local/storm/storm-0.9.2-incubating/conf/storm.yaml:
    • storm.zookeeper.servers: This is a list of the hosts in the Zookeeper cluster for the Storm cluster:
      storm.zookeeper.servers:
       "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_1>"
       "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_2>"
    • storm.zookeeper.port: This is the port on which the Zookeeper cluster is running:
      storm.zookeeper.port: 2182
    • storm.local.dir: The Nimbus and the supervisor require a location on the local disk to store a small amount of data related to configurations and execution details of the topology. Please make sure to create the directory and assign read/write permissions on all Storm nodes. For our installation, we are going to create this directory in the /usr/local/storm/tmp location:
      storm.local.dir: "/usr/local/storm/tmp"
    • nimbus.host: The nodes need to know which machine is the master in order to download topology jars and confs. This property is used for this purpose:
      nimbus.host: "<IP_ADDRESS_OF_NIMBUS_HOST>"
    • java.library.path: This is the load path for the native libraries that Storm uses (ZeroMQ and JZMQ). The default of /usr/local/lib:/opt/local/lib:/usr/lib should be fine for most installations, so validate the libraries in the previously mentioned locations before moving forward.
    • storm.messaging.netty: Storm's Netty-based transport has been overhauled to significantly improve performance through better utilization of thread, CPU, and network resources, particularly in cases where message sizes are small. In order to provide Netty support, the following configurations need to be added:
      storm.messaging.transport:"backtype.storm.messaging.netty.Context"
                 storm.messaging.netty.server_worker_threads:1
                 storm.messaging.netty.client_worker_threads:1
                 storm.messaging.netty.buffer_size:5242880
                 storm.messaging.netty.max_retries:100
                 storm.messaging.netty.max_wait_ms:1000
                 storm.messaging.netty.min_wait_ms:100
    • The storm.yaml snippet from our Storm cluster installation is as follows:
      #To be filled in for a storm configuration
      storm.zookeeper.servers:
           - "nim-zkp-flm-3.abc.net"
      storm.zookeeper.port: 2182
      storm.local.dir: "/usr/local/storm/tmp"
      nimbus.host: "nim-zkp-flm-3.abc.net"
      topology.message.timeout.secs: 60
      topology.debug: false
      topology.optimize: true
      topology.ackers: 4
      
      storm.messaging.transport: "backtype.storm.messaging.netty.Context"
      storm.messaging.netty.server_worker_threads: 1
      storm.messaging.netty.client_worker_threads: 1
      storm.messaging.netty.buffer_size: 5242880
      storm.messaging.netty.max_retries: 100
      storm.messaging.netty.max_wait_ms: 1000
      storm.messaging.netty.min_wait_ms: 100
  6. Set the STORM_HOME environment variable in the ~/.bashrc file and add Storm's bin directory to the PATH environment variable. This allows Storm binaries to be executed from any location.
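    A minimal sketch of the entries, assuming Storm was extracted to /usr/local/storm/storm-0.9.2-incubating as in step 4:
    export STORM_HOME=/usr/local/storm/storm-0.9.2-incubating
    export PATH=$PATH:$STORM_HOME/bin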
  7. Copy the storm.yaml file to the bin folder of the Storm installation on the Nimbus machine using the following command:
    sudo cp /usr/local/storm/storm-0.9.2-incubating/conf/storm.yaml /usr/local/storm/storm-0.9.2-incubating/bin/
    

Launching Storm daemons

Now that the Storm cluster is set up, we need to start three processes on the respective Storm nodes. They are as follows:

  • Nimbus: Start Nimbus as the background process on the machine identified as the master node by running the following command from $STORM_HOME:
    sudo -bE bin/storm nimbus
    
  • Supervisor: Supervisors can be started in a similar way to Nimbus. Run the following command from $STORM_HOME:
    sudo -bE bin/storm supervisor
    
  • UI: The Storm UI is a web application for monitoring the Storm cluster; it shows the Nimbus and supervisor status and lists all the running topologies and their details. The UI can be started by running the following command from $STORM_HOME:
    sudo -bE bin/storm ui
    

The UI can be accessed through http://<IP_ADDRESS_OF_NIMBUS>:8080.
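With the daemons running, a topology JAR can be submitted to the cluster using the storm jar command from $STORM_HOME. The JAR path and class name below are placeholders for your own topology, and any trailing arguments (often the topology name) are passed to the topology's main class:

    bin/storm jar <PATH_TO_TOPOLOGY_JAR> <FULLY_QUALIFIED_MAIN_CLASS> <TOPOLOGY_NAME>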