python爬取微博图片数据存到Mysql中遇到的各种坑\python Mysql存储图片
本人长期出售超大量微博数据、旅游网站评论数据,并提供各种指定数据爬取服务,Message to YuboonaZhang@Yahoo.com。同时欢迎加入社交媒体数据交流群:99918768
前言
由于硬件等各种原因需要把大概170多万2t左右的微博图片数据存到Mysql中.之前存微博数据一直用的非关系型数据库mongodb,由于对Mysql的各种不熟悉,踩了无数坑,来来回回改了3天才完成。
挖坑填坑之旅
建表
存数据的时候首先需要设计数据库,我准备设计了3个表
微博表:[id, userid, blog_text, lat, lng, created_time, reserve] pkey: id
图片表:[md5, pic_url, pic_bin, exif, reserve] pkey: md5
关系表:[id, md5, reserve] pkey: (id, md5) fkey: (id, 微博表id) (md5, 图片表md5)
建表的时候别的问题都还好,主要是 pic_bin 的类型和 blog_text 的类型有很大的问题,首先是pic_bin的类型,开始设置的为BLOB,但是运行之后发现BLOB最大只能存1M的数据,并不能满足微博图片的存储,后改成MEDIUMBLOB(16M)基本能够满足要求了。再后来就是blog_text,我遇到的第一个大坑
开始的时候很自然的设置blog_text的类型为TEXT,但跑起来发现有些数据存不进去,会报错,经筛查发现是有些微博文本中包含了emoji表情...随后找了很多资料发现是因为utf8下文字是三字节,但是emoji是四字节,需要将编码改成utf8mb4。然而我在mac上整mysql的配置文件报各种奇葩错误,一怒之下把TEXT改成了BLOB,就好了。因为本地是MAC,我要连接到远程的一台Windows上才能通过那个Windows连接到群晖的Mysql上...本地配置改了也白改。
存图片
然后这就是一个大坑!!! 由于我使用的python3,所以读取图片得到的二进制的结果前面会有一个b', 表示bytes,正是由于这个b'导致sql语句拼接的时候这个b后面的单引号会和sql语句的引号结合,导致后面的二进制没有在引号里面出错!二进制编码又不像string可以对字符转义,试了好多方法都不行!最后没有办法使用base64 对二进制进行加密转化成字符串,存到数据库中,然后要用时的时候再解密。
pic_bin = str(base64.b64encode(pic_bin))[2:-1]
改配置文件
由于使用Python多进程,一个小时8G数据量,图片数据比较大,发包的时候回超过mysql的默认限制,出现Mysql server has gone away, 这个时候要改配置文件,在配置文件中参数
max_allowed_packet = 600M
wait_timeout = 60000
Lost connection to Mysql server during query
程序跑着跑着总会出现这个错误,一直找原因,试了各种办法看了好多资料,一直都是错误。实在不知道什么原因了...后来一想,我管他什么原因,失去连接之后重新连接就行了。使用conn.Ping(True) 判断是否连接mysql成功。如果失去连接就重新连接就行了!最后解决了这个问题
代码实现
#!/usr/bin/env python# -*- coding: utf-8 -*-# Created by Baoyi on 2017/10/16from multiprocessing.pool import Poolimport pymysqlimport requestsimport jsonimport exifreadfrom io import BytesIOimport configparserimport hashlibimport loggingimport base64# 配置logginglogging.basicConfig(level=logging.WARNING, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='weibo.log', filemode='w')cf = configparser.ConfigParser()cf.read("ConfigParser.conf")# 读取配置mysqldb_host = cf.get("mysql", "db_host")db_port = cf.getint("mysql", "db_port")db_user = cf.get("mysql", "db_user")db_pass = cf.get("mysql", "db_pass")db = cf.get("mysql", "db")# 创建连接conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, port=db_port, charset='utf8')# 获取游标cursor = conn.cursor()# 创建insert_sqlinsert_blog_sql = ( "INSERT IGNORE INTO blog(userid, id, blog_text, lat, lng, created_time) VALUES('{uid}', '{id}','{blog_text}','{lat}','{lng}','{created_time}')")insert_pic_sql = ( "INSERT IGNORE INTO pics(pic_url, pic_bin, md5, exif) VALUES ('{pic_url}','{pic_bin}','{md5}','{exif}')")insert_relationship_sql = ( "INSERT IGNORE INTO relationship(id, md5) VALUES ('{id}','{md5}')")uid = []with open('./data/final_id.txt', 'r') as f: for i in f.readlines(): uid.append(i.strip('\r\n'))# 处理图片数据def handle_pic(pic_url): large_pic_url = pic_url.replace('thumbnail', 'large') large_bin = requests.get(large_pic_url) return large_bin.contentdef get_poiid_info(uid): try: url = 'https://api.weibo.com/2/statuses/user_timeline.json' load = { 'access_token': 'xxxxxxxxxx', 'uid': uid, 'count': 100, 'feature': 2, 'trim_user': 1 } get_info = requests.get(url=url, params=load, timeout=(10, 10)) if get_info.status_code != 200: logging.warning(ConnectionError) pass info_json = json.loads(get_info.content) info_json['uid'] = uid statuses = info_json['statuses'] # 处理筛选微博数据 for status in statuses: id = status['idstr'] if status['geo'] is not None: lat = status['geo']['coordinates'][0] lng = status['geo']['coordinates'][1] pic_urls = status['pic_urls'] # 判断是否在北京 if (115.7 < lng < 117.4) and (39.4 < lat < 41.6): # 若在北京,插入blog数据进库 blog_text = status['text'].replace('\'', '\'\'') created_time = status['created_at'] try: cursor.execute( insert_blog_sql.format(uid=uid, id=id, blog_text=blog_text, lat=lat, lng=lng, created_time=created_time)) except pymysql.err.OperationalError as e_blog: logging.warning(e_blog.args[1]) pass # conn.commit() # 处理图片 for pic_url in pic_urls: # 获取原图片二进制数据 pic_bin = handle_pic(pic_url['thumbnail_pic']) # 读取exif 数据 pic_file = BytesIO(pic_bin) # 将二进制数据转化成文件对象便于读取exif数据信息和生成MD5 tag1 = exifread.process_file(pic_file, details=False, strict=True) tag = {} for key, value in tag1.items(): if key not in ( 'JPEGThumbnail', 'TIFFThumbnail', 'Filename', 'EXIF MakerNote'): # 去除四个不必要的exif属性,简化信息量 tag[key] = str(value) tags = json.dumps(tag) # dumps为json类型 此tag即为exif的json数据 # 生成MD5 MD5 = hashlib.md5(pic_file.read()).hexdigest() # 首先把二进制图片用base64 转成字符串之后再存 try: cursor.execute( insert_pic_sql.format(pic_url=pic_url['thumbnail_pic'].replace('thumbnail', 'large'), pic_bin=str(base64.b64encode(pic_bin))[2:-1], md5=MD5, exif=tags)) except pymysql.err.OperationalError as e_pic: logging.warning(e_pic.args[1]) pass try: cursor.execute(insert_relationship_sql.format(id=id, md5=MD5)) except pymysql.err.OperationalError as e_relation: logging.warning(e_relation) pass conn.commit() else: logging.info(id + " is Not in Beijing") pass else: logging.info(id + ' Geo is null') pass except pymysql.err.OperationalError as e: logging.error(e.args[1]) passdef judge_conn(i): global conn try: conn.ping(True) get_poiid_info(i) except pymysql.err.OperationalError as e: logging.error('Reconnect') conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, charset='utf8') get_poiid_info(i)def handle_tuple(a_tuple): read_uid_set = [] for i in a_tuple: read_uid_set.append(i[0]) return set(read_uid_set)if __name__ == '__main__': sql_find_uid = ( "SELECT userid FROM blog" ) cursor.execute(sql_find_uid) read_uid_tuple = cursor.fetchall() read_list = handle_tuple(read_uid_tuple) print(len(read_list)) new_uid = set(uid).difference(read_list) print(len(new_uid)) pool = Pool() pool.map(judge_conn, list(new_uid))
个人博客
8aoy1.cn