PHP前端开发

使用python3实现ftp服务功能实例(服务端 For Linux)

百变鹏仔 3小时前 #Python
文章标签 服务端

这篇文章主要介绍了python3实现ftp服务功能,服务端 for linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

立即学习“Python免费学习笔记(深入)”;

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、允许在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

# Author by Andy
# _*_ coding:utf-8 _*_
"""Threaded FTP-like file server (server side, for Linux).

Every client connection is served by its own ``Ftp_server`` handler
instance.  A request is a single JSON object whose ``"action"`` key names the
command (``auth``, ``ls``, ``pwd``, ``cd``, ``put``, ``rm``, ``get``,
``mkdir``); the remaining keys are the command's arguments.

The working directory is tracked per connection instead of with
``os.chdir``: the process-wide working directory is shared by all handler
threads, so changing it for one client would move every other client too.
"""
import hashlib
import json
import os
import shutil
import socketserver
import subprocess
import sys
import time

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from conf import userdb_set  # noqa: E402  (base_dir must be on sys.path first)


class Ftp_server(socketserver.BaseRequestHandler):
    """Request handler that implements the file-transfer commands."""

    # Real path of the authenticated user's home directory ('' = not logged in).
    user_home_dir = ''
    # Real path of this connection's current directory (always inside the home).
    user_current_dir = ''
    # File that loging() appends to.
    log_file = '/root/ftp/ftpserver/log/server.log'
    # Size of the buffers used for socket and file I/O.
    chunk_size = 1024
    # Only these method names may be invoked by a client request.
    allowed_actions = frozenset(
        ('auth', 'get', 'cd', 'rm', 'pwd', 'ls', 'put', 'mkdir'))

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def _reply(self, message):
        """Send the whole text *message* to the client, UTF-8 encoded."""
        self.request.sendall(message.encode('utf-8'))

    def _real_path(self, path):
        """Resolve a client path against this connection's current directory.

        Symlinks and ``..`` components are resolved, so the result can be
        compared safely against the home directory.
        """
        return os.path.realpath(os.path.join(self.user_current_dir, path))

    def _inside_home(self, real_path):
        """Return True if *real_path* is the home directory or lies below it."""
        home = self.user_home_dir
        if not home:
            return False
        return os.path.commonpath([home, real_path]) == home

    def _confined(self, path):
        """Return the resolved *path*, or None if it escapes the home directory."""
        real_path = self._real_path(path)
        return real_path if self._inside_home(real_path) else None

    # ------------------------------------------------------------------
    # commands
    # ------------------------------------------------------------------
    def auth(self, *args):
        """Verify the user name and password.

        Expects ``args[0]`` to hold ``"username"`` and ``"password"``; the
        password is compared as-is with the value stored in the user db.
        Replies ``'0'`` on success and ``'1'`` on failure.  A login also fails
        when ``/home/<username>`` is not an existing directory.
        """
        cmd_dic = args[0]
        username = cmd_dic["username"]
        password = cmd_dic["password"]
        with open(userdb_set.userdb_set(), 'r') as f:
            user_info = json.load(f)
        home = os.path.realpath('/home/%s' % username)
        # Membership is tested explicitly: a .get() would let a JSON null
        # password match a missing user.
        if (username in user_info
                and password == user_info[username]
                and os.path.isdir(home)):
            self.user_home_dir = home
            self.user_current_dir = home
            self._reply('0')
            data = "%s login successed" % username
        else:
            self._reply('1')
            data = "%s login failed" % username
        self.loging(data)

    ##########################################
    def get(self, *args):
        """Send a file to the client.

        Replies ``'0'`` (file is ready to get) or ``'1'`` (file not found,
        which also covers paths outside the home directory).  After ``'0'``
        the exchange is: wait for an acknowledgement, send the size, wait for
        an acknowledgement, send the content, send the md5 hex digest.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = self._confined(cmd_dic["filename"])
        if filename is None or not os.path.isfile(filename):
            self._reply('1')
            return
        self._reply('0')  # confirm that the file exists
        self.request.recv(1024)
        self._reply(str(os.stat(filename).st_size))
        self.request.recv(1024)
        m = hashlib.md5()
        with open(filename, 'rb') as f:
            # Fixed-size chunks: a binary file may have no newline at all.
            for chunk in iter(lambda: f.read(self.chunk_size), b''):
                m.update(chunk)
                # sendall, because send() may transmit only part of the data.
                self.request.sendall(chunk)
        self._reply(m.hexdigest())
        print('From server:Md5 value has been sended!')

    ###########################################
    def cd(self, *args):
        """Change this connection's current directory.

        Replies with one of ``'Change dir successfully!'``,
        ``'Permission Denied!'`` (directory exists outside the home
        directory) or ``'Directory not found!'``.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._real_path(cmd_dic['path'])
        if not os.path.isdir(target):
            data = 'Directory not found!'
        elif not self._inside_home(target):
            data = 'Permission Denied!'
        else:
            self.user_current_dir = target
            data = 'Change dir successfully!'
        self._reply(data)
        self.loging(data)

    ###########################################
    def rm(self, *args):
        """Delete a file or directory tree after the client confirms.

        Replies ``'0'`` (exists, please confirm) or ``'1'`` (not found, which
        also covers paths outside the home directory).  After ``'0'`` the
        client answers ``'0'`` to confirm; anything else cancels.  The path
        is resolved first, so removing a symlink removes what it points to.
        The home directory itself is never removed.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic['filename']
        target = self._confined(filename)
        if target is None or not os.path.exists(target):
            self._reply('1')
            return
        self._reply('0')  # confirm that the file exists
        client_response = self.request.recv(1024).decode()
        if client_response == '0' and target != self.user_home_dir:
            # No shell involved: the name comes from the client.
            if os.path.isdir(target):
                shutil.rmtree(target)
            else:
                os.remove(target)
            data = 'File %s has been deleted!' % filename
        else:
            data = 'File %s not deleted!' % filename
        self._reply(data)
        self.loging(data)

    ########################################
    def pwd(self, *args):
        """Send this connection's current directory to the client."""
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        self._reply(self.user_current_dir)

    #############################################
    def ls(self, *args):
        """Send the ``ls -l`` listing of ``path`` (default: current directory).

        ``path`` is passed to ``ls`` as one literal argument, never through a
        shell.  If ``ls`` prints nothing on stdout its error text is sent
        instead, so the client always receives a reply.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        path = cmd_dic['path'].strip() or '.'
        if self._confined(path) is None:
            self._reply('Permission Denied!')
            return
        result = subprocess.run(
            ['ls', '-l', '--', path],
            cwd=self.user_current_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        self._reply(result.stdout or result.stderr or '\n')

    ############################################
    def put(self, *args):
        """Receive a file from the client.

        Expects ``"filename"`` and ``"size"`` (bytes).  Replies ``'200'``
        (ready to receive data) or ``'210'`` (not ready: the target is outside
        the home directory or is a directory).  If the target file already
        exists the upload is stored as ``<filename>.new``.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic["filename"]
        filesize = int(cmd_dic["size"])
        target = self._confined(filename)
        if target is None or os.path.isdir(target):
            self._reply('210')
            return
        if os.path.isfile(target):
            target += '.new'
        receive_size = 0
        with open(target, 'wb') as f:
            self._reply('200')
            while receive_size < filesize:
                # Never read past the announced size: the bytes after it
                # belong to the client's next request.
                data = self.request.recv(
                    min(self.chunk_size, filesize - receive_size))
                if not data:
                    break  # peer closed the connection mid-transfer
                f.write(data)
                receive_size += len(data)
        if receive_size < filesize:
            self.loging('File %s upload incomplete: %d of %d bytes received'
                        % (filename, receive_size, filesize))
            return
        data = "File %s has been uploaded successfully!" % filename
        self.loging(data)
        print(data)

    ################################################
    def mkdir(self, *args):
        """Create a directory.

        Replies ``'0'`` (directory has been made) or ``'1'`` (not made: it
        already exists, lies outside the home directory, or creation failed).
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._confined(cmd_dic['dir_name'])
        if target is None or os.path.exists(target):
            self._reply('1')
            return
        try:
            os.mkdir(target)
        except OSError as e:
            self.loging(str(e))
            self._reply('1')
            return
        self._reply('0')

    #############################################
    def loging(self, data):
        """Append *data* to the server log as one timestamped line."""
        localtime = time.asctime(time.localtime(time.time()))
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write('%s-->%s\n' % (localtime, data))

    ##############################################
    def handle(self):
        """Serve requests from one client until it disconnects or errors.

        Unknown actions are logged and ignored.  Any command other than
        ``auth`` sent before a successful login ends the connection.
        """
        while True:
            try:
                raw = self.request.recv(1024)
                if not raw:
                    break  # client closed the connection
                self.data = raw.decode()
                cmd_dic = json.loads(self.data)
                action = cmd_dic["action"]
                # Whitelist instead of hasattr(): otherwise a client could
                # call any attribute of the handler (handle, loging, ...).
                if action not in self.allowed_actions:
                    self.loging('Unknown action %r ignored' % (action,))
                    continue
                if action != 'auth' and not self.user_home_dir:
                    self.loging('Action %s rejected: not authenticated' % action)
                    break
                getattr(self, action)(cmd_dic)
            except Exception as e:
                # Per-connection boundary: log the error and drop the client.
                self.loging(str(e))
                break


def run():
    """Start the threaded server on all interfaces, port 6969, and block."""
    HOST, PORT = '0.0.0.0', 6969
    print("The server is started,and listenning at port %d" % PORT)
    server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
    server.serve_forever()


if __name__ == '__main__':
    run()

设置用户口令代码:

# Author by Andy
# _*_ coding:utf-8 _*_
"""Location and first-time creation of the server's user database.

