Spaces:
Running
Running
import enum | |
from typing import Any | |
import streamlit as st | |
from core.state import Field | |
from core.state import RecordSet | |
from events.fields import ExtractType | |
from events.fields import FieldEvent | |
from events.fields import handle_field_change | |
from events.fields import TransformType | |
import mlcroissant as mlc | |
from utils import needed_field | |
_JSON_PATH_DOCUMENTATION = ( | |
"The JSON path if the data source is a JSON (see" | |
" [documentation](https://www.ietf.org/archive/id/draft-goessner-dispatch-jsonpath-00.html))." | |
) | |
_EXTRACT_DOCUMENTATION = ( | |
"The extraction method to get the value of the field (column in a CSV, etc)." | |
) | |
_COLUMN_NAME_DOCUMENTATION = "The name of the column if the data source is a CSV." | |
class SourceType: | |
"""The type of the source (distribution or field).""" | |
DISTRIBUTION = "distribution" | |
FIELD = "field" | |
EXTRACT_TYPES = [ | |
ExtractType.COLUMN, | |
ExtractType.JSON_PATH, | |
ExtractType.FILE_CONTENT, | |
ExtractType.FILE_NAME, | |
ExtractType.FILE_PATH, | |
ExtractType.FILE_FULLPATH, | |
ExtractType.FILE_LINES, | |
ExtractType.FILE_LINE_NUMBERS, | |
] | |
TRANSFORM_TYPES = [ | |
TransformType.FORMAT, | |
TransformType.JSON_PATH, | |
TransformType.REGEX, | |
TransformType.REPLACE, | |
TransformType.SEPARATOR, | |
] | |
def _get_extract(source: mlc.Source) -> str | None: | |
if source.extract.column: | |
return ExtractType.COLUMN | |
elif source.extract.file_property: | |
file_property = source.extract.file_property | |
if file_property == mlc.FileProperty.content: | |
return ExtractType.FILE_CONTENT | |
elif file_property == mlc.FileProperty.filename: | |
return ExtractType.FILE_NAME | |
elif file_property == mlc.FileProperty.filepath: | |
return ExtractType.FILE_PATH | |
elif file_property == mlc.FileProperty.fullpath: | |
return ExtractType.FILE_FULLPATH | |
elif file_property == mlc.FileProperty.lines: | |
return ExtractType.FILE_LINES | |
elif file_property == mlc.FileProperty.lineNumbers: | |
return ExtractType.FILE_LINE_NUMBERS | |
else: | |
return None | |
elif source.extract.json_path: | |
return ExtractType.JSON_PATH | |
return None | |
def _get_extract_index(source: mlc.Source) -> int | None: | |
extract = _get_extract(source) | |
if extract in EXTRACT_TYPES: | |
return EXTRACT_TYPES.index(extract) | |
return None | |
def _get_transforms(source: mlc.Source) -> list[str]: | |
transforms = source.transforms | |
return [_get_transform(transform) for transform in transforms] | |
def _get_transform(transform: mlc.Transform) -> str | None: | |
if transform.format: | |
return TransformType.FORMAT | |
elif transform.json_path: | |
return TransformType.JSON_PATH | |
elif transform.regex: | |
return TransformType.REGEX | |
elif transform.replace: | |
return TransformType.REPLACE | |
elif transform.separator: | |
return TransformType.SEPARATOR | |
return None | |
def _get_transforms_indices(source: mlc.Source) -> list[int]: | |
transforms = _get_transforms(source) | |
return [ | |
TRANSFORM_TYPES.index(transform) if transform in TRANSFORM_TYPES else None | |
for transform in transforms | |
] | |
def _handle_remove_reference(field): | |
"""Removes the reference from a field.""" | |
field.references = mlc.Source() | |
def render_source( | |
record_set: RecordSet, | |
field: Field, | |
possible_sources: list[str], | |
): | |
"""Renders the form for the source.""" | |
source = field.source | |
prefix = f"source-{record_set.name}-{field.name}" | |
col1, col2, col3 = st.columns([1, 1, 1]) | |
index = ( | |
possible_sources.index(source.uid) if source.uid in possible_sources else None | |
) | |
options = [s for s in possible_sources if not s.startswith(record_set.name)] | |
if index and (index < 0 or index >= len(options)): | |
index = None | |
key = f"{prefix}-source" | |
col1.selectbox( | |
needed_field("Data source"), | |
index=index, | |
options=options, | |
key=key, | |
help=( | |
"Data sources can be other resources (FileObject, FileSet) or other fields." | |
), | |
on_change=handle_field_change, | |
args=(FieldEvent.SOURCE, field, key), | |
) | |
if source.node_type == "distribution": | |
extract = col2.selectbox( | |
needed_field("Extract"), | |
index=_get_extract_index(source), | |
key=f"{prefix}-extract", | |
help=_EXTRACT_DOCUMENTATION, | |
options=EXTRACT_TYPES, | |
on_change=handle_field_change, | |
args=(FieldEvent.SOURCE_EXTRACT, field, key), | |
) | |
if extract == ExtractType.COLUMN: | |
key = f"{prefix}-columnname" | |
col3.text_input( | |
needed_field("Column name"), | |
value=source.extract.column, | |
key=key, | |
help=_COLUMN_NAME_DOCUMENTATION, | |
on_change=handle_field_change, | |
args=(FieldEvent.SOURCE_EXTRACT_COLUMN, field, key), | |
) | |
if extract == ExtractType.JSON_PATH: | |
key = f"{prefix}-jsonpath" | |
col3.text_input( | |
needed_field("JSON path"), | |
value=source.extract.json_path, | |
key=key, | |
help=_JSON_PATH_DOCUMENTATION, | |
on_change=handle_field_change, | |
args=(FieldEvent.SOURCE_EXTRACT_JSON_PATH, field, key), | |
) | |
# Transforms | |
indices = _get_transforms_indices(field.source) | |
if source.transforms: | |
for number, (index, transform) in enumerate(zip(indices, source.transforms)): | |
_, col2, col3, col4 = st.columns([4.5, 4, 4, 1]) | |
key = f"{prefix}-{number}-transform" | |
selected = col2.selectbox( | |
"Transform", | |
index=index, | |
key=key, | |
options=TRANSFORM_TYPES, | |
on_change=handle_field_change, | |
help="One or more transformations to apply after extracting the field.", | |
args=(FieldEvent.TRANSFORM, field, key), | |
kwargs={"number": number}, | |
) | |
if selected == TransformType.FORMAT: | |
key = f"{prefix}-{number}-transform-format" | |
col3.text_input( | |
needed_field("Format a date"), | |
value=transform.format, | |
key=key, | |
on_change=handle_field_change, | |
help=( | |
"For dates, use [`Python format" | |
" codes`](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes)." | |
), | |
args=(selected, field, key), | |
kwargs={"number": number}, | |
) | |
elif selected == TransformType.JSON_PATH: | |
key = f"{prefix}-{number}-jsonpath" | |
col3.text_input( | |
needed_field("JSON path"), | |
value=transform.json_path, | |
key=key, | |
on_change=handle_field_change, | |
help=_JSON_PATH_DOCUMENTATION, | |
args=(selected, field, key), | |
kwargs={"number": number}, | |
) | |
elif selected == TransformType.REGEX: | |
key = f"{prefix}-{number}-regex" | |
col3.text_input( | |
needed_field("Regular expression"), | |
value=transform.regex, | |
key=key, | |
on_change=handle_field_change, | |
help=( | |
"A regular expression following [`re` Python" | |
" convention](https://docs.python.org/3/library/re.html#regular-expression-syntax)" | |
" with one capturing group. The result of the operation will be" | |
" the last captured group." | |
), | |
args=(selected, field, key), | |
kwargs={"number": number}, | |
) | |
elif selected == TransformType.REPLACE: | |
key = f"{prefix}-{number}-replace" | |
col3.text_input( | |
needed_field("Replace pattern"), | |
value=transform.replace, | |
key=key, | |
on_change=handle_field_change, | |
help=( | |
"A replace pattern separated by a `/`, i.e." | |
" `string_to_replace/string_to_substitute` in order to replace" | |
" `string_to_replace` by `string_to_substitute`." | |
), | |
args=(selected, field, key), | |
kwargs={"number": number}, | |
) | |
elif selected == TransformType.SEPARATOR: | |
key = f"{prefix}-{number}-separator" | |
col3.text_input( | |
needed_field("Separator"), | |
value=transform.separator, | |
key=key, | |
on_change=handle_field_change, | |
help="A separator to split strings on, e.g. `|` to split `a|b|c`.", | |
args=(selected, field, key), | |
kwargs={"number": number}, | |
) | |
def _handle_remove_transform(field, number): | |
del field.source.transforms[number] | |
col4.button( | |
"✖️", | |
key=f"{prefix}-{number}-remove-transform", | |
help="Remove the transformation.", | |
on_click=_handle_remove_transform, | |
args=(field, number), | |
) | |
def _handle_add_transform(field): | |
if not field.source: | |
field.source = mlc.Source(transforms=[]) | |
field.source.transforms.append(mlc.Transform()) | |
col1, _, _ = st.columns([1, 1, 1]) | |
col1.button( | |
"Add transform on data", | |
key=f"{prefix}-close-fields", | |
help="Add a transformation.", | |
on_click=_handle_add_transform, | |
args=(field,), | |
) | |
def render_references( | |
record_set: RecordSet, | |
field: Field, | |
possible_sources: list[str], | |
): | |
"""Renders the form for references.""" | |
key = f"references-{record_set.name}-{field.name}" | |
button_key = f"{key}-add-reference" | |
has_clicked_button = st.session_state.get(button_key) | |
references = field.references | |
if references or has_clicked_button: | |
col1, col2, col3, col4 = st.columns([4.5, 4, 4, 1]) | |
index = ( | |
possible_sources.index(references.uid) | |
if references.uid in possible_sources | |
else None | |
) | |
options = [s for s in possible_sources if not s.startswith(record_set.name)] | |
if index and (index < 0 or index >= len(options)): | |
index = None | |
key = f"{key}-reference" | |
col1.selectbox( | |
"Reference", | |
index=index, | |
options=options, | |
key=key, | |
on_change=handle_field_change, | |
args=(FieldEvent.REFERENCE, field, key), | |
) | |
if references.node_type == "distribution": | |
key = f"{key}-extract-references" | |
extract = col2.selectbox( | |
needed_field("Extract the reference"), | |
index=_get_extract_index(references), | |
key=key, | |
options=EXTRACT_TYPES, | |
help=_EXTRACT_DOCUMENTATION, | |
on_change=handle_field_change, | |
args=(FieldEvent.REFERENCE_EXTRACT, field, key), | |
) | |
if extract == ExtractType.COLUMN: | |
key = f"{key}-columnname" | |
col3.text_input( | |
needed_field("Column name"), | |
value=references.extract.column, | |
key=key, | |
help=_COLUMN_NAME_DOCUMENTATION, | |
on_change=handle_field_change, | |
args=(FieldEvent.REFERENCE_EXTRACT_COLUMN, field, key), | |
) | |
if extract == ExtractType.JSON_PATH: | |
key = f"{key}-jsonpath" | |
col3.text_input( | |
needed_field("JSON path"), | |
value=references.extract.json_path, | |
key=key, | |
help=_JSON_PATH_DOCUMENTATION, | |
on_change=handle_field_change, | |
args=(FieldEvent.REFERENCE_EXTRACT_JSON_PATH, field, key), | |
) | |
col4.button( | |
"✖️", | |
key=f"{key}-remove-reference", | |
help="Remove the join.", | |
on_click=_handle_remove_reference, | |
args=(field,), | |
) | |
elif not has_clicked_button: | |
st.button( | |
"Add a join with another column/field", | |
key=button_key, | |
) | |