千家信息网

Splunk中怎么利用restapi获取数据到第三方系统

发表于:2024-11-24 作者:千家信息网编辑
千家信息网最后更新 2024年11月24日,Splunk中怎么利用restapi获取数据到第三方系统,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。测试The sam
千家信息网最后更新 2024年11月24日Splunk中怎么利用restapi获取数据到第三方系统

Splunk中怎么利用restapi获取数据到第三方系统,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

测试

The same python implementation for curl function

'''

sid=`curl -u admin:changeme -k https://localhost:8089/services/search/jobs -d search="search source=\"http:hec_test\" refresh | head 21" 2>/dev/null | sed "1,2d" | sed "2d" | sed "s/.*>\([0-9]*\.[0-9]*\)<.*/\1/"`

echo $sid

curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid?output_mode=json

curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid/results/ --get -d output_mode=json 2>/dev/null >out.json

'''

基于Python3的封装

# coding=utf-8

import urllib

import httplib2

from xml.dom import minidom

import time

import json

import traceback

class SplunkQuery(object):

def __init__(self):

self.baseurl = 'https://IP:8089'

self.userName = 'xxx'

self.password = 'xxx'

self.sessionKey = self.get_key()

def get_key(self):

session_key = ""

try:

server_content = httplib2.Http(disable_ssl_certificate_validation=True).request(

self.baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': self.userName, 'password': self.password}))[1]

session_key = minidom.parseString(server_content).getElementsByTagName('sessionKey')[

0].childNodes[0].nodeValue

except:

# traceback.print_exc()

pass

return session_key

def submit_job(self, search_query, earliest_time=None, latest_time=None):

# check if the query has the search operator

if not search_query.startswith('search'):

search_query = 'search ' + search_query

data = {'search': search_query}

if earliest_time:

data['earliest_time'] = earliest_time

if latest_time:

data['latest_time'] = latest_time

sid_body = httplib2.Http(disable_ssl_certificate_validation=True) \

.request(self.baseurl + '/services/search/jobs',

'POST',

headers={

'Authorization': 'Splunk %s' % self.sessionKey},

body=urllib.parse.urlencode(data))[1]

sid = minidom.parseString(sid_body).getElementsByTagName("sid")[0].childNodes[0].nodeValue

print("sid:" + sid)

return sid

def request_results(self, sid):

start = time.time()

response = httplib2.Http(disable_ssl_certificate_validation=True) \

.request(self.baseurl + '/services/search/jobs/' + sid +

"?output_mode=json", 'POST',

headers={

'Authorization': 'Splunk %s' % self.sessionKey},

body=urllib.parse.urlencode({}))[1]

data = json.loads(response)

while not data["entry"][0]["content"]["isDone"]:

time.sleep(0.1)

response = httplib2.Http(disable_ssl_certificate_validation=True) \

.request(self.baseurl + '/services/search/jobs/' + sid +

"?output_mode=json",

'POST',

headers={

'Authorization': 'Splunk %s' % self.sessionKey},

body=urllib.parse.urlencode({}))[1]

data = json.loads(response)

request_time = time.time() - start

print("result event count:", data["entry"][0]["content"]["eventCount"], "request time:", request_time)

result_response = httplib2.Http(disable_ssl_certificate_validation=True) \

.request(self.baseurl + '/services/search/jobs/' + sid + "/results?count=0",

'GET',

headers={

'Authorization': 'Splunk %s' % self.sessionKey},

body=urllib.parse.urlencode({"output_mode": "json"}))[1]

results = json.loads(result_response)["results"]

print(len(results))

# assert data["entry"][0]["content"]["eventCount"] == len(results)

end = time.time()

print("result count:", len(results), "result request time:", end - start)

# response = httplib2.Http(disable_ssl_certificate_validation=True) \

# .request(self.baseurl + '/services/search/jobs/' + sid +

# "?output_mode=json", 'DELETE',

# headers={

# 'Authorization': 'Splunk %s' % self.sessionKey},

# body=urllib.parse.urlencode({}))[1]

return results

def run(self, searchQuery, earliest_time=None, latest_time=None):

start = time.time()

sid = self.submit_job(searchQuery, earliest_time)

result = self.request_results(sid)

end = time.time()

print("search time:", end - start)

return result

调用

print(">>>>>>>>>>>>>>>>SplunkQuery>>>>>>>>>>>>>>>>>>>>>")

Q = SplunkQuery()

result = Q.run(searchQuery='''index=xx sourcetype=xx''')

print(result[0])

result = Q.run(searchQuery='''index=xx sourcetype=xx''', earliest_time="2020-06-19T12:00:00")

print(result[5])

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

帮助 数据 第三方 系统 清楚 内容 对此 文章 新手 更多 知识 行业 资讯 资讯频道 难题 需求 频道 进一 学习 封装 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 我的世界科技服务器怎么玩 非关系型数据库是开源的 医学网络技术就业前景 英雄联盟登录服务器叫什么 gp数据库的时间类型 广受好评的mysql数据库运维 epic官网服务器 学校网络安全专项应急预案 搭建应用服务器 基于流量分析网络安全 什么是数据库的对象 明日之后伊尼山服务器下载 oracle数据库的好处 计算机软件开发有几级考试 广东人来网络技术有限公司 如何将数据库的文件打印出来 服务器可以使用长波模块吗 系统软件开发招聘 软件开发方向及其发展趋势 sql数据库学了有用吗 重庆办公系统软件开发哪家正规 怀旧服服务器怎么关闭自动修复 网络技术基础第四章自测 网络安全某一方面的课题 最容易上手的软件开发 如何设计网站的数据库 旧手机怎么加盟小米云服务器 lol获取服务器信息失败 丽江市公安网络安全 什么软件能够打开数据库
0