The database is a JSON object mapping each user name to the md5 hex digest
of that user's password.
"""
import os
import json
import hashlib
import sys

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# os.path.join supplies the separators; the literal "data\userdb" is not even
# valid Python 3 because "\u" starts a unicode escape.
userdb_file = os.path.join(base_dir, "data", "userdb")


def hash_password(password):
    """Return the md5 hex digest of *password* (a str).

    NOTE(review): unsalted md5 is weak for password storage.  It is kept
    because the stored value is compared directly with what the client
    sends, which presumably is this same digest -- confirm against the
    client before changing the scheme.
    """
    return hashlib.md5(password.encode()).hexdigest()


def userdb_set():
    """Return the path of the user database, creating it first if missing.

    When the file does not exist, user names and passwords are read
    interactively until ``exit`` is entered at either prompt, and the
    collected ``{username: md5 hex digest}`` mapping is written as JSON.
    """
    if os.path.isfile(userdb_file):
        return userdb_file

    print('请先为您的服务器创建用户!')
    user_data = {}
    while True:
        username = input("Please input username:")
        if username == 'exit':
            break
        password = input("Please input passwod:")
        if password == 'exit':
            break
        # Each password gets its own digest; reusing one md5 object across
        # users would hash the concatenation of all passwords entered so far.
        user_data[username] = hash_password(password)

    os.makedirs(os.path.dirname(userdb_file), exist_ok=True)
    with open(userdb_file, 'w') as f:
        json.dump(user_data, f)
    return userdb_file

目录结构: