Source code for meshed.tools

"""Tools to work with meshed"""

from collections import namedtuple
from contextlib import contextmanager
from functools import cached_property, partial
import multiprocessing
import os
import time
from typing import Callable, List
from urllib.parse import urljoin

import i2

from meshed.dag import DAG


HOST = os.environ.get('HOST', '0.0.0.0')
PORT = int(os.environ.get('PORT', 3030))
API_URL = os.environ.get('API_URL', f'http://localhost:{PORT}')
SERVER = os.environ.get('SERVER', 'wsgiref')
OPENAPI_URL = urljoin(API_URL, 'openapi')


def find_funcs(dag, func_outs):
    return list(dag.find_funcs(lambda x: x.out in func_outs))


[docs] def mk_dag_with_ws_funcs(dag: DAG, ws_funcs: dict) -> DAG: """Creates a new DAG with the web service functions. :param dag: DAG to be hybridized :type dag: DAG :param ws_funcs: mapping of web service functions :type ws_funcs: dict :return: new DAG with the web service functions :rtype: DAG """ return dag.ch_funcs(**ws_funcs)
[docs] def launch_funcs_webservice(funcs: List[Callable]): """Launches a web service application with the specified functions. :param funcs: functions to be hosted by the web service :type funcs: List[Callable] """ from extrude import mk_api, run_api ws_app = mk_api(funcs, openapi=dict(base_url=API_URL)) run_api(ws_app, host=HOST, port=PORT, server=SERVER)
[docs] @contextmanager def launch_webservice(funcs_to_cloudify, wait_after_start_seconds=10): """Context manager to launch a web service application in a separate process.""" ws = multiprocessing.Process( target=launch_funcs_webservice, args=(funcs_to_cloudify,) ) ws.start() # TODO: I prefer using a timeout instead of a fixed wait time # TODO: Use strand tool for this: https://github.com/i2mint/strand/blob/7443631e9d2486358f0a34ed182e85b6ded5e50c/strand/taskrunning/utils.py#L54 time.sleep(wait_after_start_seconds) yield ws ws.terminate()
class CloudFunctions: def __init__(self, funcs: List[Callable], openapi_url=OPENAPI_URL, logger=print): """Creates a Python dictionary-like object that maps the web service functions to Python functions. :param funcs: list of functions hosted by the web service :type funcs: List[Callable] :param openapi_url: url to get openapi spec, defaults to "http://localhost:{PORT}/openapi" :type openapi_url: str, optional :param logger: logger function, defaults to print :type logger: Callable, optional """ self.funcs = funcs self.openapi_url = openapi_url self.logger = logger if callable(logger) else lambda x: None @cached_property def http_client(self): from http2py import HttpClient try: return HttpClient(url=self.openapi_url) except Exception: self.logger( f'Could not connect to {self.openapi_url}. Waiting 10 seconds and trying again.' ) time.sleep(10) return HttpClient(url=self.openapi_url) @cached_property def func_names(self): return frozenset(f.__name__ for f in self.funcs) def __getitem__(self, key): """Returns a Python function that calls the web service function. HttpClient is queried at the execution of the function. """ @i2.Sig(next(f for f in self.funcs if key == f.__name__)) def ws_func(*a, **kw): self.logger(f'Getting web service for: {key}') if (_wsf := getattr(self.http_client, key, None)) is not None: self.logger(f'Found web service for: {key}') return _wsf(*a, **kw) raise KeyError(key) return ws_func def __contains__(self, key): return key in self.func_names def __len__(self): return len(self.func_names) def keys(self): return self.func_names def values(self): return (self[k] for k in self.keys()) def items(self): return ((k, self[k]) for k in self.keys())
[docs] def mk_hybrid_dag(dag: DAG, func_ids_to_cloudify: list): """Creates a hybrid DAG that uses the web service for the specified functions. :param dag: dag to be hybridized :type dag: DAG :param func_ids_to_cloudify: list of function ids to be cloudified :type func_ids_to_cloudify: list :return: namedtuple with funcs_to_cloudify, ws_dag and ws_funcs :rtype: namedtuple """ funcs_to_cloudify = find_funcs(dag, func_ids_to_cloudify) ws_funcs = CloudFunctions(funcs_to_cloudify) ws_dag = mk_dag_with_ws_funcs(dag, ws_funcs) HybridDAG = namedtuple('HybridDAG', ['funcs_to_cloudify', 'ws_dag', 'ws_funcs']) return HybridDAG(funcs_to_cloudify, ws_dag, ws_funcs)