python跨库检查数据一致性的示例分析
python跨库检查数据一致性的示例分析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
因为最近上线了samza实时流平台,实时从源数据端实时传输数据到数据仓库,于是就需要检查samza数据传输的准确性,则通过下面的python脚本定时检查两个数据端表的行数,以简单的检测下数据一致性。下面的脚本每天在早上6点运行,检查昨天以前的所有数据行数是否一致,并邮件通知。因为要检测的表比较多,并且清洗规则不一致,所以只能一张表一张表的进行比较,下面只是脚本的部分代码
#coding=utf-8
import MySQLdb
import psycopg2
import smtplib
import time
from email.mime.text import MIMEText
#通用的邮件函数
def mail(sub,content):
mailto_list=["hzwuj@tairanchina.com"]
mail_host="smtp.tairanchina.com"
mail_uer="trcloud@tairanchina.com"
mail_pass="r!dHE#3OAGs5TGeh"
message = MIMEText(content,_charset='utf-8')
message['Subject'] = sub
message['From']=mail_uer
message['To'] = ";".join(mailto_list)
try:
s = smtplib.SMTP()
s.connect(mail_host)
s.login(mail_uer,mail_pass)
s.sendmail(mail_uer, mailto_list, message.as_string())
s.close()
return True
except Exception:
print 'filed'
return False
#设置变量为今天的日期
datetime=time.strftime("%Y%m%d", time.localtime())
#从源数据库mysql去得出清洗后的总条数
conn=MySQLdb.connect(host='115.231.97.10',port=3306,user='select',passwd='123456',db='cms')
cursor1=conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
cursor1.execute("select 'biz_account' as tablename,count(*) as sum from biz_account where del_flag='0' and date(create_date)<%s" % datetime)
rows1=cursor1.fetchall()
for list1 in rows1:
sum1=list1['sum']
tablename1=list1['tablename']
#从数据库仓库greenplum从获取samza清洗后的表总行数
conn2 = psycopg2.connect(database="dw", user="admin", password="123456", host="172.30.248.24", port="5432")
cursor2 = conn2.cursor()
cursor2.execute("select 'dw_biz_account' as tablename,count(*) as sum from dw.dw_biz_account where create_day<'%s'" % datetime )
rows2=cursor2.fetchall()
for list2 in rows2:
sum2=list2[1]
tablename2=list2[0]
#总条数比较
if sum1!=sum2:
mail('数据不一致',"%s:%s %s:%s"%(tablename1,sum1,tablename2,sum2))
else:
mail('数据一致','数据一致')
conn.close()
conn2.close()
关于python跨库检查数据一致性的示例分析问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。