Reference / work

work

load("environments.star", "environment_from_json")

def next(fn, annotation="", inputs={}):
    """
    next specifies the next function to call within a function chain.

    Args:
        fn: The function to call
        annotation: An optional annotation for the call
        inputs: The inputs to the function, as a dictionary

    Returns:
        A dictionary representing the next work item
    """
    fd = render_function(fn)["function"]
    # TODO: This might not be necessary
    # add_func(fd)

    return {
        "next": {
            "fn": fd,
            "annotation": annotation,
            "inputs": _check_inputs(inputs),
        },
    }

def done(annotation="", outputs={}, tags=[]):
    """
    done marks the end of a function chain.

    Args:
        annotation: An optional annotation for the call
        outputs: The outputs of the chain, as a dictionary
        tags: Optional tags to apply to the release

    Returns:
        A dictionary representing the done work item
    """
    return {
        "done": {
            "outputs": outputs,
            "tags": tags,
        },
    }

def do_work(
    f,
    work_id,
    inputs=None,
):
    args = {}
    if work_id:
        args["work_id"] = json.decode(work_id)
    if inputs:
        args["inputs"] = vars_from_json(json.decode(inputs))

    ctx = struct(**args)
    return json.encode(f(ctx))

def vars_from_json(json):
    vars = {}
    for k, v in json.items():
        # TODO: handle known types using a "$type" field
        # Example below
        if type(v) == "dict" and "$type" in v:
            if v["$type"] == "ref":
                vars[k] = v["ref"]

        vars[k] = v

    return struct(**vars)

def _check_inputs(inputs):
    checked_inputs = {}
    # environment is allowed as an input name here, since
    # you may want to "forward" the value from the previous function
    for key, value in inputs.items():
        if type(value) == "dict" and ("ref" in value or "default" in value or "value" in value):
            checked_inputs[key] = value
        else:
            checked_inputs[key] = {
                "value": value,
            }
    return checked_inputs