go语言怎么实现Elasticsearches批量修改查询及发送MQ
发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,这篇"go语言怎么实现Elasticsearches批量修改查询及发送MQ"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章
千家信息网最后更新 2025年01月20日go语言怎么实现Elasticsearches批量修改查询及发送MQ
这篇"go语言怎么实现Elasticsearches批量修改查询及发送MQ"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"go语言怎么实现Elasticsearches批量修改查询及发送MQ"文章吧。
update_by_query批量修改
POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query{ "query": { "bool": { "must": [ { "term": { "join_field": { "value": "post" } } }, { "term": { "platform": { "value": "toutiao" } } }, { "exists": { "field": "liked_count" } } ] } }, "script":{ "source":"ctx._source.liked_count=0", "lang":"painless" }}
索引添加字段
PUT user_tiktok/_doc/_mapping?include_type_name=true{ "post_signature":{ "StuClass":{ "type":"keyword" }, "post_token":{ "type":"keyword" } }}PUT user_toutiao/_mapping{ "properties": { "user_token": { "type": "text" } }}
查询es发送MQ
from celery import Celeryfrom elasticsearch import Elasticsearchimport loggingimport arrowimport pytzfrom elasticsearch.helpers import scan, streaming_bulkimport redispool_16_8 = redis.ConnectionPool(host='10.0.3.100', port=6379, db=8, password='EfcHGSzKqg6cfzWq')rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)logger = logging.getLogger('elasticsearch')logger.disabled = Falselogger.setLevel(logging.INFO)es_zoo_connection = Elasticsearch('http://eswriter:e s密码@e sip:4000', dead_timeout=10, retry_on_timeout=True)logger = logging.getLogger(__name__)class ES(object): index = None doc_type = None id_field = '_id' version = '' source_id_field = '' aliase_field = '' separator = '-' aliase_func = None es = None tz = pytz.timezone('Asia/Shanghai') logger = logger @classmethod def mget(cls, ids=None, index=None, **kwargs): index = index or cls.index docs = cls.es.mget(body={'ids': ids}, doc_type=cls.doc_type, index=index, **kwargs) return docs @classmethod def count(cls, query=None, index=None, **kwargs): index = index or cls.index c = cls.es.count(doc_type=cls.doc_type, body=query, index=index, **kwargs) return c.get('count', 0) @classmethod def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs): body = { "doc": doc, } if doc_as_upsert: body['doc_as_upsert'] = True id = doc_id or cls.id_name(doc) index = index or cls.index_name(doc) cls.es.update(index, id, cls.doc_type, body, **kwargs) @classmethod def search(cls, index=None, query=None, **kwargs): index = index or cls.index return cls.es.search(index=index, body=query, **kwargs) @classmethod def scan(cls, query, index=None, **kwargs): return scan(cls.es, query=query, index=index or cls.index, **kwargs) @classmethod def index_name(cls, doc): if cls.aliase_field and cls.aliase_field in doc.keys(): aliase_part = doc[cls.aliase_field] if isinstance(aliase_part, str): aliase_part = arrow.get(aliase_part) if isinstance(aliase_part, int): aliase_part = arrow.get(aliase_part).astimezone(cls.tz) if cls.version: index = '{}{}{}{}{}'.format(cls.index, cls.separator, cls.version, cls.separator, cls.aliase_func(aliase_part)) else: index = '{}{}{}'.format(cls.index, cls.separator, cls.aliase_func(aliase_part)) else: index = cls.index return index @classmethod def id_name(cls, doc): id = doc.get(cls.id_field) and doc.pop(cls.id_field) or doc.get(cls.source_id_field) if not id: print('========', doc) assert id, 'doc _id must not be None' return id @classmethod def bulk_upsert(cls, docs, **kwargs): """ 批量操作文章, 仅支持 index 和 update """ op_type = kwargs.get('op_type') or 'update' chunk_size = kwargs.get('chunk_size') if op_type == 'update': upsert = kwargs.get('upsert', True) if upsert is None: upsert = True else: upsert = False actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type, cls.id_name, op_type, upsert=upsert) result = streaming_bulk(cls.es, actions, chunk_size=chunk_size, raise_on_error=False, raise_on_exception=False, max_retries=5, request_timeout=25) return result @classmethod def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type, upsert=True, **kwargs): assert not upsert or (upsert and op_type == 'update'), 'upsert should use "update" as op_type' for doc in docs: # 支持 index_name 作为一个工厂函数 if callable(index_name): index = index_name(doc) else: index = index_name if op_type == 'index': _source = doc elif op_type == 'update' and not upsert: _source = {'doc': doc} elif op_type == 'update' and upsert: _source = {'doc': doc, 'doc_as_upsert': True} else: continue if callable(id_name): id = id_name(doc) else: id = id_name # 生成 Bulk 动作 action = { "_op_type": op_type, "_index": index, "_type": doc_type, "_id": id, "_source": _source } yield actionclass tiktokEsUser(ES): index = 'user_tiktok' doc_type = '_doc' id_field = '_id' source_id_field = 'user_id' es = es_zoo_connectionfrom kombu import Exchange, Queue, bindingdef data_es_route_task_spider(name, args, kwargs, options, task=None, **kw): return { 'exchange': 'tiktok', 'exchange_type': 'topic', 'routing_key': name }class DataEsConfig_download(object): broker_url = 'amqp://用户:密码@ip:端口/' task_ignore_result = True task_serializer = 'json' accept_content = ['json'] task_default_queue = 'default' task_default_exchange = 'default' task_default_routing_key = 'default' exchange = Exchange('tiktok', type='topic') task_queues = [ Queue( 'tiktok.user_avatar.download', [binding(exchange, routing_key='tiktok.user_avatar.download')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.post_avatar.download', [binding(exchange, routing_key='tiktok.post_avatar.download')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.post.spider', [binding(exchange, routing_key='tiktok.post.spider')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.post.save', [binding(exchange, routing_key='tiktok.post.save')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.user.save', [binding(exchange, routing_key='tiktok.user.save')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.post_avatar.invalid', [binding(exchange, routing_key='tiktok.post_avatar.invalid')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.user_avatar.invalid', [binding(exchange, routing_key='tiktok.user_avatar.invalid')], queue_arguments={'x-queue-mode': 'lazy'} ), Queue( 'tiktok.comment.save', [binding(exchange, routing_key='tiktok.comment.save')], queue_arguments={'x-queue-mode': 'lazy'} ), ] task_routes = (data_es_route_task_spider,) enable_utc = True timezone = "Asia/Shanghai"# 下载apptiktok_app = Celery( 'tiktok', include=[ 'task.tasks', ])tiktok_app.config_from_object(DataEsConfig_download)# 发任务生产者,更新舆情user历史信息def send_post(): query = { "query": { "bool": { "must": [ { "exists": { "field": "post_signature" } }, { "range": { "following_num": { "gte": 1000 } } } ] } }, "_source": ["region", "sec_uid", "post_signature"] } # query = { # "query": { # "bool": { # "must": [ # {"exists": { # "field": "post_signature" # }}, # { # "match": { # "region": "MY" # } # } # ] # } # }, # "_source": ["region", "sec_uid", "post_signature"] # } r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100) for item in map(lambda x: x['_source'], r): tiktok_app.send_task('tiktok.post.spider', args=(item,))def send_sign_token(): query = { "query": { "bool": { "must": [ { "exists": { "field": "post_signature" } }, { "range": { "following_num": { "gte": 1000 } } }, { "range": { "create_time": { "gte": "2021-01-06T00:00:00", "lte": "2021-01-06T01:00:00" } } } ] } }, "_source": ["user_id", "sec_uid"] } r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100) for item in map(lambda x: x['_source'], r): tiktok_app.send_task('tiktok.user.sign_token', args=(item,))if __name__ == '__main__': send_post() # send_sign_token()
以上就是关于"go语言怎么实现Elasticsearches批量修改查询及发送MQ"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
内容
查询
语言
文章
密码
知识
篇文章
支持
价值
任务
信息
函数
动作
历史
大部分
字段
就是
工厂
更多
步骤
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
饶河软件开发专业
inca数据库default
原装软件开发服务商
网络安全演讲稿末尾
无线网怎么开启服务器
城固一中开展网络安全宣传
计算机应用软件开发设计
dota 2连接服务器延缓
南网总调网络安全处
黑龙江网络技术服务供应商家
织梦cms数据库
易网科技软件开发创始人
数据库删一个信息吗
国内服务器运维怎么样
网络安全班会主题内容大全
便利蜂软件开发
怀旧服部落服务器有什么用
我的世界服务器怎么改QQ号
网络安全问题预防
上海神秘服务器
无法被服务器理解的http状态
城固一中开展网络安全宣传
数据库原理课后作业
网络安全大赛的直播
遨游网络安全手抄报
搭建好的代码怎么连接服务器
天津华为服务器虚拟化迁移云主机
爱康科技能源互联网战略
网络安全培训技术
软件开发 预算表