Spaces:
Sleeping
Sleeping
File size: 3,750 Bytes
baa733f 8385f8f baa733f 8385f8f baa733f 8385f8f baa733f d6e6e14 baa733f d6e6e14 baa733f d6e6e14 baa733f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
import time
import wavmark
import streamlit as st
import os
import torch
import datetime
import numpy as np
import soundfile
from wavmark.utils import file_reader
import subprocess
import sys
import time
def my_read_file(audio_path, max_second):
    """Load an audio file and cap it at ``max_second`` seconds.

    The file is read with ``file_reader.read_as_single_channel_16k`` using the
    module-level ``default_sr``. If the reported length exceeds ``max_second``,
    only the first ``default_sr * max_second`` samples are kept and the
    reported length becomes ``max_second``.

    Args:
        audio_path: Path of the audio file to read.
        max_second: Maximum number of seconds to keep (used in a slice bound,
            so presumably an integer -- confirm with callers).

    Returns:
        A ``(signal, sr, audio_length_second)`` tuple.
    """
    samples, sample_rate, duration = file_reader.read_as_single_channel_16k(
        audio_path, default_sr
    )
    too_long = duration > max_second
    if too_long:
        # Keep only the leading samples and report the capped duration.
        return samples[:default_sr * max_second], sample_rate, max_second
    return samples, sample_rate, duration
def add_watermark(audio_path, watermark_text):
    """Embed a 16-bit watermark into an audio file and save the result.

    The audio is loaded through ``my_read_file`` (capped at
    ``max_second_encode`` seconds), encoded with ``wavmark.encode_watermark``
    using the module-level ``model``, and written as a WAV file under
    ``/tmp/`` whose name combines the current timestamp and the watermark.

    Args:
        audio_path: Path of the audio file to watermark.
        watermark_text: The payload as a string of exactly 16 characters,
            each ``"0"`` or ``"1"``.

    Returns:
        Path of the watermarked WAV file that was written.

    Raises:
        ValueError: If ``watermark_text`` is not exactly 16 binary digits.
    """
    # Validate explicitly rather than with ``assert`` (asserts vanish under
    # ``python -O``). Restricting the text to 0/1 also keeps non-binary digits
    # out of the payload and keeps the generated file name free of path
    # separators.
    if len(watermark_text) != 16 or set(watermark_text) - {"0", "1"}:
        raise ValueError(
            "watermark_text must be exactly 16 characters, each '0' or '1'"
        )

    watermark_npy = np.array([int(bit) for bit in watermark_text])
    signal, sr, _ = my_read_file(audio_path, max_second_encode)
    watermarked_signal, _ = wavmark.encode_watermark(model, signal, watermark_npy, show_progress=False)

    tmp_file_name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + "_" + watermark_text + ".wav"
    tmp_file_path = '/tmp/' + tmp_file_name
    soundfile.write(tmp_file_path, watermarked_signal, sr)
    return tmp_file_path
#def encode_water()
def decode_watermark(audio_path):
    """Decode the watermark from an audio file and show it on the page.

    The audio is loaded through ``my_read_file`` (capped at
    ``max_second_decode`` seconds) and decoded with
    ``wavmark.decode_watermark`` using the module-level ``model``. The outcome
    is written to the Streamlit page with ``st.write``.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        The decoded payload as a string of digits, or ``"No Watermark"`` when
        the decoder returns ``None``.

    Raises:
        FileNotFoundError: If ``audio_path`` does not exist.
    """
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not os.path.exists(audio_path):
        raise FileNotFoundError("Audio file not found: " + str(audio_path))

    signal, _, _ = my_read_file(audio_path, max_second_decode)
    payload_decoded, _ = wavmark.decode_watermark(model, signal, show_progress=False)

    if payload_decoded is None:
        # The caller ignores the return value, so the "no watermark" outcome
        # has to be written to the page here or the user sees nothing at all.
        st.write("Result:", "No Watermark")
        return "No Watermark"

    payload_decoded_str = "".join(str(bit) for bit in payload_decoded)
    st.write("Result:", payload_decoded_str)
    return payload_decoded_str
def create_default_value():
    """Populate ``st.session_state.def_value`` if it is not set yet.

    The value is a string of ``32 - len_start_bit`` randomly chosen ``0``/``1``
    digits; an existing ``def_value`` is left untouched.
    """
    if "def_value" in st.session_state:
        return
    random_bits = np.random.choice([0, 1], size=32 - len_start_bit)
    st.session_state.def_value = "".join(map(str, random_bits))
def main():
    """Render the Streamlit page for adding or decoding an audio watermark.

    Shows the intro text and a file uploader. Once a file is uploaded it is
    saved under ``/tmp/`` and the user chooses between embedding a 16-bit
    watermark (``add_watermark``) and decoding one (``decode_watermark``).
    """
    create_default_value()
    markdown_text = (
        "\n"
        "# Audio WaterMarking\n"
        "You can upload an audio file and encode a custom 16-bit watermark or perform decoding from a watermarked audio.\n"
        "See [WaveMarktoolkit](https://github.com/wavmark/wavmark) for further details.\n"
    )
    st.markdown(markdown_text)

    audio_file = st.file_uploader("Upload Audio", type=["wav", "mp3"], accept_multiple_files=False)
    if not audio_file:
        # Nothing to process yet; both actions below need the saved upload.
        return

    # The upload name comes from the client, so keep only its final path
    # component: an absolute or "../" name must not escape /tmp/.
    safe_name = os.path.basename(audio_file.name) or "upload"
    tmp_input_audio_file = os.path.join("/tmp/", safe_name)
    with open(tmp_input_audio_file, "wb") as f:
        f.write(audio_file.getbuffer())

    action = st.selectbox("Select Action", ["Add Watermark", "Decode Watermark"])
    if action == "Add Watermark":
        watermark_text = st.text_input("The watermark (0, 1 list of length-16):", value=st.session_state.def_value)
        add_watermark_button = st.button("Add Watermark", key="add_watermark_btn")
        if add_watermark_button:
            # Reject malformed input with a readable message instead of
            # letting add_watermark fail with a traceback on the page.
            if len(watermark_text) != 16 or set(watermark_text) - {"0", "1"}:
                st.error("The watermark must be exactly 16 characters, each 0 or 1.")
            else:
                with st.spinner("Adding Watermark..."):
                    watermarked_audio = add_watermark(tmp_input_audio_file, watermark_text)
                    st.write("Watermarked Audio:")
                    print("watermarked_audio:", watermarked_audio)
                    st.audio(watermarked_audio, format="audio/wav")
    elif action == "Decode Watermark":
        if st.button("Decode"):
            with st.spinner("Decoding..."):
                decode_watermark(tmp_input_audio_file)
if __name__ == "__main__":
    # Settings read as module-level globals by the functions above.
    default_sr = 16000  # sample rate handed to the reader; also converts seconds to samples
    max_second_encode = 60  # length cap (seconds) applied when adding a watermark
    max_second_decode = 30  # length cap (seconds) applied when decoding
    len_start_bit = 16  # the default watermark gets 32 - len_start_bit random bits

    # Use the first CUDA device when one is available, otherwise the CPU.
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu')

    # NOTE(review): Streamlit re-executes the script on each interaction, so
    # the model is presumably reloaded every rerun -- consider caching it.
    model = wavmark.load_model()
    model = model.to(device)

    main()
|