Reference / work

work

load("environments.star", "environment_from_json")
load("dowork.star", "check_params")

def next(fn, annotation="", inputs={}):
    """
    next specifies the next function to call within a function chain.

    Args:
        fn: The function to call
        annotation: An optional annotation for the call
        inputs: The inputs to the function, as a dictionary

    Returns:
        A dictionary representing the next work item
    """
    fd = render_function(fn)["function"]

    return {
        "next": {
            "fn": fd,
            "annotation": annotation,
            "inputs": _check_inputs(fn, inputs),
        },
    }

def done(annotation="", outputs={}, tags=[], watch=[]):
    """
    done marks the end of a function chain.

    Args:
        annotation: An optional annotation for the call
        outputs: The outputs of the chain, as a dictionary
        tags: Optional tags to apply to the release

    Returns:
        A dictionary representing the done work item
    """
    return {
        "done": {
            "outputs": outputs,
            "tags": tags,
            "watch": watch,
        },
    }

def _check_inputs(fn, inputs):
    checked_inputs = {}
    # environment is allowed as an input name here, since
    # you may want to "forward" the value from the previous function
    for key, value in inputs.items():
        if type(value) == "dict" and ("ref" in value or "default" in value or "value" in value):
            checked_inputs[key] = value
        else:
            checked_inputs[key] = {
                "value": value,
            }

    fParams = json.decode(backend.functions.get_args(fn))
    check_params(fn, fParams["args"], fParams["kwargs"], checked_inputs.keys())

    return checked_inputs