如何使用Python实现视频去重的小工具
在运行脚本的当前工作目录下新建 dup_video 文件夹(检测到的重复视频会被移动到该文件夹中)
import json
import os
import shutil

import cv2
import imagehash
from PIL import Image
from loguru import logger
from PySimpleGUI import popup_get_folder


class VideoDuplicate(object):
    """Detect duplicate ``.mp4`` videos by comparing sampled frame fingerprints.

    Each video is reduced to a list of image fingerprints: starting at
    1 second, one frame is sampled every 3 seconds and hashed.  Two videos
    whose fingerprint lists are equal are treated as duplicates.
    """

    def __init__(self):
        # Videos skipped because they are longer than the accepted duration.
        self._over_length_video: list = []
        # Videos skipped because their frame rate could not be read.
        self._no_video: list = []

    def _video_hash(self, video_path) -> list:
        """Return the list of frame fingerprints for one video.

        @param video_path -> absolute path of the video file.
        @return list of fingerprint strings; empty when the video is
                unreadable or longer than the accepted duration.
        """
        hash_arr = []
        cap = cv2.VideoCapture(video_path)  # open the video file
        try:
            logger.info(f'开始抽帧【{video_path}】')
            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # frame count
            logger.warning(f'视频帧数:{n_frames}')
            fps = cap.get(cv2.CAP_PROP_FPS)  # frame rate
            logger.warning(f'视频帧率:{fps}')
            if not fps or fps <= 0:
                # A zero frame rate would make the duration computation
                # below divide by zero, so skip the file instead.
                logger.error(f'无法读取视频帧率;【{video_path}】;执行跳过;')
                self._no_video.append(video_path)
                return []
            dur = n_frames / fps * 1000  # approximate duration in ms
            cap_set = 1000  # first sampling position in ms
            logger.warning(f'视频大约总长:{dur / 1000}')
            if dur // 1000 > 11:
                logger.error(f'视频时长超出规定范围【6~10】;当前时长:【{dur // 1000}】;跳过该视频;')
                self._over_length_video.append(video_path)
                return []
            # NOTE(review): the original loop body was lost when the article
            # was extracted (everything after "while cap_set").  The loop
            # below is reconstructed from the class docstring (start at 1 s,
            # step 3 s) and the imported names; the exact hash function
            # (dhash here) and step should be confirmed against the original.
            while cap_set < dur:
                cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)
                success, frame = cap.read()
                if success:
                    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    # Stored as str so the result can be written as JSON.
                    hash_arr.append(str(imagehash.dhash(img)))
                cap_set += 3000
        finally:
            cap.release()
        return hash_arr

    def start(self, base_dir):
        """Fingerprint every ``.mp4`` file directly inside ``base_dir``.

        @param base_dir -> folder containing the videos.
        @return list of ``{'video_abs_path': ..., 'hash': [...]}`` dicts;
                the same list is also written to ``log.txt``.
        """
        data: list = []
        for video in os.listdir(base_dir):
            logger.debug(f'-' * 80)
            _, ext = os.path.splitext(video)
            if ext.lower() != '.mp4':
                logger.error(f'视频文件格式不符;【{video}】;执行跳过;')
                continue
            abs_video_path = os.path.join(base_dir, video)
            video_hash_list = self._video_hash(abs_video_path)
            if video_hash_list:
                data.append({'video_abs_path': abs_video_path, 'hash': video_hash_list})
        self._write_log(data)
        return data

    @staticmethod
    def _write_log(data: list) -> None:
        """Write the fingerprint records to ``log.txt`` as JSON."""
        with open(f'log.txt', 'w+', encoding='utf-8') as f:
            f.write(json.dumps(data))

    def __call__(self, base_dir, *args, **kwargs):
        """Fingerprint ``base_dir`` and move duplicate videos to ``dup_video``.

        For every group of videos with equal fingerprints, the last one
        listed is kept in place and each earlier one is moved into the
        ``dup_video`` folder under the current working directory.
        """
        self.start(base_dir)
        logger.debug(f'-----------------------------------开始比对关键帧差值感知余弦算法-----------------------------')
        with open('log.txt', encoding='utf-8') as f:
            data = json.loads(f.read())

        dup_dir = os.path.join(os.getcwd(), 'dup_video')
        # Create the target folder so the move cannot fail on a missing dir.
        os.makedirs(dup_dir, exist_ok=True)

        # Index of the last video carrying each fingerprint list.  A video is
        # a duplicate exactly when a later video has the same fingerprints,
        # and moving each such file once avoids moving an already-moved file
        # when three or more videos are identical.
        last_seen = {tuple(entry['hash']): idx for idx, entry in enumerate(data)}
        for idx, entry in enumerate(data):
            if last_seen[tuple(entry['hash'])] == idx:
                continue
            _, filename = os.path.split(entry['video_abs_path'])
            logger.error(f'移动文件:【{filename}】')
            shutil.move(
                os.path.join(base_dir, filename),
                os.path.join(dup_dir, filename)
            )

        logger.warning('---------------------超长视频----------------------')
        for i in self._over_length_video:
            _, name = os.path.split(i)
            logger.error(name)


def main():
    """Ask the user for a folder and de-duplicate the videos inside it."""
    path = popup_get_folder('请选择[视频去重]文件夹')
    if not path:
        # Dialog was cancelled or left empty: nothing to process.
        return
    v = VideoDuplicate()
    v(path)


if __name__ == '__main__':
    main()
方法补充
python+opencv抽取视频帧并去重
import os
import sys
import cv2
import glob
import json
import numpy as np
import skimage
from skimage import metrics
import hashlib

