python怎么结合shell自动创建kafka的连接器
发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,这篇"python怎么结合shell自动创建kafka的连接器"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,
千家信息网最后更新 2025年01月18日python怎么结合shell自动创建kafka的连接器
这篇"python怎么结合shell自动创建kafka的连接器"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"python怎么结合shell自动创建kafka的连接器"文章吧。
环境
cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) [root@localhost ~]# uname -aLinux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linuxpython -VPython 2.7.5
安装连接oracle的python包
pip install cx_Oracle==7.3
获取oracle表信息
cat query_oracle.py #!/usr/bin/env pythonimport cx_Oracleimport sysimport osimport csvimport tracebackfile = open("oracle.txt", 'w').close()user = "test"passwd = "test"listener = '10.0.2.15:1521/orcl'conn = cx_Oracle.connect(user, passwd, listener)cursor = conn.cursor()sql = "select table_name from user_tables" cursor.execute(sql)LIST1=[]while True: row = cursor.fetchone() if row == None: break for table in row: #print table LIST1.append(table)LIST2=[]for i in LIST1: sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i cursor.execute(sql3) cursor.execute(sql3) row3 = cursor.fetchall() for data in row3: #LIST2.append(i) LIST2.extend(list(data)) LIST2.append(i) f=open('oracle.txt','a+') print >> f,LIST2 LIST2=[]#f=open('test.txt','a+')#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student'); #select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');
去掉多余部分
cat auto.sh #!/bin/bash#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "#>oracle.txt>oracle_tables.txtcat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
cat oracle_tables.txt SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIMESNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE
shell 脚本处理表信息文件
cat connect.sh #!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)#清空自动创建连接器的脚本>create-connect.sh#循环临时文件每一行for i in `seq $FILE_NUM`do FILE_LINE=$(sed -n ${i}p oracle_tables.txt) TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}') COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}') REAL_COL_NUM=`expr $COL_NUM - 2` #清空临时文件 >${TABLE_NAME}.txt >${TABLE_NAME}.sql #循环临时文件每行列名所在的列 for j in `seq 1 4 $REAL_COL_NUM` do k=`expr $j + 1` m=`expr $j + 2` n=`expr $j + 3` COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j}) COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k}) COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m}) COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n}) #判断列的数据类型是否是NUMBER if [ "$COL_DATA_TYPE" = "NUMBER" ] then #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中 echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt else #循环拼接SQL查询中的列名部分,追加到临时文件中 echo "$COL_NAME" >> ${TABLE_NAME}.txt fi done #拼接完整的SQL语句,追加到临时文件中 echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)> ${TABLE_NAME}.sql#循环追加每个表对应的连接器到自动创建连接器的脚本中cat >> create-connect.sh << EOFcurl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_source_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_$TABLE_NAME","mode": "{{ CONNECT_MODE }}","query": "$(cat ${TABLE_NAME}.sql)"}}' >/dev/null 2>&1EOFdone
说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。
增强版处理表信息脚本
#!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)#清空创建连接器的脚本并追加echos函数> create-jdbc-connect.shcat >> create-jdbc-connect.sh << EOF#!/bin/bashechos(){case \$1 inred) echo -e "\033[31m \$2 \033[0m";;green) echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue) echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*) echo "\$2";;esac}EOF> create-jdbc-connect-time.shcat >> create-jdbc-connect-time.sh << EOF#!/bin/bashechos(){case \$1 inred) echo -e "\033[31m \$2 \033[0m";;green) echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue) echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*) echo "\$2";;esac}EOF#创建表相关文件目录mkdir -p ./TABLE_TIME#循环临时文件每一行for i in `seq $FILE_NUM`do FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt) TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}') COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}') REAL_COL_NUM=`expr $COL_NUM - 2` #清空临时文件 >./TABLE_TIME/${TABLE_NAME}_time.txt >./TABLE_TIME/${TABLE_NAME}_time.sql >./TABLE_TIME/${TABLE_NAME}.sql #循环临时文件每行列名所在的列 for j in `seq 1 4 $REAL_COL_NUM` do k=`expr $j + 1` m=`expr $j + 2` n=`expr $j + 3` COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j}) COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k}) COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m}) COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n}) #判断列的数据类型是否是NUMBER if [ "$COL_DATA_TYPE" = "NUMBER" ] then #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中 echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt else #循环拼接SQL查询中的列名部分,追加到临时文件中 echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt fi #判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中 TIME_COL=({{ TABLE_TIME_COL }}) for TIME in ${TIME_COL[@]} do if [ "$COL_NAME" = "$TIME" ] then echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt fi done done #拼接完整的SQL语句,追加到临时文件中 if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ] then #echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)> ./TABLE_TIME/${TABLE_NAME}_time.sql echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt) > ./TABLE_TIME/${TABLE_NAME}_time.sql else echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql fi#循环追加每个表对应的连接器到自动创建连接器的脚本中if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]thencat >> create-jdbc-connect-time.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_time_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"else echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"fiEOFelsecat >> create-jdbc-connect.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"else echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"fiEOFfidone
以上就是关于"python怎么结合shell自动创建kafka的连接器"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
连接器
文件
循环
脚本
内容
部分
成功
查询
信息
变量
所在
数据
文章
知识
篇文章
类型
语句
处理
一行
价值
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发资产负债表里哪项
坦克连接服务器
梦想启航互联网科技有限公司
万兆服务器可以当家庭电脑吗
软件开发他们怎么赚钱
餐盟网络技术有限公司
学校网络安全教育周活动美篇
网络安全开发先干开发
电脑服务器上找不到打印机
三十三岁做软件开发
如何往数据库表加入图片数据
广州点博网络技术 怎么样
宁波替批网络技术有限公司
广西仓储生鲜配送软件开发
更改数据库时间
开家网络安全信息技术公司
服务器换内存
网络安全密匙如何修改
工控街网络技术
无忧h2服务器
云服务器平台搭建
计算机数据库技术三级
蓝奏云网盘免费服务器接口
保定久久软件开发有限公司
大连网络技术环境
c dll 数据库
网络安全值得讨论的话题
微擎密码忘记修改数据库
微乐够级的软件开发商
饥荒服务器mod目录