socket服务器再细分可分为多种了,tcp,udp,websocket,都是调用socket模块,但是具体实现起来有一点细微的差别
先给出一个tcp和udp通过socket协议实现的聊天室的例子
python聊天室(python2.7版本):
都是分别运行server.py和client.py,就可以进行通讯了。
TCP版本:
socket-tcp-server.py(服务端):
#-*- encoding:utf-8 -*-
#socket.getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0)
#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,
#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.
#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None
#参数family为地主族,可以为AF_INET ,AF_INET6 ,AF_UNIX.
#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)
#参数proto通常为0可以直接忽略
#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值
#附注:给参数host,port传递None时建立在C基础,通过传递NULL。
#该函数返回一个五元组(family, socktype, proto, canonname, sockaddr),同时第五个参数sockaddr也是一个二元组(address, port)
#更多的方法及链接请访问
# Echo server program
from socket import *
import sys
import threading
from time import ctime
from time import localtime
import traceback
import time
import subprocess
reload(sys)
sys.setdefaultencoding("utf8")
HOST='127.0.0.1'
PORT=8555 #设置侦听端口
BUFSIZ=1024
class TcpServer():
def __init__(self):
self.ADDR=(HOST, PORT)
try:
self.sock=socket(AF_INET, SOCK_STREAM)
print '%d is open' % PORT
self.sock.bind(self.ADDR)
self.sock.listen(5)
#设置退出条件
self.STOP_CHAT=False
# 所有监听的客户端
self.clients = {}
self.thrs = {}
self.stops = []
except Exception,e:
print "%d is down" % PORT
return False
def IsOpen(ip, port):
s = socket(AF_INET, SOCK_STREAM)
try:
s.connect((ip, int(port)))
# s.shutdown(2)
# 利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数,
# 该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。
print '%d is open' % port
return True
except:
print '%d is down' % port
return False
def listen_client(self):
while not self.STOP_CHAT:
print(u'等待接入,侦听端口:%d' % (PORT))
self.tcpClientSock, self.addr=self.sock.accept()
print(u'接受连接,客户端地址:',self.addr)
address = self.addr
#将建立的client socket链接放到列表self.clients中
self.clients[address] = self.tcpClientSock
#分别将每个建立的链接放入进程中,接收且分发消息
self.thrs[address] = threading.Thread(target=self.readmsg, args=[address])
self.thrs[address].start()
time.sleep(0.5)
def readmsg(self,address):
#如果地址不存在,则返回False
if address not in self.clients:
return False
#得到发送消息的client socket
client = self.clients[address]
while True:
try:
#获取到消息内容data
data=client.recv(BUFSIZ)
except:
print(e)
self.close_client(address)
break
if not data:
break
#python3使用bytes,所以要进行编码
#s='%s发送给我的信息是:[%s] %s' %(addr[0],ctime(), data.decode('utf8'))
#对日期进行一下格式化
ISOTIMEFORMAT='%Y-%m-%d %X'
stime=time.strftime(ISOTIMEFORMAT, localtime())
s=u'%s发送给我的信息是:%s' %(str(address),data.decode('utf8'))
#将获得的消息分发给链接中的client socket
for k in self.clients:
self.clients[k].send(s.encode('utf8'))
self.clients[k].sendall('sendall:'+s.encode('utf8'))
print str(k)
print([stime], ':', data.decode('utf8'))
#如果输入quit(忽略大小写),则程序退出
STOP_CHAT=(data.decode('utf8').upper()=="QUIT")
if STOP_CHAT:
print "quit"
self.close_client(address)
print "already quit"
break
def close_client(self,address):
try:
client = self.clients.pop(address)
self.stops.append(address)
client.close()
for k in self.clients:
self.clients[k].send(str(address) + u"已经离开了")
except:
pass
print(str(address)+u'已经退出')
if __name__ == '__main__':
tserver = TcpServer()
tserver.listen_client()
——————————华丽的分割线——————————
socket-tcp-client.py (客户端):
#-*- encoding:utf-8 -*-
from socket import *
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding("utf8")
#测试,连接本机
HOST='127.0.0.1'
#设置侦听端口
PORT=8555
BUFSIZ=1024
class TcpClient:
ADDR=(HOST, PORT)
def __init__(self):
self.HOST = HOST
self.PORT = PORT
self.BUFSIZ = BUFSIZ
#创建socket连接
self.client = socket(AF_INET, SOCK_STREAM)
self.client.connect(self.ADDR)
#起一个线程,监听接收的信息
self.trecv = threading.Thread(target=self.recvmsg)
self.trecv.start()
def sendmsg(self):
#循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接
while self.client.connect_ex(self.ADDR):
data=raw_input('>:')
if not data:
break
self.client.send(data.encode('utf8'))
print(u'发送信息到%s:%s' %(self.HOST,data))
if data.upper()=="QUIT":
self.client.close()
print u"已关闭"
break
def recvmsg(self):
#接收消息,如果链接一直存在,则持续监听接收消息
try:
while self.client.connect_ex(self.ADDR):
data=self.client.recv(self.BUFSIZ)
print(u'从%s收到信息:%s' %(self.HOST,data.decode('utf8')))
except Exception,e:
print str(e)
if __name__ == '__main__':
client=TcpClient()
client.sendmsg()
UDP版本:
socket-udp-server.py
# -*- coding:utf8 -*-
import sys
import time
import traceback
import threading
reload(sys)
sys.setdefaultencoding('utf-8')
import socket
import traceback
HOST = "127.0.0.1"
PORT = 9555
CHECK_PERIOD = 20
CHECK_TIMEOUT = 15
class UdpServer(object):
def __init__(self):
self.clients = []
self.beats = {}
self.ADDR = (HOST,PORT)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(self.ADDR) # 绑定同一个域名下的所有机器
self.beattrs = threading.Thread(target=self.checkheartbeat)
self.beattrs.start()
except Exception,e:
traceback.print_exc()
return False
def listen_client(self):
while True:
time.sleep(0.5)
print "hohohohohoo"
try:
recvData,address = self.sock.recvfrom(2048)
if not recvData:
self.close_client(address)
break
if address in self.clients:
senddata = u"%s发送给我的信息是:%s" %(str(address),recvData.decode('utf8'))
if recvData.upper() == "QUIT":
self.close_client(address)
if recvData == "HEARTBEAT":
self.heartbeat(address)
continue
else:
self.clients.append(address)
senddata = u"%s发送给我的信息是:%s" %(str(address),u'进入了聊天室')
for c in self.clients:
try:
self.sock.sendto(senddata,c)
except Exception,e:
print str(e)
self.close_client(c)
except Exception,e:
# traceback.print_exc()
print str(e)
pass
def heartbeat(self,address):
self.beats[address] = time.time()
def checkheartbeat(self):
while True:
print "checkheartbeat"
print self.beats
try:
for c in self.clients:
print time.time()
print self.beats[c]
if self.beats[c] + CHECK_TIMEOUT <time.time():
print u"%s心跳超时,连接已经断开" %str(c)
self.close_client(c)
else:
print u"checkp%s,没有断开" %str(c)
except Exception,e:
traceback.print_exc()
print str(e)
pass
time.sleep(CHECK_PERIOD)
def close_client(self,address):
try:
if address in self.clients:
self.clients.remove(address)
if self.beats.has_key(address):
del self.beats[address]
print self.clients
for c in self.clients:
self.sock.sendto(u'%s已经离开了' % str(address),c)
print(str(address)+u'已经退出')
except Exception,e:
print str(e)
raise
if __name__ == "__main__":
udpServer = UdpServer()
udpServer.listen_client()
——————————华丽的分割线——————————
socket-udp-client.py:
# -*- coding:utf8 -*-
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
import socket
HOST = "127.0.0.1"
PORT = 9555
#BEAT_PORT = 43278
BEAT_PERIOD = 5
class UdpClient(object):
def __init__(self):
self.clientsock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.HOST = HOST
self.ADDR = (HOST,PORT)
self.clientsock.sendto(u'请求建立链接',self.ADDR)
self.recvtrs = threading.Thread(target=self.recvmsg)
self.recvtrs.start()
self.hearttrs = threading.Thread(target=self.heartbeat)
self.hearttrs.start()
def sendmsg(self):
while True:
data = raw_input(">:")
if not data:
break
self.clientsock.sendto(data.encode('utf-8'),self.ADDR)
if data.upper() == 'QUIT':
self.clientsock.close()
break
def heartbeat(self):
while True:
self.clientsock.sendto('HEARTBEAT',self.ADDR)
time.sleep(BEAT_PERIOD)
def recvmsg(self):
while True:
recvData,addr = self.clientsock.recvfrom(1024)
if not recvData:
break
print(u'从%s收到信息:%s' %(self.HOST,recvData.decode('utf8')))
if __name__ == "__main__":
udpClient = UdpClient()
udpClient.sendmsg()
其实很简单,主要是因为python的使用场景是一些比较简单,并发量低的,对速度要求不是很高。网游服务端,交互量会很大,实时性要求很高,python这种解释性动态语言,较java、C++并不占据优势。使用pip或easy_install可以管理和安装python的package包,实际上它们都是从pypi服务器中搜索和下载package的。目前在pypi服务器上,有超过三万多个package,同时还允许我们将自己的代码也上传发布到服务器上。这样,世界上的所有人都能使用pip或easy_install来下载使用我们的代码了。具体步骤如下:
首先创建项目文件和setup文件。
目录文件结构如下:
project/
simpletest/
__init__.py
test.py
setup.py
假设项目文件只有一个simpletest包,里面有一个test.py文件。
创建的setup.py文件格式大致如下,其中,install_requires字段可以列出依赖的包信息,用户使用pip或easy_install安装时会自动下载依赖的包。详细的格式参考文档。
from setuptools import setup, find_packages
setup(
name = 'simpletest',
version = '0.0.1',
keywords = ('simple', 'test'),
description = 'just a simple test',
license = 'MIT License',
install_requires = ['simplejson>=1.1'],
author = 'yjx',
author_email = 'not@all.com',
packages = find_packages(),
platforms = 'any',
)
然后将代码打包。
打包只需要执行python
setup.py xxx命令即可,其中xxx是打包格式的选项,如下:
# 以下所有生成文件将在当前路径下 dist 目录中
python setup.py bdist_egg # 生成easy_install支持的格式
python setup.py sdist # 生成pip支持的格式,下文以此为例
发布到pypi。
发布到pypi首先需要注册一个账号,然后进行如下两步:
注册package。输入python setup.py register。
上传文件。输入python setup.py sdist upload。
安装测试
上传成功后,就可以使用pip来下载安装了。
另外,pypi还有一个测试服务器,可以在这个测试服务器上做测试,测试的时候需要给命令指定额外的"-r"或"-i"选项,如python
setup.py register -r "",python
setup.py sdist upload -r "",pip
install -i "" simpletest。
发布到测试服务器的时候,建议在linux或cygwin中发布,如果是在windows中,参考文档,需要生成.pypirc文件
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)