print(skimage.__version__)


def load_json(json_file):
    """Load a JSON file and return the value stored under its 'outputs' key."""
    with open(json_file) as fp:
        data = json.load(fp)
    return data['outputs']


def ssim_dis(im1, im2):
    """Return the structural similarity (SSIM) of two colour images.

    Both images are treated as 8-bit (data_range=255) with the colour
    channels on the last axis.
    """
    major, minor = (int(part) for part in skimage.__version__.split('.')[:2])
    if (major, minor) >= (0, 19):
        # `multichannel` was replaced by `channel_axis` in scikit-image 0.19.
        return metrics.structural_similarity(im1, im2, data_range=255, channel_axis=-1)
    return metrics.structural_similarity(im1, im2, data_range=255, multichannel=True)


# cv2
def isdarkOrBright(grayImg, thre_dark=10, thre_bright=230):
    """Return True when the mean intensity is below thre_dark or above thre_bright.

    NOTE(review): the comparison was partly lost in the source text
    ("if mean thre_bright"); it is restored from the parameter names.
    """
    mean = np.mean(grayImg)
    return bool(mean < thre_dark or mean > thre_bright)


def get_file_md5(file_name):
    """Return the hexadecimal MD5 digest of a file, read in 4096-byte chunks.

    :param file_name: path of the file to hash
    :return: md5 hex digest string
    """
    m = hashlib.md5()
    with open(file_name, 'rb') as fobj:
        for data in iter(lambda: fobj.read(4096), b''):
            m.update(data)
    return m.hexdigest()


def extract_frame(video_path, save_dir, prefix, ssim_thre=0.90):
    """Extract de-duplicated frames from a video and save them as JPEG files.

    Frames whose encoded JPEG bytes were already seen are skipped.  Of the
    remaining frames, the first one is always saved, and a later one is
    saved when its SSIM against the last saved frame is below ``ssim_thre``.
    Files are named ``<prefix>_<4-digit frame index>.jpg`` inside ``save_dir``.

    NOTE(review): the source text is truncated right after "if dis <".
    Everything from that comparison to the end of the function (including
    the returned count) is a reconstruction and must be checked against the
    original article.  The original also kept a ``tmp_frames`` list and read
    the video FPS; their use is not visible in the surviving text, so they
    are omitted here.

    :return: number of frames written to ``save_dir``
    """
    count = 0
    md5set = {}
    last_frame = None
    index = 0
    # cv2.imwrite only returns False for a missing folder, so create it.
    os.makedirs(save_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            frameState, frame = cap.read()
            if not frameState or frame is None:
                break

            # Hash the JPEG-encoded frame to detect byte-identical frames.
            if not cv2.imwrite('tmp.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                raise IOError('failed to write temporary frame tmp.jpg')
            md5 = get_file_md5('tmp.jpg')
            if md5 in md5set:
                md5set[md5] += 1
                index += 1
                continue
            md5set[md5] = 1

            save_path = os.path.join(save_dir, prefix+'_'+str(index).rjust(4, '0')+'.jpg')
            if last_frame is None or ssim_dis(last_frame, frame) < ssim_thre:
                if cv2.imwrite(save_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                    count += 1
                last_frame = frame
            index += 1
    finally:
        cap.release()
    return count


# ---- Next snippet: de-duplicate images, videos and files (对图片,视频,文件进行去重) ----
import os
from tkinter import *
from tkinter import messagebox
import tkinter.filedialog

root = Tk()
root.title("筛选重复的视频和照片")
root.geometry("500x500+500+200")


def wbb():
    """Let the user pick files, then write one copy of each distinct content.

    The selected files are read fully into memory and keyed by their bytes,
    so files with identical content collapse into one entry.  The unique
    contents are written to a user-chosen folder as ``1.<ext>``, ``2.<ext>``
    and so on, and the folder listing is shown in the text box.
    """
    filename = tkinter.filedialog.askopenfilenames()
    c = {}
    for path in filename:
        with open(path, 'rb') as f:
            # For identical contents the last selected path supplies the
            # extension, as in the original two-pass version.
            c[f.read()] = path

    filename1 = tkinter.filedialog.askdirectory()
    if not filename1:
        # Covers both '' and the empty tuple some platforms return on cancel.
        messagebox.showinfo("提示", message='请选择路径')
        return

    lb1.config(text=filename1+"下的文件为:")
    for p, (content, src_path) in enumerate(c.items(), start=1):
        # splitext keeps the leading dot and yields '' when there is no
        # extension, unlike split('.') which breaks on dotted folder names.
        ext = os.path.splitext(src_path)[1]
        with open(filename1+"/"+str(p)+ext, 'wb') as f:
            f.write(content)
    for g in os.listdir(filename1):
        # NOTE(review): the source shows g+'' — presumably a '\n' escape was
        # lost in extraction; without it all names run together.
        txt.insert(END, g+'\n')


frame1 = Frame(root, relief=RAISED)
frame1.place(relx=0.0)
frame2 = Frame(root, relief=GROOVE)
frame2.place(relx=0.5)
lb1 = Label(frame1, text="等等下面会有变化?", font=('华文新魏', 13))
lb1.pack(fill=X)
txt = Text(frame1, width=30, height=50, font=('华文新魏', 10))
txt.pack(fill=X)
lb = Label(frame2, text="点我选择要进行筛选的文件:", font=('华文新魏', 10))
lb.pack(fill=X)
btn = Button(frame2, text="请选择要进行筛选的文件", fg='black', relief="raised", bd="9", command=wbb)
btn.pack(fill=X)
root.mainloop()
效果图
立即学习“Python免费学习笔记(深入)”;