千家信息网

Storm并发度怎么设置

发表于:2025-01-16 作者:千家信息网编辑
千家信息网最后更新 2025年01月16日,本篇内容介绍了"Storm并发度怎么设置"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Storm架构
千家信息网最后更新 2025年01月16日Storm并发度怎么设置

本篇内容介绍了"Storm并发度怎么设置"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

  Storm架构:master/slave

  主节点:Nimbus

  负责在集群上进行任务(Topology)的分发与资源的调度以及监控

  工作节点:Supervisor

  接收到任务请求后,启动一个或多个Worker进程来处理任务;默认情况下,一个Supervisor最多启动4个Worker

  工作进程:Worker

  在Supervisor中的子进程,存在着若干个Spout和Bolt线程,来负责Spout和Bolt组件处理任务(实际是开启的executor线程)

  作业:Topologies(死循环,不会结束)

  Spout:获取数据的组件

  Bolt:处理数据的组件

  Stream:Spout和Bolt之间数据流动的通道

  Tuple:

  1)Stream的最小组成单位,Spout向Bolt发送一次数据叫一个Tuple

  2)同一个Stream中Tuple的类型相同,不同的Stream中可能相同/不同

  3)一个key-value形式的Map

  数据流分发策略(Stream groupings):

  解决Spout和Bolt之间数据传输(发送Tuple元组)的问题

  1)shuffleGrouping:

  随机派发Stream中的Tuple到Bolt中

  2)fieldsGrouping:

  根据字段的哈希值与Bolt个数进行取模操作然后进行分组发送,一个节点是一个Worker, 一个Bolt是一个task, 全部节点的Spout或Bolt的个数叫并发度。

  Storm并发度设置:

  1.Worker并发度:

  首先按照集群规模和集群的物理位置来设定

  一般会把Worker均分到每一个节点里, 一个supervisor默认设置一个Worker

  2.Spout数量设定:

  Spout总数默认等于Kafka(消息中间件)对应Topic的分区数,提高吞吐速度

  一般一个Worker设置一个Spout

  3.Bolt1数量设定:

  首先根据数据量和处理数据的时间来设定

  一般情况下, Bolt1的数量是Spout数量的2倍(根据项目进行修改)

  4.Bolt2数量设定:

  首先根据数据量和处理数据的时间来设定,因为Bolt1传过来的中间结果数据已经减少很多,Bolt2的数量可以酌情减少。

  容错机制:异或方式<相同为,不同为1>

  tupleId - 产生新数据,会产生一个tupleId;

  整个过程中的tupleId按顺序两两异或到最后

  若结果为,则数据正确,否则错误

  messageId - 代表整条信息,API中指定提供给程序员,long型

  rootId - 代表某条信息,提供给storm框架

  出现数据运算失败的两种情况:

  execute(){

  1.异常(数据异常)

  2.任务运行超时 -- 认为处理失败

  }

  因为数据发送时导致的数据重复发送问题, 如何解决?

  Ⅰ.

  1.比如对订单信息做处理, 处理成功后, 把订单信息ID存储到Redis(set)

  2.信息发送时, 判断是否处理过此信息

  execute(){

  if()

  else()

  }

  Ⅱ.

  不作处理: 点击流日日志分析: pv, uv

  指标分析: 订单人数, 订单金额

  消息的可靠性保障和acker机制: open / nextTuple / ack / fail/ close

  Ⅰ.Spout类:

  在发送tuple时,Spout会提供一个msgId,用于在后续识别tuple;Storm会根据msgId跟踪创建的tuple树,直到某个tuple被完整处理,根据msgId调用最初发送tuple的Spout中ack()方法,检测到超时就调用fail()方法 -- 这两个方法的调用必须由最初创建这个tuple的Spout执行;当Spout从消息队列(Kafka/RocketMQ)中取出一条数据时,实际上没有被取出,而是保持一个挂起状态,等待消息完成的信号,挂起状态的信息不会被发送到其它的消费者;当该消息被"取出"时,队列会将消息体数据和一个唯一的msgId提供给客户端,当Spout的ack()/fail()方法被调用时,Spout根据发送的id向队列请求将消息从队列中移除/重新放入队列。

  Ⅱ.acker任务:

  高效的实现可靠性 -- 必须显式的在Bolt中调用定义在Spout中的ack()和fail()方法,Storm拓扑有一些特殊的称为"acker"的任务,负责跟踪Spout发送的tuple的DAG,当一个acker发现DAG结束后,它就会给创建Spout tuple的Spout任务发送一条消息,让这个任务来应答这个消息。acker并不会直接的跟踪tuple树,在acker树中存储了一个表,用于将Spout tuple的id与一对值相映射,id为创建这个tuple的任务id,第二个值为一个64bit的数字(ack val),这个值是这棵树中所有被创建的或者被应答的tuple的tuple id进行异或运算的结果值。

  Ⅲ.移除可靠性:

  1.将 Config.TOPOLOGY_ACKERS 设置为

  2.在SpoutOutputCollector.emit 方法中省略消息 id 来关闭 spout tuple 的跟踪功能

  3.在发送 tuple 的时候选择发送"非锚定"的(unanchored)tuple

"Storm并发度怎么设置"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0