|
import base64
import html
import json
import os

import streamlit as st
|
|
|
# Keys of every record stored in a JSONL file. main() renders one text input
# per entry (in this order) and builds each new record from these keys.
FIELDS = [
    "CodeValue",
    "CodeType",
    "Context",
    "Question",
    "AnswerText",
    "UpVoteCount",
    "DownVoteCount",
    "VoteComment",
]

# Glob-style pattern for JSONL files.
# NOTE(review): not referenced by the visible code (list_files() matches the
# ".jsonl" suffix directly) -- confirm whether it is still needed.
IO_PATTERN = "*.jsonl"
|
|
|
|
|
def read_jsonl_file(file_path):
    """Load every JSON record from a JSON Lines file.

    Args:
        file_path: Path of the ``.jsonl`` file to read.

    Returns:
        A list with one decoded JSON value per non-blank line, in file
        order. An empty list is returned when the path does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if not os.path.exists(file_path):
        return []
    with open(file_path, "r", encoding="utf-8") as f:
        # Blank or whitespace-only lines (e.g. left behind by manual edits)
        # are skipped because json.loads() raises on them.
        return [json.loads(line) for line in f if line.strip()]
|
|
|
|
|
def write_jsonl_file(file_path, records):
    """Overwrite a file with one JSON document per line.

    Args:
        file_path: Destination path; any existing content is replaced.
        records: Iterable of JSON-serialisable values. An empty iterable
            produces an empty file.

    Raises:
        TypeError: If a record cannot be serialised by ``json.dumps``.
    """
    # Explicit encoding keeps the output independent of the platform locale.
    with open(file_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(record) + "\n" for record in records)
|
|
|
|
|
def list_files():
    """Return the names of the JSONL files in the current working directory.

    Returns:
        A sorted list of entry names ending in ``.jsonl``. Sorting makes the
        order stable across calls (``os.listdir`` order is arbitrary), and
        directories are excluded because they cannot be opened as files.
    """
    return sorted(
        name
        for name in os.listdir()
        if name.endswith(".jsonl") and os.path.isfile(name)
    )
|
|
|
|
|
def download_link(file_path):
    """Build an HTML anchor that downloads a file through a ``data:`` URI.

    Args:
        file_path: Path of the file whose bytes are embedded in the link.

    Returns:
        An ``<a>`` element as a string. The file content is base64-encoded
        into the ``href``; the ``download`` attribute carries the file's
        base name.

    Raises:
        OSError: If the file cannot be opened for reading.
    """
    with open(file_path, "rb") as f:
        contents = f.read()
    b64 = base64.b64encode(contents).decode()
    # The name may come from user input and the result is rendered as raw
    # HTML, so escape it before placing it inside the attribute.
    download_name = html.escape(os.path.basename(file_path), quote=True)
    return (
        f'<a href="data:application/octet-stream;base64,{b64}" '
        f'download="{download_name}">Download</a>'
    )
|
|
|
|
|
def main():
    """Render the Streamlit page for browsing and appending to JSONL files.

    The sidebar offers a text box that renames the first listed file, a
    selector for the file to work on, and a download link for it. The main
    area shows one text input per entry of FIELDS, an "Add Record" button
    that appends the entered values to the selected file, and the file's
    current records.
    """
    jsonl_files = list_files()

    if not jsonl_files:
        st.warning("No JSONL files found. Creating new file.")
        jsonl_files.append("data.jsonl")
        write_jsonl_file("data.jsonl", [])

    # Editing the text box renames the first listed file.
    current_name = jsonl_files[0]
    selected_file = st.sidebar.text_input("Enter file name", value=current_name)
    if selected_file != current_name:
        if not selected_file.strip():
            # os.rename() raises on an empty destination.
            st.sidebar.error("File name must not be empty.")
        elif os.path.exists(selected_file):
            # os.rename() may silently replace an existing destination file;
            # refuse instead of destroying its records.
            st.sidebar.error(f"{selected_file} already exists.")
        else:
            os.rename(current_name, selected_file)
            jsonl_files[0] = selected_file

    st.sidebar.write("JSONL files:")
    selected_file_index = st.sidebar.selectbox("", range(len(jsonl_files)))
    for i, file_name in enumerate(jsonl_files):
        if i == selected_file_index:
            # The selector, not the text box, decides which file is shown.
            selected_file = file_name
            st.sidebar.write(f"> {file_name}")
        else:
            st.sidebar.write(file_name)

    st.sidebar.markdown(download_link(selected_file), unsafe_allow_html=True)

    records = read_jsonl_file(selected_file)

    # Each input is keyed by its field name so the value can be read back
    # from st.session_state when the button is pressed.
    for field in FIELDS:
        value = st.text_input(field, key=field)
        st.write(f"{field}: {value}")

    if st.button("Add Record"):
        record = {field: st.session_state[field] for field in FIELDS}
        records.append(record)
        write_jsonl_file(selected_file, records)
        st.success("Record added!")

    st.write(f"Current contents of {selected_file}:")
    for record in records:
        st.write(record)
|
|
|
|
|
# Run the app only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|