|
from io import StringIO |
|
import streamlit as st |
|
|
|
from smem_token_parser import SMEM_Parser, read_tokens_from_lines |
|
from smem_obj import SMEM_Obj, ObjType |
|
|
|
# Smallest number of inspector columns laid out on the page, and the initial
# length of the per-column object list once a file has been loaded.
MIN_COLS = 4

# Page-level setup: wide layout, browser tab title, and the visible heading.
st.set_page_config(layout="wide", page_title="Soar Agent Memory Inspector")
st.title("Logic Inspector")
|
|
|
|
|
def get_smem_root_from_file(smem_file):
    """Tokenize and parse an SMEM dump, returning its root context object.

    Args:
        smem_file: Source handed unchanged to ``read_tokens_from_lines``. The
            caller in this script passes a ``StringIO`` wrapping the uploaded
            file's text.

    Returns:
        The value of ``SMEM_Parser.get_context_root()`` after parsing the
        tokens, or ``None`` when the tokenizer returns ``None`` (in which case
        an error message is also shown in the Streamlit page).
    """
    tokens = read_tokens_from_lines(smem_file)
    # Identity check: only a literal None signals a read failure; an empty
    # token collection is still handed to the parser.
    if tokens is None:
        st.error(f"Error reading file: '{smem_file}'")
        return None

    parser = SMEM_Parser()
    parser.parse_file(tokens)
    return parser.get_context_root()
|
|
|
def reset_cols():
    """Clear every inspector column except the first.

    Used as the ``on_change`` callback of the filter widgets: the list keeps
    its length and its first entry, and every later entry becomes ``None``.
    """
    current = st.session_state.col_obj_list
    st.session_state.col_obj_list = current[:1] + [None] * (len(current) - 1)
|
|
|
|
|
|
|
# col_obj_list holds the object shown in each inspector column (index 0 is the
# parsed root). None means no file has been loaded yet.
if "col_obj_list" not in st.session_state:
    st.session_state["col_obj_list"] = None

# The upload panel starts expanded only while nothing is loaded.
file_upload_expander = st.expander(
    label="Select a file to inspect",
    expanded=st.session_state.col_obj_list is None,
)
file = file_upload_expander.file_uploader(" ")

if file is None:
    # No upload present (or it was removed): discard any loaded state.
    st.session_state["col_obj_list"] = None
elif st.session_state.col_obj_list is None:
    # A file is present but has not been parsed yet.
    try:
        text = file.getvalue().decode("utf-8")
    except UnicodeDecodeError:
        # Report an undecodable upload in the page instead of crashing.
        st.error("Error reading file: contents are not valid UTF-8 text")
        root = None
    else:
        root = get_smem_root_from_file(StringIO(text))

    if root:
        # Root goes in the first column; the remaining columns start empty.
        st.session_state["col_obj_list"] = [root] + [None] * (MIN_COLS - 1)
        # Rerun so the page is redrawn with the loaded state (e.g. the
        # expander's `expanded` flag above is re-evaluated).
        st.experimental_rerun()

# Nothing below this point can render without a parsed root.
if st.session_state.col_obj_list is None:
    st.stop()
|
|
|
|
|
|
|
@st.cache(hash_funcs={SMEM_Obj: id}, show_spinner=False)
def get_filter_features_dict(root_obj):
    """Return ``root_obj.get_referenced_features()`` through Streamlit's cache.

    ``SMEM_Obj`` instances are hashed with ``id`` for the cache key, and the
    cache spinner is disabled.
    """
    return root_obj.get_referenced_features()
|
|
|
|
|
# Build one multiselect per referenced feature, spread over at most 5 columns.
features_dict = get_filter_features_dict(st.session_state.col_obj_list[0])
filters_expander = st.expander(label="Filters")

# st.columns() requires a positive column count, so skip the layout entirely
# when the loaded memory references no features.
if features_dict:
    filters_cols = filters_expander.columns(min(len(features_dict), 5))
else:
    filters_cols = []

# Maps feature name -> list of values currently selected in its widget.
filters_dict = {}
for i, key in enumerate(features_dict):
    # Wrap around the available columns.
    col = filters_cols[i % len(filters_cols)]
    filters_dict[key] = col.multiselect(
        label=key,
        options=["(none)"] + sorted(features_dict[key]),
        # Changing a filter collapses the view back to the first column.
        on_change=reset_cols,
    )
|
|
|
|
|
|
|
|
|
def add_col(index, obj):
    """Show *obj* in the column right after column *index*.

    Everything to the right of *index* is dropped, *obj* is appended as the
    new last entry, and a Streamlit rerun is requested so the page redraws.
    """
    kept = st.session_state.col_obj_list[:index + 1]
    kept.append(obj)
    st.session_state.col_obj_list = kept

    st.experimental_rerun()
