本地模式运行
本地模式类似storm集群是一个进程,用来编写和测试topology。在本地模式上运行topology类似在一个集群上运行topology。创建一个本地集群: import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster();提交集群使用submitTopology,杀死集群使用killTopology关闭一个本地集群使用cluster.shutdown(); 本地模式下的公共配置: Config.TOPOLOGY_MAX_TASK_PARALLELISM Config.TOPOLOGY_DEBUG:生产集群运行
一、步骤
1、定义topology,若用java语言,使用TopologyBuilder来定义2、使用StormSubmitter来提交topology到集群中,所需参数需要topology名字,topology的参数配置,topology本身例:Config conf = new Config();conf.setNumWorkers(20);conf.setMaxSpoutPending(5000);StormSubmitter.submitTopology("mytopology", conf, topology);3、创建jar包使用storm 客户端命令,jar包包含你的所有的代码4、提交topology,为jar包指定参数storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3实例:1 public static void main(String[] args) throws Exception {
2 TopologyBuilder builder = new TopologyBuilder();3 builder.setSpout("random", new RandomWordSpout(), 2);4 builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");5 builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));6 Config conf = new Config();7 conf.setNumWorkers(4);// 设置启动4个Worker8 conf.setNumAckers(1); // 设置一个ack线程9 conf.setDebug(true); // 设置打印所有发送的消息及系统消息10 StormSubmitter.submitTopology("test", conf, builder.createTopology());11 }二、公共配置
Config.TOPOLOGY_WORKERS :设置执行topology的worker的数量Config.TOPOLOGY_ACKER_EXECUTORS:Config.TOPOLOGY_MAX_SPOUT_PENDINGConfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS:默认是30sConfig.TOPOLOGY_SERIALIZATIONS三、杀死topology
storm kill stormname四、更新一个正在运行的topology
杀死一个正在运行的topology,提交一个新的topology,计划使用storm swap命令五、监控topology使用Storm UI或者集群上的工作日志