OSSIM传感器Agent传送机制初探
OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式有序发给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行归一化处理,本文主要就如何有序发送数据与如何完成归一化进行讨论。
OSSIM传感器在通过GET框架实现OSSIM代理和OSSIM服务器之间通信协议和数据格式的之间转换。下面我们先简要看一下ossim-agent脚本:
#!/usr/bin/python -OOtimport syssys.path.append('/usr/share/ossim-agent/')sys.path.append('/usr/local/share/ossim-agent/')from ossim_agent.Agent import Agentagent = Agent()agent.main()
这里需要GET作为OSSIM代理向OSSIM服务器输送数据。实现紧密整合所需的两个主要操作是"生成"(或)OSSIM兼容事件的"映射Mapping")和此类数据向OSSIM的"传输"服务器。它负责此类操作的GET框架的两个组件是EventHandler和Sender Agent,如图1所示。
图1 将Get框架内容集成到OSSIM
Event Handler的主要任务是映射数据源插件采集的事件到SIEM实例警报的OSSIM标准化事件格式。为了执行这样的过程,原始消息经历由RAW LOG转换为现有归一化数据字段格式的一个转变;在上图中我们将这些机制表示为"归一化Normalization"和"OSSIM消息"。部分日志归一化代码:
from Logger import Loggerfrom time import mktime, strptimelogger = Logger.loggerclass Event: EVENT_TYPE = 'event' EVENT_ATTRS = [ "type", "date", "sensor", "interface", "plugin_id", "plugin_sid", "priority", "protocol", "src_ip", "src_port", "dst_ip", "dst_port", "username", "password", "filename", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "occurrences", "log", "data", "snort_sid", # snort specific "snort_cid", # snort specific "fdate", "tzone" ] def __init__(self): self.event = {} self.event["event_type"] = self.EVENT_TYPE def __setitem__(self, key, value): if key in self.EVENT_ATTRS: self.event[key] = self.sanitize_value(value) if key == "date": # 以秒为单位 self.event["fdate"]=self.event[key] try: self.event["date"]=int(mktime(strptime(self.event[key],"%Y-%m-%d %H:%M:%S"))) except: logger.warning("There was an error parsing date (%s)" %\ (self.event[key])) elif key != 'event_type': logger.warning("Bad event attribute: %s" % (key)) def __getitem__(self, key): return self.event.get(key, None) # 事件表示 def __repr__(self): event = self.EVENT_TYPE for attr in self.EVENT_ATTRS: if self[attr]: event += ' %s="%s"' % (attr, self[attr]) return event + "\n" # 返回内部哈希值 def dict(self): return self.event def sanitize_value(self, string): return str(string).strip().replace("\"", "\\\"").replace("'", "")class EventOS(Event): EVENT_TYPE = 'host-os-event' EVENT_ATTRS = [ "host", "os", "sensor", "interface", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ]class EventMac(Event): EVENT_TYPE = 'host-mac-event' EVENT_ATTRS = [ "host", "mac", "vendor", "sensor", "interface", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ]class EventService(Event): EVENT_TYPE = 'host-service-event' EVENT_ATTRS = [ "host", "sensor", "interface", "port", "protocol", "service", "application", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ]class EventHids(Event): EVENT_TYPE = 'host-ids-event' EVENT_ATTRS = [ "host", "hostname", "hids_event_type", "target", "what", "extra_data", "sensor", "date", "plugin_id", "plugin_sid", "username", "password", "filename", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "occurrences", "log", "fdate", ]class WatchRule(Event): EVENT_TYPE = 'event' EVENT_ATTRS = [ "type", "date", "fdate", "sensor", "interface", "src_ip", "dst_ip", "protocol", "plugin_id", "plugin_sid", "condition", "value", "port_from", "src_port", "port_to", "dst_port", "interval", "from", "to", "absolute", "log", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "filename", "username", ]class Snort(Event): EVENT_TYPE = 'snort-event' EVENT_ATTRS = [ "sensor", "interface", "gzipdata", "unziplen", "event_type", "plugin_id", "type", "occurrences" ]
日志编码代码:
import threading, timefrom Logger import Loggerlogger = Logger.loggerfrom Output import Outputimport Configimport Eventfrom Threshold import EventConsolidationfrom Stats import Statsfrom ConnPro import ServerConnProclass Detector(threading.Thread): def __init__(self, conf, plugin, conn): self._conf = conf self._plugin = plugin self.os_hash = {} self.conn = conn self.consolidation = EventConsolidation(self._conf) logger.info("Starting detector %s (%s).." % \ (self._plugin.get("config", "name"), self._plugin.get("config", "plugin_id"))) threading.Thread.__init__(self) def _event_os_cached(self, event): if isinstance(event, Event.EventOS): import string current_os = string.join(string.split(event["os"]), ' ') previous_os = self.os_hash.get(event["host"], '') if current_os == previous_os: return True else: # 失败并添加到缓存 self.os_hash[event["host"]] = \ string.join(string.split(event["os"]), ' ') return False def _exclude_event(self, event): if self._plugin.has_option("config", "exclude_sids"): exclude_sids = self._plugin.get("config", "exclude_sids") if event["plugin_sid"] in Config.split_sids(exclude_sids): logger.debug("Excluding event with " +\ "plugin_id=%s and plugin_sid=%s" %\ (event["plugin_id"], event["plugin_sid"])) return True return False def _thresholding(self): self.consolidation.process() def _plugin_defaults(self, event): # 从配置文件中获取默认参数 if self._conf.has_section("plugin-defaults"): # 1) 日期 default_date_format = self._conf.get("plugin-defaults", "date_format") if event["date"] is None and default_date_format and \ 'date' in event.EVENT_ATTRS: event["date"] = time.strftime(default_date_format, time.localtime(time.time())) # 2) 传感器 default_sensor = self._conf.get("plugin-defaults", "sensor") if event["sensor"] is None and default_sensor and \ 'sensor' in event.EVENT_ATTRS: event["sensor"] = default_sensor # 3) 网络接口 default_iface = self._conf.get("plugin-defaults", "interface") if event["interface"] is None and default_iface and \ 'interface' in event.EVENT_ATTRS: event["interface"] = default_iface # 4) 源IP if event["src_ip"] is None and 'src_ip' in event.EVENT_ATTRS: event["src_ip"] = event["sensor"] # 5) 时区 default_tzone = self._conf.get("plugin-defaults", "tzone") if event["tzone"] is None and 'tzone' in event.EVENT_ATTRS: event["tzone"] = default_tzone # 6) sensor,source ip and dest != localhost if event["sensor"] in ('127.0.0.1', '127.0.1.1'): event["sensor"] = default_sensor if event["dst_ip"] in ('127.0.0.1', '127.0.1.1'): event["dst_ip"] = default_sensor if event["src_ip"] in ('127.0.0.1', '127.0.1.1'): event["src_ip"] = default_sensor # 检测日志的类型 if event["type"] is None and 'type' in event.EVENT_ATTRS: event["type"] = 'detector' return event def send_message(self, event): if self._event_os_cached(event): return if self._exclude_event(event): return #对于一些空属性使用默认值。 event = self._plugin_defaults(event) # 合并之前检查 if self.conn is not None: try: self.conn.send(str(event)) except: id = self._plugin.get("config", "plugin_id") c = ServerConnPro(self._conf, id) self.conn = c.connect(0, 10) try: self.conn.send(str(event)) except: return logger.info(str(event).rstrip()) elif not self.consolidation.insert(event): Output.event(event) Stats.new_event(event) def stop(self): #self.consolidation.clear() pass#在子类中重写 def process(self): pass def run(self): self.process()class ParserSocket(Detector): def process(self): self.process()class ParserDatabase(Detector): def process(self): self.process()
… …
从上可以看出,传感器的归一化主要负责对每个LOG内数据字段进行重新编码,使其生成一个全新的可能用于发送到OSSIM服务器的完整事件。为达成这种目的GET框架中包含了一些特定的功能,以便将所有的功能转换需要BASE64转换的字段。"OSSIM消息"负责填充GET生成的原始事件中不存在的字段。所以上面讲的plugin_id、plugin_sid是用来表示日志消息来源类型和子类型,这也是生成SIEM事件的必填字段。为事件格式完整性考虑,有些时候在无法确认源或目标IP时,系统默认会采用0.0.0.0来填充该字段。
注意:这种必填字段我们可利用phpmyadmin工具查看OSSIM的MySQL数据库。
Sender Agent负责完成下面两个任务:
发送由GET收集并由事件格式化的事件发送到OSSIM服务器,这项任务由Event Hander创建的消息组成消息队列发送到消息中间件实现,时序图如图2所示。
图2 序列图: 从安全探测器的日志转换到OSSIM服务器事件
2)管理GET框架和OSSIM服务器之间的通信,通信端口为TCP 40001通过双向握手实现。归一化原始日志是规范化过程的一个重要环节,OSSIM在归一化处理日志的同时保留了原始日志,可用于日志归档,提供了一种从规范化事件中提取原始日志的手段。
经过归一化处理的EVENTS,存储到MySQL数据库中,如图3所示。接着就由关联引擎根据规则、优先级、可靠性等参数进行交叉关联分析,得出风险值并发出各种报警提示信息。
图3 OSSIM平台日志存储机制
接下来我们再看个实例,下面是一段Apache、CiscoASA以及SSH的原始日志,如图4、图5、图6所示。
- Apache插件中的正则表达式:
[0001 - apache-access] 访问日志event_type=eventregexp=((?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(:(?P\d{1,5}))? )?(?P\S+) (?P\S+) (?P\S+) \[(?P\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})\s+[+-]\d{4}\] \"(?P[^\"]*)\" (?P\d{3}) ((?P\d+)|-)( \"(?P[^\"]*)\" \"(?P[^\"]*)\")?$src_ip={resolv($src)}dst_ip={resolv($dst)}dst_port={$port}date={normalize_date($date)}plugin_sid={$code}username={$user}userdata1={$request}userdata2={$size}userdata3={$referer_uri}userdata4={$useragent}filename={$id}
[0002 - apache-error] 错误日志
event_type=eventregexp=\[(?P\w{3} \w{3} \d{2} \d{2}:\d{2}:\d{2} \d{4})\] \[(?P(emerg|alert|crit|error|warn|notice|info|debug))\] (\[client (?P\S+)\] )?(?P.*)date={normalize_date($date)}plugin_sid={translate($type)}src_ip={resolv($src)}userdata1={$data}
图4 Apache原始日志
图5 一条Cisco ASA 原始日志
图6 Cisco ASA 事件分类
通过过OSSIM归一化处理后的实际再通过Web前端展现给大家方便阅读的格式。归一化处理后的事件和原始日志的对比方法我们在《开源安全运维平台OSSIM疑难解析:入门篇》一书中还会讲解。而在图7所示的例子当中,仅使用了Userdata1和Userdata2,并没有用到Userdata3~Userdata9这些是扩展位,主要是为了预留给其他设备或服务使用,这里目标地址会标记成IP地址的形式,例如:Host192.168.11.160。实际上归一化处理这种操作发生在系统采集和存储事件之后,关联和数据分析之前,在SIEM工具中把采集过程中把数据转换成易读懂的格式,采用格式化的数据,能更容易理解。