如何用python实现简单的聊天小程序
发表于:2025-02-19 作者:千家信息网编辑
千家信息网最后更新 2025年02月19日,这篇"如何用python实现简单的聊天小程序"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看
千家信息网最后更新 2025年02月19日如何用python实现简单的聊天小程序
这篇"如何用python实现简单的聊天小程序"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"如何用python实现简单的聊天小程序"文章吧。
实现思路
x01 服务端的建立
首先,在服务端,使用socket进行消息的接受,每接受一个socket的请求,就开启一个新的线程来管理消息的分发与接受,同时,又存在一个handler来管理所有的线程,从而实现对聊天室的各种功能的处理
x02 客户端的建立
客户端的建立就要比服务端简单多了,客户端的作用只是对消息的发送以及接受,以及按照特定的规则去输入特定的字符从而实现不同的功能的使用,因此,在客户端这里,只需要去使用两个线程,一个是专门用于接受消息,一个是专门用于发送消息的
至于为什么不用一个呢,那是因为,只用一个的话,当接受了消息,在发送之前接受消息的处于阻塞状态,同理,发送消息也是,那么要是将这两个功能放在一个地方实现,就会导致没有办法连续发送或者接受消息了
实现方式
服务端实现
import jsonimport threadingfrom socket import *from time import ctimeclass PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = ('', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print('等待客户连接...\r\n') try: while True: cs, caddr = self.__socket.accept() # 利用handler来管理线程,实现线程之间的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: passclass ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多线程管理会话 def run(self): try: print('...连接来自于:', self.__caddr) data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n' self.__cs.sendall(bytes(data, 'utf-8')) while True: data = self.__cs.recv(self.__buf).decode('utf-8') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close()class ChattingHandler: __help_str = "[ SYSTEM ]\r\n" \ "输入/ls,即可获得所有登陆用户信息\r\n" \ "输入/h,即可获得帮助\r\n" \ "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \ "输入/i,即可屏蔽群聊信息\r\n" \ "再次输入/i,即可取消屏蔽\r\n" \ "所有首字符为/的信息都不会发送出去" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的记录 nickname = "SOMEONE" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket与username之间的映射关系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += " " # 广播某玩家退出聊天室 self.broadcast_system_msg(nickname + "离开了PY_CHATTING") # 管理用户输入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js['type'] == "login": if js['msg'] not in self.__user_name_to_socket: if ' ' in js['msg']: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号不能够带有空格' }), cs) else: self.__user_name_to_socket[js['msg']] = cs self.__socket_to_user_name[cs] = js['msg'] self.__user_name_to_broadcast_state[js['msg']] = True self.send_to(json.dumps({ 'type': 'login', 'success': True, 'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)' }), cs) # 广播其他人,他已经进入聊天室 self.broadcast_system_msg(js['msg'] + "已经进入了聊天室") else: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号已存在' }), cs) # 若玩家处于屏蔽模式,则无法发送群聊消息 elif js['type'] == "broadcast": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js['msg'], cs) else: self.send_to(json.dumps({ 'type': 'broadcast', 'msg': '屏蔽模式下无法发送群聊信息' }), cs) elif js['type'] == "ls": self.send_to(json.dumps({ 'type': 'ls', 'msg': self.get_all_login_user_info() }), cs) elif js['type'] == "help": self.send_to(json.dumps({ 'type': 'help', 'msg': self.__help_str }), cs) elif js['type'] == "sendto": self.single_chatting(cs, js['nickname'], js['msg']) elif js['type'] == "ignore": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = "通常模式" else: msg = "屏蔽模式" self.send_to(json.dumps({ 'type': 'ignore', 'success': True, 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg) }), cs) else: self.send_to({ 'type': 'ignore', 'success': False, 'msg': '切换失败' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ 'type': 'single', 'msg': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ 'type': 'single', 'msg': '该用户不存在' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = "[ SYSTEM ] ALIVE USER : \r\n" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + ",\r\n" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, 'utf-8')) def broadcast_system_msg(self, msg): data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg) js = json.dumps({ 'type': 'system_msg', 'msg': data }) # 屏蔽了群聊的玩家也可以获得系统的群发信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def broadcast(self, msg, cs): data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ 'type': 'broadcast', 'msg': data }) # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, 'utf-8'))def main(): server = PyChattingServer() server.start_session()main()
客户端的实现
import jsonimport threadingfrom socket import *is_login = Falseis_broadcast = Trueclass ClientReceiveThread(threading.Thread): __buf = 1024 def __init__(self, cs): super(ClientReceiveThread, self).__init__() self.__cs = cs def run(self): self.receive_msg() def receive_msg(self): while True: msg = self.__cs.recv(self.__buf).decode('utf-8') if not msg: break js = json.loads(msg) if js['type'] == "login": if js['success']: global is_login is_login = True print(js['msg']) elif js['type'] == "ignore": if js['success']: global is_broadcast if is_broadcast: is_broadcast = False else: is_broadcast = True print(js['msg']) else: if not is_broadcast: print("[现在处于屏蔽模式]") print(js['msg'])class ClientSendMsgThread(threading.Thread): def __init__(self, cs): super(ClientSendMsgThread, self).__init__() self.__cs = cs def run(self): self.send_msg() # 根据不同的输入格式来进行不同的聊天方式 def send_msg(self): while True: js = None msg = input() if not is_login: js = json.dumps({ 'type': 'login', 'msg': msg }) elif msg[0] == "@": data = msg.split(' ') if not data: print("请重新输入") break nickname = data[0] nickname = nickname.strip("@") if len(data) == 1: data.append(" ") js = json.dumps({ 'type': 'sendto', 'nickname': nickname, 'msg': data[1] }) elif msg == "/help": js = json.dumps({ 'type': 'help', 'msg': None }) elif msg == "/ls": js = json.dumps({ 'type': 'ls', 'msg': None }) elif msg == "/i": js = json.dumps({ 'type': 'ignore', 'msg': None }) else: if msg[0] != '/': js = json.dumps({ 'type': 'broadcast', 'msg': msg }) if js is not None: self.__cs.sendall(bytes(js, 'utf-8'))def main(): buf = 1024 # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了 address = ("127.0.0.1", 12231) cs = socket(AF_INET, SOCK_STREAM, 0) cs.connect(address) data = cs.recv(buf).decode("utf-8") if data: print(data) receive_thread = ClientReceiveThread(cs) receive_thread.start() send_thread = ClientSendMsgThread(cs) send_thread.start() while True: passmain()
以上就是关于"如何用python实现简单的聊天小程序"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
消息
输入
信息
客户
模式
线程
服务
内容
玩家
用户
端的
管理
聊天室
程序
不同
功能
空格
帮助
成功
两个
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
sql数据库创建表
知识题库软件开发
思科有意收购网络安全初创企业
云图管家服务器端安装教程
软件开发是不是要数学成绩好
有没有做网络技术的大佬
物联网转网络安全
移动网络技术有哪些
搭建ed2k服务器
陕西管理系统软件开发商
网络安全实用技术第三版pdf
sql怎么建立数据库数据库
防止黑客攻击数据库服务器的有效策略包括
服务器任务管理器在哪
软件开发工资一般多少的
服务器一般错误
vc中的ftp上传文件到服务器
网络安全教育培训方案2020
软件开发具备的条件
一般一台服务器承受多少并发数
夏绅互联网科技
泉州回收惠普服务器
我国农药登记数据库文件
网络技术和弱电
数据库系统程序语言
多益网络技术岗五险一金
服务器一般错误
软件开发测试内蒙古招聘
我的世界能不能自己创建服务器
英雄联盟对局服务器偏高什么意思