1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
| def get_m3u8_url_decode(soup_my_url): list = soup_my_url.find(class_='clear pl-play js-addDanmu').find_all('script') m3u8_file_name = soup_my_url.find('h3').string print('File name: ' + m3u8_file_name) os.makedirs(f'{dir_download_url}/{m3u8_file_name}', exist_ok=True) file_path = f'{dir_download_url}/{m3u8_file_name}' print('[Create a new dir success!]') encode_url = re.findall(r'unescape\(base64decode\(.+\'', str(list[0]), re.M | re.VERBOSE | re.DOTALL) split_encode_url = encode_url[0].split('unescape(base64decode(')[-1] decode_base64_url = base64.b64decode(split_encode_url) unescape_url = parse.unquote(str(decode_base64_url)) all_parts_list = [] all_parts_list = re.findall(r'(https:.*\.m3u8)', str(unescape_url), re.M) print('[Debug] Download File parts: ') print(all_parts_list)
for i in range(len(all_parts_list)): all_parts_info = all_parts_list[i] get_ts_files(all_parts_list[i], i, file_path)
def get_ts_files(parts_info, i, file_path): os.makedirs(f'{file_path}/{i}', exist_ok=True)
test_url = parts_info print(test_url) all_content = request_url(test_url) with open(f'{file_path}/{i}/part{i}.m3u8', 'w') as f: f.write(all_content) print('[success download m3u8]')
tslist = [] test_main_url = test_url print('[Test Url]: ' + test_main_url) file_line = all_content.split('\n') for line in file_line: if re.match('[^#]', line): new_test_url = re.sub(r'[^\/]+(?!.*\/)', f'{line}', test_main_url) tslist.append(f'{new_test_url}') print('[Num tslist]: ' + str(len(tslist))) ask_start = input('Would you like to download now? (y/n): \n') if ask_start == 'y': download_ts_files(tslist, file_path, i) else: print('you can download later...') sys.exit(0)
def download_ts_files(tslist, file_path, i): for item_ts in tslist: ts_name = item_ts.split('/')[-1] ts_file_path = f'{file_path}/{i}/{ts_name}' print('[Prepare File Name]: ' + f'{ts_name}') request_url_content(item_ts, ts_file_path) is_merge = input('Downloaded all ts files, start merge now? (y/n)\n') if is_merge == 'y': merge_file_name = f'part{i}' new_file_path = f'{file_path}/{i}/' merge_ts(new_file_path, merge_file_name) else: print('Finished, you can use merge_ts.py to merge them later!\n') sys.exit(0)
def merge_ts(new_file_path, merge_file_name): """ use cmd to merge file """ tsPath = new_file_path
path_list = os.listdir(f'{tsPath}') path_list.sort() li = [os.path.join(tsPath,filename) for filename in path_list if '.ts' in filename] tsfiles = '|'.join(li)
saveMp4file = tsPath + f'{merge_file_name}.mp4'
cmd = 'ffmpeg -i "concat:%s" -acodec copy -vcodec copy -absf aac_adtstoasc %s'% (tsfiles,saveMp4file) os.system(cmd) delete_file = input('Would you like to delete all ts files? (y/n): \n') if delete_file == 'y': remove_ts(tsPath) else: print('you can delete later......') sys.exit(0)
def remove_ts(tsPath): for inline in glob.glob(os.path.join(tsPath, '*.ts')): os.remove(inline) print('finish delete ts files!')
|