|
from pathlib import Path |
|
import os |
|
import csv |
|
import subprocess |
|
import pandas as pd |
|
import sys |
|
sys.path.append('..') |
|
|
|
from levi.converters import HHMMSS_to_sec |
|
|
|
def trim_media(media_in, |
|
media_out, |
|
start, |
|
end): |
|
|
|
|
|
WAV_CHANNELS = 1 |
|
WAV_SAMPLE_RATE = 16000 |
|
|
|
media_type = Path(media_in).suffix |
|
ext = Path(media_out).suffix |
|
|
|
if isinstance(start, str): |
|
start_sec = HHMMSS_to_sec(start) |
|
else: |
|
start_sec = float(start) |
|
if isinstance(end, str): |
|
end_sec = HHMMSS_to_sec(end) |
|
else: |
|
end_sec = float(end) |
|
|
|
if ext == '.wav': |
|
|
|
print(f'...Using ffmpeg to trim video from {start} to {end} \n and convert to {WAV_SAMPLE_RATE}Hz WAV with {WAV_CHANNELS} channels...') |
|
print(f'...generating {media_out}...') |
|
|
|
subprocess.call(['ffmpeg', |
|
'-y', |
|
'-i', |
|
media_in, |
|
'-ss', |
|
f'{start_sec}', |
|
'-to', |
|
f'{end_sec}', |
|
'-acodec', |
|
'pcm_s16le', |
|
'-ac', |
|
WAV_CHANNELS, |
|
'-ar', |
|
WAV_SAMPLE_RATE, |
|
media_out, |
|
'-hide_banner', |
|
'-loglevel', |
|
'warning' |
|
],shell=False) |
|
|
|
else: |
|
|
|
print(f'...Using ffmpeg to trim video from {start_sec} to {end_sec}...') |
|
print(f'...generating {media_out}...') |
|
|
|
subprocess.call(['ffmpeg', |
|
'-y', |
|
'-i', |
|
media_in, |
|
'-ss', |
|
f'{start_sec}', |
|
'-to', |
|
f'{end_sec}', |
|
'-c', |
|
'copy', |
|
media_out, |
|
'-hide_banner', |
|
'-loglevel', |
|
'warning' |
|
],shell=False) |
|
|
|
def trim_media_batch(extract_timings_csv, |
|
outpath, |
|
suffix='', |
|
convert_to=False): |
|
"""trim a batch of media files given a csv of timings |
|
|
|
Args: |
|
extract_timings_csv (str): path to csv with columns: |
|
filepath, start (HH:MM:SS), end (HH:MM:SS) |
|
outpath (str): output path |
|
suffix (str, optional): save output trimmed files with this suffix. Defaults to ''. |
|
convert_to (bool, optional): [None, 'wav','mp4']. Defaults to False. |
|
Returns: |
|
outfiles (list): list of file paths created |
|
""" |
|
|
|
|
|
|
|
os.makedirs(outpath, exist_ok=True) |
|
|
|
samples_df = pd.read_csv( |
|
extract_timings_csv, |
|
skip_blank_lines=True, |
|
index_col=False, |
|
names=['media_in','startHMS','endHMS'], |
|
header=0 |
|
).dropna().sort_values( |
|
by='media_in',ignore_index=True).reset_index(drop=True) |
|
|
|
print(f'TRIMMING {len(samples_df.index)} FILES...') |
|
|
|
|
|
samples_df['count'] = samples_df.groupby('media_in').cumcount() |
|
if not os.path.exists(outpath): |
|
os.makedirs(outpath) |
|
|
|
outfiles=[] |
|
for i, rec in samples_df.iterrows(): |
|
media_in,startHMS,endHMS, count = rec.values |
|
suffix_use = f'{suffix}{count}' if count > 0 else suffix |
|
|
|
if not os.path.exists(media_in): |
|
print(f'!!!WARNING: media not found: {media_in}') |
|
continue |
|
|
|
media_type = Path(media_in).suffix |
|
sessname = Path(media_in).stem |
|
print(f'...Input media: {media_in}') |
|
|
|
if convert_to=='wav': |
|
ext = '.wav' |
|
elif convert_to=='mp4': |
|
ext = '.mp4' |
|
else: |
|
ext = media_type |
|
|
|
outfile = os.path.expanduser(os.path.join(outpath,f'{sessname}{suffix_use}{ext}')) |
|
|
|
trim_media(media_in, outfile, HHMMSS_to_sec(startHMS), HHMMSS_to_sec(endHMS)) |
|
|
|
outfiles.append(outfile) |
|
return(outfiles) |