|
|
|
def get_header_str(obj_type):
    """Map an ``ObjType`` value to the heading text shown above a column.

    Returns ``"START"``, ``"CONDITION"``, ``"CHOICE"`` or ``"RESULT"`` for the
    CONTEXT, COND_CONJ, OP and ACT_GROUP types respectively, and a single
    space for anything else.
    """
    if obj_type == ObjType.CONTEXT:
        return "START"
    if obj_type == ObjType.COND_CONJ:
        return "CONDITION"
    if obj_type == ObjType.OP:
        return "CHOICE"
    if obj_type == ObjType.ACT_GROUP:
        return "RESULT"
    return " "
|
|
|
def get_tested_wmes_from_obj_str(obj_str):
    """Split a condition description into parallel attribute/value lists.

    The text is split on ``" AND "`` into clauses; parentheses are removed
    from each clause, which is then split at its first ``" is "`` into an
    attribute and a value.

    Args:
        obj_str: Description text such as ``"(a is 1) AND (b is 2)"``. It is
            converted with ``str()`` first, so non-string input is accepted.

    Returns:
        A ``(attr_list, val_list)`` tuple of equal-length lists, where
        ``val_list[k]`` is the value tested for ``attr_list[k]``. Clauses that
        contain no ``" is "`` (including the text ``"None"`` produced by a
        missing description) are skipped instead of raising ``ValueError``.
    """
    attr_list = []
    val_list = []
    for clause in str(obj_str).split(" AND "):
        stripped = clause.replace("(", "").replace(")", "")
        # partition() splits at the first " is " only, and reports through
        # `sep` whether the separator was present at all.
        attr, sep, val = stripped.partition(" is ")
        if not sep:
            continue
        attr_list.append(attr)
        val_list.append(val)
    return attr_list, val_list
|
|
|
st.subheader("Click the buttons below to select conditions and to inspect resulting actions.")

# Always lay out at least MIN_COLS columns; more if the selection chain is longer.
cols = st.columns(max(MIN_COLS, len(st.session_state.col_obj_list)))

for i, col in enumerate(cols):
    col_objs = st.session_state.col_obj_list
    # Stop at the first column that has no object: either the list is shorter
    # than the number of laid-out columns, or the slot is still empty.
    if i >= len(col_objs) or col_objs[i] is None:
        break

    obj = col_objs[i]
    obj_str, obj_label, obj_desc = obj.to_string()
    # Children are listed in order of their label (second item of to_string()).
    sub_objs = sorted(
        obj.get_child_objects(compact=True),
        key=lambda x: x.to_string()[1],
    )

    # Column heading, named after the type of the children it lists.
    if sub_objs:
        col.markdown("**" + get_header_str(sub_objs[0].obj_type) + "**")
        col.markdown("---")

    col.text(obj_str)
    if obj_desc is not None:
        col.markdown("*" + obj_desc + "*")

    for j, sub in enumerate(sub_objs):
        sub_str, sub_label, sub_desc = sub.to_string()
        # Prefer the description as button text; fall back to the label.
        button_text = sub_desc if sub_desc is not None else sub_label
        button_key = f"button{i}-{j}"

        if sub.obj_type == ObjType.COND_PRIM or sub.obj_type == ObjType.COND_CONJ:
            # Hide conditions whose tested value for a filtered attribute is
            # not among the values selected in the Filters panel.
            if sub_desc is None:
                # Nothing to parse, so no filter can exclude this condition.
                attr_list, val_list = [], []
            else:
                attr_list, val_list = get_tested_wmes_from_obj_str(sub_desc)

            keep = True
            for attr, selected in filters_dict.items():
                # An empty selection means "no filter" for that attribute, and
                # conditions that do not test the attribute are never hidden.
                if not selected or attr not in attr_list:
                    continue
                if val_list[attr_list.index(attr)] not in selected:
                    keep = False
                    break
            if not keep:
                continue

        if sub.obj_type == ObjType.OP:
            # Preview the operator's first child above its button, if any.
            subsub = next(iter(sub.get_child_objects(compact=True)), None)
            if subsub is not None:
                _, group_string, _ = subsub.to_string()
                # NOTE(review): this replaces every "AND" substring, unlike the
                # ") AND" pattern used for action groups below - confirm that
                # labels never contain "AND" inside a word.
                group_string = "\n * " + str(group_string).replace("AND", "\n * ")
                col.markdown(group_string)
            if col.button(button_text, key=button_key):
                add_col(i, sub)
        elif sub.obj_type == ObjType.ACT_GROUP:
            # Action groups are terminal: shown as a bullet list, no button.
            col.markdown("*Output Details:* ")
            col.markdown("\n * " + str(sub_str).replace(") AND", ")\n * "))
        elif col.button(button_text, key=button_key):
            add_col(i, sub)
|
|