diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 1f81a20409..f074c0ed2c 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -52,7 +52,7 @@ jobs: fail-fast: false max-parallel: 40 matrix: - plugin: [., examples/shouldi, model/tensorflow, model/tensorflow_hub, model/transformers, model/scratch, model/scikit, model/vowpalWabbit, operations/binsec, source/mysql, feature/git, feature/auth, service/http, configloader/yaml, configloader/png] + plugin: [., examples/shouldi, model/tensorflow, model/tensorflow_hub, model/transformers, model/scratch, model/scikit, model/vowpalWabbit, operations/binsec, operations/deploy, source/mysql, feature/git, feature/auth, service/http, configloader/yaml, configloader/png] python-version: [3.7, 3.8] steps: @@ -124,6 +124,7 @@ jobs: feature/git=${{ secrets.PYPI_FEATURE_GIT }} feature/auth=${{ secrets.PYPI_FEATURE_AUTH }} operations/binsec=${{ secrets.PYPI_OPERATIONS_BINSEC }} + operations/deploy=${{ secrets.PYPI_OPERATIONS_DEPLOY }} service/http=${{ secrets.PYPI_SERVICE_HTTP }} configloader/yaml=${{ secrets.PYPI_CONFIG_YAML }} configloader/png=${{ secrets.PYPI_CONFIG_PNG }} diff --git a/CHANGELOG.md b/CHANGELOG.md index b814232686..4fd56bda79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ability to export any object with `dffml service dev export` - Complete example for dataflow run cli command - Tests for default configs instantiation. +- Example ffmpeg operation. +- Operations to deploy docker container on receving github webhook. +- New use case `Redeploying dataflow on webhook` in docs. - Documentation for creating Source for new File types taking `.ini` as an example. +- New input modes, output modes for HTTP API dataflow registration. ### Changed - `Edit on Github` button now hidden for plugins. 
- Doctests now run via unittests diff --git a/dffml/__init__.py b/dffml/__init__.py index e6632c220c..ab8edb9b71 100644 --- a/dffml/__init__.py +++ b/dffml/__init__.py @@ -101,11 +101,7 @@ class DuplicateName(Exception): f"new: {module}) " ) # Add to dict to ensure no duplicates - cls_func_all[obj.__qualname__] = ( - import_name_no_package, - module, - obj, - ) + cls_func_all[obj.__qualname__] = (import_name_no_package, module, obj) for name, (_import_name, _module, obj) in cls_func_all.items(): setattr(sys.modules[__name__], name, obj) diff --git a/dffml/model/slr.py b/dffml/model/slr.py index 0aab96b691..840e115265 100644 --- a/dffml/model/slr.py +++ b/dffml/model/slr.py @@ -4,10 +4,7 @@ from ..base import config, field from ..util.entrypoint import entrypoint -from .model import ( - SimpleModel, - ModelNotTrained, -) +from .model import SimpleModel, ModelNotTrained from .accuracy import Accuracy from ..feature.feature import Feature, Features from ..source.source import Sources diff --git a/dffml/skel/operations/Dockerfile b/dffml/skel/operations/Dockerfile index 9a44ecbf8f..43cd6e20a6 100644 --- a/dffml/skel/operations/Dockerfile +++ b/dffml/skel/operations/Dockerfile @@ -1,6 +1,7 @@ # Usage # docker build -t REPLACE_ORG_NAME/REPLACE_IMPORT_PACKAGE_NAME . 
-# docker run --rm -ti -p 80:8080 intelotc/operations -insecure -log debug +# docker run --rm -ti -p 80:8080 REPLACE_ORG_NAME/REPLACE_IMPORT_PACKAGE_NAME -insecure -log debug +# # curl -v http://127.0.0.1:80/list/sources FROM ubuntu:20.04 diff --git a/dffml/source/ini.py b/dffml/source/ini.py index fecb0356c3..974eda0161 100644 --- a/dffml/source/ini.py +++ b/dffml/source/ini.py @@ -41,7 +41,7 @@ async def load_fd(self, fileobj): temp_dict[k] = parser_helper(v) # Each section used as a record self.mem[str(section)] = Record( - str(section), data={"features": temp_dict}, + str(section), data={"features": temp_dict} ) self.logger.debug("%r loaded %d sections", self, len(self.mem)) diff --git a/dffml/util/data.py b/dffml/util/data.py index c285fcf3a9..59ea05005b 100644 --- a/dffml/util/data.py +++ b/dffml/util/data.py @@ -138,9 +138,7 @@ def type_lookup(typename): def export_value(obj, key, value): # export and _asdict are not classmethods if hasattr(value, "ENTRY_POINT_ORIG_LABEL") and hasattr(value, "config"): - obj[key] = { - "plugin": value.ENTRY_POINT_ORIG_LABEL, - } + obj[key] = {"plugin": value.ENTRY_POINT_ORIG_LABEL} export_value(obj[key], "config", value.config) elif inspect.isclass(value): obj[key] = value.__qualname__ diff --git a/docs/usage/index.rst b/docs/usage/index.rst index fcdca84e8b..019986bec9 100644 --- a/docs/usage/index.rst +++ b/docs/usage/index.rst @@ -11,3 +11,4 @@ The following are some example use cases of DFFML. dataflows mnist io + webhook/index diff --git a/docs/usage/webhook/deploy.rst b/docs/usage/webhook/deploy.rst new file mode 100644 index 0000000000..cc4b4c0035 --- /dev/null +++ b/docs/usage/webhook/deploy.rst @@ -0,0 +1,160 @@ +.. _usage_ffmpeg_deploy: + +Deploying with the HTTP Service +=============================== + +In this tutorial we will deploy a dataflow(ffmpeg dataflow) which converts a video to gif over an HTTP service. We'll +also see how to deploy the same in a docker container. 
Finally in :ref:`usage_ffmpeg_deploy_serve` +we'll setup another HTTP service which waits on GitHub webhooks to rebuilt and deploy the ffmpeg dataflow. + +.. note:: + + All the code for this example is located under the + `examples/ffmpeg `_ + directory of the DFFML source code. + +We'll be using additional plugins from dffml, ``dffml-yaml-config`` and ``dffml-http-service``. + +.. code-block:: console + + $ pip install dffml-yaml-config dffml-http-service + +Create the Package +------------------ + +To create a new operation we first create a new Python package. DFFML has a script to do it for you. + +.. code-block:: console + + $ dffml service dev create operations ffmpeg + $ cd ffmpeg + +Write operations and definitions to convert videos files to gif by calling +``ffmpeg`` (Make sure you `download and install `_ it). +The operation accepts bytes (of the video), converts it to gif and outputs it as bytes. +We will start writing our operation in ``./ffmpeg/operations.py`` + +**ffmpeg/operations.py** + +.. literalinclude:: /../examples/ffmpeg/ffmpeg/operations.py + +Add the operation to ``ffmpeg/setup.py`` + +.. code-block:: python + + common.KWARGS["entry_points"] = { + "dffml.operation": [ + f"convert_to_gif = {common.IMPORT_NAME}.operations:convert_to_gif" + ] + } + +Install the package + +.. code-block:: console + + $ pip install -e . + +Dataflow and Config files +------------------------- + +**ffmpeg/dataflow.py** + +.. literalinclude:: /../examples/ffmpeg/ffmpeg/dataflow.py + +.. code-block:: console + + $ mkdir -p deploy/mc/http deploy/df + $ dffml service dev export -config yaml ffmpeg.dataflow:DATAFLOW > deploy/df/ffmpeg.yaml + +Create the config file for the HTTP service +in ``deploy/mc/http/ffmpeg.yaml`` + +.. code-block:: console + + $ cat > ./deploy/mc/http/ffmpeg.yaml <`__ . + +.. _usage_ffmpeg_deploy_serve: + +Serving the DataFlow +-------------------- + +Serving the dataflow on port 8080 + +.. 
code-block:: console + + $ dffml service http server -insecure -mc-config deploy -port 8080 + +.. warning:: + + The ``-insecure`` flag is only being used here to speed up this + tutorial. See documentation on HTTP API + :doc:`/plugins/service/http/security` for more information. + +Now from another terminal, we can send post requests to the dataflow running at this port. + +.. code-block:: console + + $ curl -v --request POST --data-binary @input.mp4 http://localhost:8080/ffmpeg -o output.gif + +You should replace ``input.mp4`` with the path to your video file and ``output.gif`` to where you want the converted gif +to be output to. An example video is available `here `_ . + +Deploying via container +======================= + +A ``Dockerfile`` is already generated in the ffmpeg folder. We need to modify it to include ``ffmpeg``. + +**Dockerfile** + +.. literalinclude:: /../examples/ffmpeg/Dockerfile + +.. note:: + + The run command in the comment section of the Dockerfile will be used to execute + the container after receiving webhooks, so make sure you change it to your use case. + +For this tutorial we will change it to + +.. code-block:: Dockerfile + + # docker run --rm -ti -p 8080:8080 $USER/ffmpeg -mc-config deploy -insecure -log debug + +.. note:: + + The image built after pulling the container will be tagged ``USERNAME/REPONAME``, where USERNAME and REPONAME + are gathered from the GitHub HTML URL received in the webhook. + +We can run the container and send a post request to verify that the container is working. + +.. code-block:: console + + $ docker build -t $USER/ffmpeg . + $ docker run --rm -ti -p 8080:8080 $USER/ffmpeg -mc-config deploy -insecure -log debug + +Now in another terminal + +.. code-block:: console + + $ curl -v --request POST --data-binary @input.mp4 http://localhost:8080/ffmpeg -o output.gif + +Now in :ref:`usage_webhook` we'll set up this container to be automatically redeployed +whenever we push to the Git repo containing this code.
\ No newline at end of file diff --git a/docs/usage/webhook/images/github_settings.png b/docs/usage/webhook/images/github_settings.png new file mode 100644 index 0000000000..be38652e74 Binary files /dev/null and b/docs/usage/webhook/images/github_settings.png differ diff --git a/docs/usage/webhook/images/localhost_run.png b/docs/usage/webhook/images/localhost_run.png new file mode 100644 index 0000000000..06ab9fa767 Binary files /dev/null and b/docs/usage/webhook/images/localhost_run.png differ diff --git a/docs/usage/webhook/images/ngrok_out.png b/docs/usage/webhook/images/ngrok_out.png new file mode 100644 index 0000000000..81f8c20882 Binary files /dev/null and b/docs/usage/webhook/images/ngrok_out.png differ diff --git a/docs/usage/webhook/index.rst b/docs/usage/webhook/index.rst new file mode 100644 index 0000000000..e61aa311a3 --- /dev/null +++ b/docs/usage/webhook/index.rst @@ -0,0 +1,12 @@ +Continuous Deployment of DataFlows +================================== + +This example shows how to deploy your dataflow and to set it up so that it gets redeployed +on receiving webhooks from GitHub. + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + deploy + webhook diff --git a/docs/usage/webhook/webhook.rst b/docs/usage/webhook/webhook.rst new file mode 100644 index 0000000000..c3aa0fdb11 --- /dev/null +++ b/docs/usage/webhook/webhook.rst @@ -0,0 +1,114 @@ +.. _usage_webhook: + +Redeploying on receiving GitHub webhook +======================================= + +We'll move ``ffmpeg`` to a GitHub repo, and set up a webhook DataFlow such that whenever +we push to the default branch, the new version is pulled and its docker container is built and run. + +Webhook Dataflow +---------------- + +We'll be using operations from ``dffml-operations-deploy``, ``dffml-feature-git``, ``dffml-config-yaml``. + +..
code-block:: console + + $ pip install dffml-operations-deploy dffml-feature-git dffml-config-yaml + +Set up an HTTP server in ``ffmpeg/deploy/webhook``, to receive the webhook and redeploy ffmpeg + +.. code-block:: console + + $ mkdir -p deploy/webhook/df deploy/webhook/mc/http + $ cat > /tmp/operations < deploy/webhook/df/webhook.yaml + +Config + +**deploy/webhook/mc/http/webhook.yaml** + +.. code-block:: console + + $ cat > ./deploy/webhook/mc/http/webhook.yaml <`_ + + .. code-block:: console + + $ ssh -R 80:localhost:8081 $RANDOM@ssh.localhost.run + + .. image:: ./images/localhost_run.png + + Using ngrok + + .. code-block:: console + + $ ~/ngrok http 8081 + + .. image:: ./images/ngrok_out.png + +Copy and paste the output URL to ``Payload URL`` in the webhook settings of the ffmpeg repo. + +.. image:: ./images/github_settings.png + +Now whenever there's a push to the default branch of the repo, the ffmpeg container +which is running gets redeployed from the fresh pull. To check this, we will modify the +end time of the conversion from 10 to 12 in ``ffmpeg/operations.py`` by changing + +.. code-block:: python + + proc = await asyncio.create_subprocess_exec( + "ffmpeg", + "-ss", + "0.3", + "-t", + "10", + .. + .. + ) + +to + +.. code-block:: python + + proc = await asyncio.create_subprocess_exec( + "ffmpeg", + "-ss", + "0.3", + "-t", + "12", + .. + .. + ) + +On pushing the changes to our repo, the container will be redeployed. To verify this, run +``docker ps`` and check the up time of the container.
diff --git a/examples/ffmpeg/.coveragerc b/examples/ffmpeg/.coveragerc new file mode 100644 index 0000000000..5b3709d1ce --- /dev/null +++ b/examples/ffmpeg/.coveragerc @@ -0,0 +1,13 @@ +[run] +source = + ffmpeg + tests +branch = True + +[report] +exclude_lines = + no cov + no qa + noqa + pragma: no cover + if __name__ == .__main__.: diff --git a/examples/ffmpeg/.gitignore b/examples/ffmpeg/.gitignore new file mode 100644 index 0000000000..070ee81c83 --- /dev/null +++ b/examples/ffmpeg/.gitignore @@ -0,0 +1,20 @@ +*.log +*.pyc +.cache/ +.coverage +.idea/ +.vscode/ +*.egg-info/ +build/ +dist/ +docs/build/ +venv/ +wheelhouse/ +*.egss +.mypy_cache/ +*.swp +.venv/ +.eggs/ +*.modeldir +*.db +htmlcov/ diff --git a/examples/ffmpeg/Dockerfile b/examples/ffmpeg/Dockerfile new file mode 100644 index 0000000000..c6a9587875 --- /dev/null +++ b/examples/ffmpeg/Dockerfile @@ -0,0 +1,31 @@ +# Usage +# docker build -t $USER/ffmpeg . +# docker run --rm -ti -p 8080:8080 $USER/ffmpeg -mc-config deploy -insecure -log debug +# +# curl -v --request POST --data-binary @input.mp4 http://localhost:8080/ffmpeg -o output.gif +FROM ubuntu:20.04 + +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && \ + apt-get install -y \ + gcc \ + python3-dev \ + python3-pip \ + python3 \ + ca-certificates \ + ffmpeg && \ + python3 -m pip install -U pip && \ + python3 -m pip install dffml-service-http && \ + python3 -m pip install dffml-config-yaml && \ + apt-get purge -y \ + gcc \ + python3-dev && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/src/app +COPY . /usr/src/app + +RUN python3 -m pip install -e . 
+ +ENTRYPOINT ["python3", "-m", "dffml", "service", "http", "server", "-addr", "0.0.0.0"] +CMD ["-mc-config", "deploy"] diff --git a/examples/ffmpeg/LICENSE b/examples/ffmpeg/LICENSE new file mode 100644 index 0000000000..96488ee14b --- /dev/null +++ b/examples/ffmpeg/LICENSE @@ -0,0 +1,21 @@ +Copyright (c) 2019 aghinsa + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/examples/ffmpeg/MANIFEST.in b/examples/ffmpeg/MANIFEST.in new file mode 100644 index 0000000000..19f3196490 --- /dev/null +++ b/examples/ffmpeg/MANIFEST.in @@ -0,0 +1,3 @@ +include README.md +include LICENSE +include setup_common.py diff --git a/examples/ffmpeg/README.md b/examples/ffmpeg/README.md new file mode 100644 index 0000000000..54c758323c --- /dev/null +++ b/examples/ffmpeg/README.md @@ -0,0 +1,7 @@ +# DFFML ffmpeg Operations + +* Operation to convert video to gif. + +## License + +DFFML ffmpeg is distributed under the [MIT License](LICENSE). 
diff --git a/examples/ffmpeg/deploy/df/ffmpeg.yaml b/examples/ffmpeg/deploy/df/ffmpeg.yaml new file mode 100644 index 0000000000..16bde9582c --- /dev/null +++ b/examples/ffmpeg/deploy/df/ffmpeg.yaml @@ -0,0 +1,50 @@ +definitions: + input_file: + name: input_file + primitive: bytes + output_file: + name: output_file + primitive: bytes + Resolution: + name: Resolution + primitive: int + get_single_output: + name: get_single_output + primitive: map + get_single_spec: + name: get_single_spec + primitive: array +flow: + convert_to_gif: + inputs: + input_file: + - seed + resolution: + - seed + get_single: + inputs: + spec: + - seed +linked: true +operations: + convert_to_gif: + inputs: + input_file: input_file + resolution: Resolution + name: convert_to_gif + outputs: + output_file: output_file + stage: processing + get_single: + inputs: + spec: get_single_spec + name: get_single + outputs: + output: get_single_output + stage: output +seed: +- definition: get_single_spec + value: + - output_file +- definition: Resolution + value: 480 diff --git a/examples/ffmpeg/deploy/mc/http/ffmpeg.yaml b/examples/ffmpeg/deploy/mc/http/ffmpeg.yaml new file mode 100644 index 0000000000..3c25e668b3 --- /dev/null +++ b/examples/ffmpeg/deploy/mc/http/ffmpeg.yaml @@ -0,0 +1,3 @@ +path: /ffmpeg +input_mode: bytes:input_file +output_mode: bytes:image/gif:post_input.output_file \ No newline at end of file diff --git a/examples/ffmpeg/deploy/webhook/df/webhook.yaml b/examples/ffmpeg/deploy/webhook/df/webhook.yaml new file mode 100644 index 0000000000..3eeff2bbdd --- /dev/null +++ b/examples/ffmpeg/deploy/webhook/df/webhook.yaml @@ -0,0 +1,161 @@ +definitions: + URL: + name: URL + primitive: string + docker_commands: + name: docker_commands + primitive: Dict[str,Any] + docker_image_tag: + name: docker_image_tag + primitive: str + docker_running_containers: + name: docker_running_containers + primitive: List[str] + git_payload: + name: git_payload + primitive: Dict[Any] + git_repository: + lock: 
true + name: git_repository + primitive: Dict[str, str] + spec: + defaults: {} + name: GitRepoSpec + types: + URL: str + directory: str + subspec: false + got_running_containers: + name: got_running_containers + primitive: bool + is_default_branch: + name: is_default_branch + primitive: bool + is_image_built: + name: is_image_built + primitive: bool + valid_git_repository_URL: + name: valid_git_repository_URL + primitive: boolean +flow: + check_if_default_branch: + inputs: + payload: + - seed + clone_git_repo: + conditions: + - seed + inputs: + URL: + - get_url_from_payload: url + docker_build_image: + conditions: + - check_if_default_branch: is_default_branch + - get_status_running_containers: status + inputs: + docker_commands: + - parse_docker_commands: docker_commands + get_image_tag: + inputs: + payload: + - seed + get_running_containers: + inputs: + tag: + - get_image_tag: image_tag + get_status_running_containers: + inputs: + containers: + - get_running_containers: running_containers + get_url_from_payload: + inputs: + payload: + - seed + parse_docker_commands: + inputs: + image_tag: + - get_image_tag: image_tag + repo: + - clone_git_repo: repo + restart_running_containers: + conditions: + - docker_build_image: build_status + inputs: + containers: + - get_running_containers: running_containers + docker_commands: + - parse_docker_commands: docker_commands +linked: true +operations: + check_if_default_branch: + inputs: + payload: git_payload + name: check_if_default_branch + outputs: + is_default_branch: is_default_branch + stage: processing + clone_git_repo: + conditions: + - valid_git_repository_URL + inputs: + URL: URL + name: clone_git_repo + outputs: + repo: git_repository + stage: processing + docker_build_image: + conditions: + - is_default_branch + - got_running_containers + inputs: + docker_commands: docker_commands + name: docker_build_image + outputs: + build_status: is_image_built + stage: processing + get_image_tag: + inputs: + payload: 
git_payload + name: get_image_tag + outputs: + image_tag: docker_image_tag + stage: processing + get_running_containers: + inputs: + tag: docker_image_tag + name: get_running_containers + outputs: + running_containers: docker_running_containers + stage: processing + get_status_running_containers: + inputs: + containers: docker_running_containers + name: get_status_running_containers + outputs: + status: got_running_containers + stage: processing + get_url_from_payload: + inputs: + payload: git_payload + name: get_url_from_payload + outputs: + url: URL + stage: processing + parse_docker_commands: + inputs: + image_tag: docker_image_tag + repo: git_repository + name: parse_docker_commands + outputs: + docker_commands: docker_commands + stage: processing + restart_running_containers: + conditions: + - is_image_built + inputs: + containers: docker_running_containers + docker_commands: docker_commands + name: restart_running_containers + outputs: {} + stage: processing + diff --git a/examples/ffmpeg/deploy/webhook/mc/http/webhook.yaml b/examples/ffmpeg/deploy/webhook/mc/http/webhook.yaml new file mode 100644 index 0000000000..a859d50d0f --- /dev/null +++ b/examples/ffmpeg/deploy/webhook/mc/http/webhook.yaml @@ -0,0 +1,3 @@ +path: /webhook/github +output_mode: json +input_mode: json:git_payload \ No newline at end of file diff --git a/examples/ffmpeg/ffmpeg/__init__.py b/examples/ffmpeg/ffmpeg/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/ffmpeg/ffmpeg/dataflow.py b/examples/ffmpeg/ffmpeg/dataflow.py new file mode 100644 index 0000000000..df1e5d401b --- /dev/null +++ b/examples/ffmpeg/ffmpeg/dataflow.py @@ -0,0 +1,11 @@ +from dffml import DataFlow, Input, GetSingle +from .operations import convert_to_gif + +DATAFLOW = DataFlow.auto(convert_to_gif, GetSingle) +DATAFLOW.seed = [ + Input( + value=[convert_to_gif.op.outputs["output_file"].name], + definition=GetSingle.op.inputs["spec"], + ), + Input(value=480, 
definition=convert_to_gif.op.inputs["resolution"]), +] diff --git a/examples/ffmpeg/ffmpeg/definitions.py b/examples/ffmpeg/ffmpeg/definitions.py new file mode 100644 index 0000000000..2442124309 --- /dev/null +++ b/examples/ffmpeg/ffmpeg/definitions.py @@ -0,0 +1,11 @@ +import sys +from dffml.df.types import Definition + +definitions = [ + Definition(name="input_file", primitive="bytes"), + Definition(name="Resolution", primitive="int"), + Definition(name="output_file", primitive="bytes"), +] + +for definition in definitions: + setattr(sys.modules[__name__], definition.name, definition) diff --git a/examples/ffmpeg/ffmpeg/operations.py b/examples/ffmpeg/ffmpeg/operations.py new file mode 100644 index 0000000000..8771fd9ec8 --- /dev/null +++ b/examples/ffmpeg/ffmpeg/operations.py @@ -0,0 +1,38 @@ +import asyncio +import tempfile + +from dffml import op, Definition + + +@op( + inputs={ + "input_file": Definition(name="input_file", primitive="bytes"), + "resolution": Definition(name="resolution", primitive="int"), + }, + outputs={"output_file": Definition(name="output_file", primitive="bytes")}, +) +async def convert_to_gif(input_file, resolution): + temp_input_file = tempfile.NamedTemporaryFile(prefix="ffmpeg-") + temp_input_file.write(input_file) + temp_input_file.seek(0) + proc = await asyncio.create_subprocess_exec( + "ffmpeg", + "-ss", + "0.3", + "-t", + "10", + "-i", + temp_input_file.name, + "-y", + "-vf", + f"fps=10,scale={resolution}:-1:flags=lanczos,split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse", + "-loop", + "0", + "-f", + "gif", + "pipe:1", + stdout=asyncio.subprocess.PIPE, + ) + out, error = await proc.communicate() + temp_input_file.close() + return {"output_file": out} diff --git a/examples/ffmpeg/ffmpeg/version.py b/examples/ffmpeg/ffmpeg/version.py new file mode 100644 index 0000000000..901e5110b2 --- /dev/null +++ b/examples/ffmpeg/ffmpeg/version.py @@ -0,0 +1 @@ +VERSION = "0.0.1" diff --git a/examples/ffmpeg/pyproject.toml 
b/examples/ffmpeg/pyproject.toml new file mode 100644 index 0000000000..8b9d32fa10 --- /dev/null +++ b/examples/ffmpeg/pyproject.toml @@ -0,0 +1,20 @@ +[tool.black] +line-length = 79 +target-version = ['py37'] + +exclude = ''' +( + /( + \.eggs # exclude a few common directories in the + | \.git # root of the project + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + ) +) +''' diff --git a/examples/ffmpeg/setup.py b/examples/ffmpeg/setup.py new file mode 100644 index 0000000000..7707a446da --- /dev/null +++ b/examples/ffmpeg/setup.py @@ -0,0 +1,18 @@ +import os +import importlib.util +from setuptools import setup + +# Boilerplate to load commonalities +spec = importlib.util.spec_from_file_location( + "setup_common", os.path.join(os.path.dirname(__file__), "setup_common.py") +) +common = importlib.util.module_from_spec(spec) +spec.loader.exec_module(common) + +common.KWARGS["entry_points"] = { + "dffml.operation": [ + f"convert_to_gif = {common.IMPORT_NAME}.operations:convert_to_gif" + ] +} + +setup(**common.KWARGS) diff --git a/examples/ffmpeg/setup_common.py b/examples/ffmpeg/setup_common.py new file mode 100644 index 0000000000..55116c5ad1 --- /dev/null +++ b/examples/ffmpeg/setup_common.py @@ -0,0 +1,78 @@ +import os +import sys +import ast +from pathlib import Path +from setuptools import find_packages + +ORG = "aghinsa" +NAME = "ffmpeg" +DESCRIPTION = "DFFML operations ffmpeg" +AUTHOR_NAME = "Aghin Shah Alin" +AUTHOR_EMAIL = "aghinsa@gmail.com" +# Install dffml if it is not installed in development mode +INSTALL_REQUIRES = [] + ( + ["dffml>=0.3.6"] + if not any( + list( + map( + os.path.isfile, + list( + map( + lambda syspath: os.path.join( + syspath, "dffml.egg-link" + ), + sys.path, + ) + ), + ) + ) + ) + else [] +) + +IMPORT_NAME = ( + NAME + if "replace_package_name".upper() != NAME + else "replace_import_package_name".upper() +).replace("-", "_") + +SELF_PATH = Path(sys.argv[0]).parent.resolve() +if not (SELF_PATH / 
Path(IMPORT_NAME, "version.py")).is_file(): + SELF_PATH = os.path.dirname(os.path.realpath(__file__)) + +VERSION = ast.literal_eval( + Path(SELF_PATH, IMPORT_NAME, "version.py") + .read_text() + .split("=")[-1] + .strip() +) + +README = Path(SELF_PATH, "README.md").read_text() + +KWARGS = dict( + name=NAME, + version=VERSION, + description=DESCRIPTION, + long_description=README, + long_description_content_type="text/markdown", + author=AUTHOR_NAME, + author_email=AUTHOR_EMAIL, + maintainer=AUTHOR_NAME, + maintainer_email=AUTHOR_EMAIL, + url=f"https://github.com/{ORG}/{NAME}", + license="MIT", + keywords=["dffml"], + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + ], + install_requires=INSTALL_REQUIRES, + packages=find_packages(), +) diff --git a/examples/ffmpeg/tests/__init__.py b/examples/ffmpeg/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/ffmpeg/tests/input.mp4 b/examples/ffmpeg/tests/input.mp4 new file mode 100644 index 0000000000..5cb88f771a Binary files /dev/null and b/examples/ffmpeg/tests/input.mp4 differ diff --git a/examples/ffmpeg/tests/test_operations.py b/examples/ffmpeg/tests/test_operations.py new file mode 100644 index 0000000000..3653e5304b --- /dev/null +++ b/examples/ffmpeg/tests/test_operations.py @@ -0,0 +1,53 @@ +import os +import sys +import pathlib +import tempfile + + +from dffml.df.types import Input, DataFlow +from dffml.df.base import opimp_in +from dffml.df.memory import MemoryOrchestrator +from dffml.operation.output import GetSingle +from dffml.util.asynctestcase import AsyncTestCase + +from ffmpeg.operations import 
convert_to_gif +from dffml.operation.output import GetSingle + + +class TestOperations(AsyncTestCase): + async def setUp(self): + self.parent_path = pathlib.Path(__file__).parent + + async def test_run(self): + dataflow = DataFlow.auto(convert_to_gif, GetSingle) + dataflow.seed.append( + Input( + value=[convert_to_gif.op.outputs["output_file"].name], + definition=GetSingle.op.inputs["spec"], + ) + ) + + input_file_path = self.parent_path / "input.mp4" + + with open(input_file_path, "rb") as f: + input_file = f.read(-1) + + test_inputs = { + "Test": [ + Input( + value=input_file, + definition=convert_to_gif.op.inputs["input_file"], + ), + Input( + value=240, + definition=convert_to_gif.op.inputs["resolution"], + ), + ] + } + + async with MemoryOrchestrator.withconfig({}) as orchestrator: + async with orchestrator(dataflow) as octx: + async for ctx, results in octx.run(test_inputs): + self.assertIn("output_file", results) + output = results["output_file"] + self.assertGreater(len(output), 100000) diff --git a/examples/io/io_usage.py b/examples/io/io_usage.py index 035ffdb63b..9916d0c69b 100644 --- a/examples/io/io_usage.py +++ b/examples/io/io_usage.py @@ -2,7 +2,7 @@ from dffml import * slr_model = SLRModel( - features=Features(DefFeature("Years", int, 1),), + features=Features(DefFeature("Years", int, 1)), predict=DefFeature("Salary", int, 1), ) diff --git a/examples/maintained/cgi-bin/dataflow.yaml b/examples/maintained/cgi-bin/dataflow.yaml index 3ce19f1361..33f9dd9e41 100644 --- a/examples/maintained/cgi-bin/dataflow.yaml +++ b/examples/maintained/cgi-bin/dataflow.yaml @@ -1,4 +1,3 @@ -configs: {} definitions: URL: name: URL @@ -34,6 +33,7 @@ definitions: types: URL: str directory: str + subspec: false git_repository_checked_out: lock: true name: git_repository_checked_out @@ -45,19 +45,13 @@ definitions: URL: str commit: str directory: str + subspec: false group_by_output: name: group_by_output primitive: Dict[str, List[Any]] group_by_spec: name: group_by_spec 
primitive: Dict[str, Any] - spec: - defaults: {} - name: GroupBySpec - types: - by: Definition - fill: generic - group: Definition no_git_branch_given: name: no_git_branch_given primitive: boolean @@ -300,3 +294,5 @@ seed: value: 8 - definition: quarter value: 9 + + diff --git a/examples/shouldi/shouldi/deploy/mc/http/shouldi.yaml b/examples/shouldi/shouldi/deploy/mc/http/shouldi.yaml index 562f439516..0bd2f74c54 100644 --- a/examples/shouldi/shouldi/deploy/mc/http/shouldi.yaml +++ b/examples/shouldi/shouldi/deploy/mc/http/shouldi.yaml @@ -1,3 +1,3 @@ path: /shouldi -presentation: json +output_mode: json asynchronous: false diff --git a/feature/git/dffml_feature_git/feature/operations.py b/feature/git/dffml_feature_git/feature/operations.py index 2b51126c63..c56758cef7 100644 --- a/feature/git/dffml_feature_git/feature/operations.py +++ b/feature/git/dffml_feature_git/feature/operations.py @@ -90,7 +90,7 @@ async def clone_git_repo(URL: str): directory = tempfile.mkdtemp(prefix="dffml-feature-git-") exit_code = await exec_with_logging("git", "clone", URL, directory) if exit_code != 0: - shutil.rmtree(repo["directory"]) + shutil.rmtree(directory) raise RuntimeError("Failed to clone git repo %r" % (URL,)) return {"repo": {"URL": URL, "directory": directory}} diff --git a/model/transformers/dffml_model_transformers/ner/ner_model.py b/model/transformers/dffml_model_transformers/ner/ner_model.py index 7b6a5e4a94..248b87514d 100644 --- a/model/transformers/dffml_model_transformers/ner/ner_model.py +++ b/model/transformers/dffml_model_transformers/ner/ner_model.py @@ -796,7 +796,7 @@ async def accuracy(self, sources: Sources): for c in sorted( pathlib( config["output_dir"] + "/**/" + TF2_WEIGHTS_NAME - ).glob(recursive=True,), + ).glob(recursive=True), key=lambda f: int("".join(filter(str.isdigit, f)) or -1), ) ) diff --git a/model/vowpalWabbit/dffml_model_vowpalWabbit/vw_base.py b/model/vowpalWabbit/dffml_model_vowpalWabbit/vw_base.py index 3b1c974e5c..6f1b135d87 100644 
--- a/model/vowpalWabbit/dffml_model_vowpalWabbit/vw_base.py +++ b/model/vowpalWabbit/dffml_model_vowpalWabbit/vw_base.py @@ -7,12 +7,7 @@ import json import hashlib from pathlib import Path -from typing import ( - AsyncIterator, - Tuple, - Any, - List, -) +from typing import AsyncIterator, Tuple, Any, List import numpy as np import pandas as pd @@ -368,7 +363,7 @@ async def predict( prediction = self.clf.predict(data[0]) self.logger.debug( "Predicted Value of {} for {}: {}".format( - self.parent.config.predict.NAME, data, prediction, + self.parent.config.predict.NAME, data, prediction ) ) target = self.parent.config.predict.NAME diff --git a/model/vowpalWabbit/setup.py b/model/vowpalWabbit/setup.py index 048828fb99..105a147cae 100644 --- a/model/vowpalWabbit/setup.py +++ b/model/vowpalWabbit/setup.py @@ -76,7 +76,5 @@ ], install_requires=INSTALL_REQUIRES, packages=find_packages(), - entry_points={ - "dffml.model": [f"vwmodel = {IMPORT_NAME}.vw_base:VWModel",] - }, + entry_points={"dffml.model": [f"vwmodel = {IMPORT_NAME}.vw_base:VWModel"]}, ) diff --git a/operations/binsec/setup.py b/operations/binsec/setup.py index e55327970b..30fe870421 100644 --- a/operations/binsec/setup.py +++ b/operations/binsec/setup.py @@ -71,6 +71,6 @@ "files_in_rpm = dffml_operations_binsec.operations:files_in_rpm", "is_binary_pie = dffml_operations_binsec.operations:is_binary_pie", "cleanup_rpm = dffml_operations_binsec.operations:cleanup_rpm", - ], + ] }, ) diff --git a/operations/deploy/.coveragerc b/operations/deploy/.coveragerc new file mode 100644 index 0000000000..29e6a10c5e --- /dev/null +++ b/operations/deploy/.coveragerc @@ -0,0 +1,13 @@ +[run] +source = + deploy + tests +branch = True + +[report] +exclude_lines = + no cov + no qa + noqa + pragma: no cover + if __name__ == .__main__.: diff --git a/operations/deploy/.gitignore b/operations/deploy/.gitignore new file mode 100644 index 0000000000..070ee81c83 --- /dev/null +++ b/operations/deploy/.gitignore @@ -0,0 +1,20 @@ +*.log 
+*.pyc +.cache/ +.coverage +.idea/ +.vscode/ +*.egg-info/ +build/ +dist/ +docs/build/ +venv/ +wheelhouse/ +*.egss +.mypy_cache/ +*.swp +.venv/ +.eggs/ +*.modeldir +*.db +htmlcov/ diff --git a/operations/deploy/Dockerfile b/operations/deploy/Dockerfile new file mode 100644 index 0000000000..c540220956 --- /dev/null +++ b/operations/deploy/Dockerfile @@ -0,0 +1,27 @@ +# Usage +# docker build -t dffml/deploy . +# docker run --rm -ti -p 80:8080 dffml/deploy -insecure -log debug +# +FROM ubuntu:20.04 + +RUN apt-get update && \ + apt-get install -y \ + gcc \ + python3-dev \ + python3-pip \ + python3 \ + ca-certificates && \ + python3 -m pip install -U pip && \ + python3 -m pip install dffml-service-http && \ + apt-get purge -y \ + gcc \ + python3-dev && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/src/app +COPY . /usr/src/app + +RUN python3 -m pip install -e . + +ENTRYPOINT ["python3", "-m", "dffml", "service", "http", "server", "-addr", "0.0.0.0"] +CMD ["-mc-config", "deploy/deploy"] diff --git a/operations/deploy/LICENSE b/operations/deploy/LICENSE new file mode 100644 index 0000000000..96488ee14b --- /dev/null +++ b/operations/deploy/LICENSE @@ -0,0 +1,21 @@ +Copyright (c) 2019 aghinsa + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/operations/deploy/MANIFEST.in b/operations/deploy/MANIFEST.in new file mode 100644 index 0000000000..19f3196490 --- /dev/null +++ b/operations/deploy/MANIFEST.in @@ -0,0 +1,3 @@ +include README.md +include LICENSE +include setup_common.py diff --git a/operations/deploy/README.md b/operations/deploy/README.md new file mode 100644 index 0000000000..233590cbde --- /dev/null +++ b/operations/deploy/README.md @@ -0,0 +1,8 @@ +# DFFML deploy Operations + +* Contains operations to [re]deploy docker containers. +* The containers are redeployed according to the image-tag. + +## License + +DFFML deploy is distributed under the [MIT License](LICENSE). diff --git a/operations/deploy/dffml_operations_deploy/__init__.py b/operations/deploy/dffml_operations_deploy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/operations/deploy/dffml_operations_deploy/definitions.py b/operations/deploy/dffml_operations_deploy/definitions.py new file mode 100644 index 0000000000..a37639f0f5 --- /dev/null +++ b/operations/deploy/dffml_operations_deploy/definitions.py @@ -0,0 +1,17 @@ +import sys +from dffml.df.types import Definition + +definitions = [ + Definition(name="git_payload", primitive="Dict[Any]"), + Definition(name="docker_image_id", primitive="str"), + Definition(name="is_default_branch", primitive="bool"), + Definition(name="docker_image_tag", primitive="str"), + Definition(name="docker_running_containers", primitive="List[str]"), + Definition(name="got_running_containers", primitive="bool"), + Definition(name="is_image_built", primitive="bool"), + Definition(name="docker_commands", primitive="Dict[str,Any]"), + Definition(name="docker_restarted_containers", primitive="str"), +] + +for 
definition in definitions: + setattr(sys.modules[__name__], definition.name, definition) diff --git a/operations/deploy/dffml_operations_deploy/exceptions.py b/operations/deploy/dffml_operations_deploy/exceptions.py new file mode 100644 index 0000000000..36b3e86e0c --- /dev/null +++ b/operations/deploy/dffml_operations_deploy/exceptions.py @@ -0,0 +1,12 @@ +class CannotRemoveContainer(Exception): + """ + Raised when `docker rm -f CONTAINER` fails to + stop and remove the container + """ + + +class UsageNotFound(Exception): + """ + Raised when docker file does not have `docker run` + command in comments + """ diff --git a/operations/deploy/dffml_operations_deploy/log.py b/operations/deploy/dffml_operations_deploy/log.py new file mode 100644 index 0000000000..91b0ce59b5 --- /dev/null +++ b/operations/deploy/dffml_operations_deploy/log.py @@ -0,0 +1,4 @@ +"""Logging""" +import logging + +LOGGER = logging.getLogger(__package__) diff --git a/operations/deploy/dffml_operations_deploy/operations.py b/operations/deploy/dffml_operations_deploy/operations.py new file mode 100644 index 0000000000..ca5effe346 --- /dev/null +++ b/operations/deploy/dffml_operations_deploy/operations.py @@ -0,0 +1,149 @@ +import re +import shlex +import pathlib +from typing import Dict, Any + +from dffml.df.base import op +from .definitions import * +from .exceptions import * +from .log import LOGGER + +from dffml_feature_git.util.proc import check_output +from dffml_feature_git.feature.operations import clone_git_repo + + +@op( + inputs={"payload": git_payload}, + outputs={"url": clone_git_repo.op.inputs["URL"]}, +) +def get_url_from_payload(payload: Dict[str, Any]): + url = payload["repository"]["clone_url"] + LOGGER.debug(f"Got url:{url} from payload") + return {"url": url} + + +@op( + inputs={"payload": git_payload}, + outputs={"is_default_branch": is_default_branch}, +) +def check_if_default_branch(payload): + pushed_branch = payload["ref"].split("/")[-1] + default_branch = 
payload["repository"]["default_branch"] + return {"is_default_branch": (default_branch == pushed_branch)} + + +@op(inputs={"payload": git_payload}, outputs={"image_tag": docker_image_tag}) +def get_image_tag(payload): + url = payload["repository"][ + "html_url" + ] # eg:-"https://github.com/username/Hello-World", + tag = url.split("/") + tag = f"{tag[-2]}/{tag[-1]}" + LOGGER.debug(f"Got image tag:{tag}") + return {"image_tag": tag} + + +@op( + inputs={"tag": docker_image_tag}, + outputs={"running_containers": docker_running_containers}, +) +async def get_running_containers(tag): + containers = await check_output( + "docker", "ps", "--filter", f"ancestor={tag}", "--format", "{{.ID}}" + ) + containers = [ + container for container in containers.strip().split("\n") if container + ] + LOGGER.debug(f"Running containers:{containers}") + return {"running_containers": containers} + + +@op( + inputs={"containers": docker_running_containers}, + outputs={"status": got_running_containers}, +) +def get_status_running_containers(containers): + return {"status": True} + + +@op( + inputs={ + "repo": clone_git_repo.op.outputs["repo"], + "image_tag": docker_image_tag, + }, + outputs={"docker_commands": docker_commands}, +) +async def parse_docker_commands(repo, image_tag): + docker_file = next(pathlib.Path(repo.directory).rglob("Dockerfile")) + docker_build_cmd = [ + "docker", + "build", + "-t", + image_tag, + str(docker_file.parent), + ] + with open(docker_file, "r") as f: + s = f.read() + + # parsing run command from Dockerfile + # parses lines starting with "# docker run" to line ending with "# " + x = re.findall("(?:#[ ]*docker run )(?:.|\n)*(?:#[ ]*\n)", s) + if not x: + # handles case were `FROM` starts immediatly after `usage` comments, + # without blank comment in between + x = re.findall("((?:#[ ]*docker run )(?:.|\n)*)FROM", s) + if not x: + raise UsageNotFound( + f"docker run command not found in comments in {docker_file}" + ) + + x = x[0].replace("#", "").strip() + if 
"--rm" in x: # --rm and --restart=always are conflicting options + x = x.replace("docker run", "docker run -d") + else: + x = x.replace("docker run", "docker run -d --restart=always") + docker_run_cmd = shlex.split(x) + docker_commands = {"build": docker_build_cmd, "run": docker_run_cmd} + LOGGER.debug(f"Docker commands:{docker_commands}") + return {"docker_commands": docker_commands} + + +@op( + inputs={"docker_commands": docker_commands}, + outputs={"build_status": is_image_built}, + conditions=[is_default_branch, got_running_containers], +) +async def docker_build_image(docker_commands): + # building image + cmd_out = await check_output(*docker_commands["build"]) + build_status = "Successfully built" in cmd_out + LOGGER.debug("Image built status : {build_status}") + return {"build_status": build_status} + + +@op( + inputs={ + "docker_commands": docker_commands, + "containers": docker_running_containers, + }, + conditions=[is_image_built], + outputs={"containers": docker_restarted_containers}, +) +async def restart_running_containers(docker_commands, containers): + # if no containers are running ,start a fresh one else + # stop running containers and start it again with the new built + new_containers = [] + if not containers: + out = await check_output(*docker_commands["run"]) + new_containers.append(out.strip()) + return {"containers": new_containers} + + for container in containers: + out = await check_output("docker", "rm", "-f", container) + if not (out.strip() == container): + raise CannotRemoveContainer( + f"Error when force removing container {container}" + ) + out = await check_output(*docker_commands["run"]) + new_containers.append(out.strip()) + return {"containers": new_containers} diff --git a/operations/deploy/dffml_operations_deploy/version.py b/operations/deploy/dffml_operations_deploy/version.py new file mode 100644 index 0000000000..901e5110b2 --- /dev/null +++ b/operations/deploy/dffml_operations_deploy/version.py @@ -0,0 +1 @@ +VERSION = 
"0.0.1" diff --git a/operations/deploy/pyproject.toml b/operations/deploy/pyproject.toml new file mode 100644 index 0000000000..8b9d32fa10 --- /dev/null +++ b/operations/deploy/pyproject.toml @@ -0,0 +1,20 @@ +[tool.black] +line-length = 79 +target-version = ['py37'] + +exclude = ''' +( + /( + \.eggs # exclude a few common directories in the + | \.git # root of the project + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + ) +) +''' diff --git a/operations/deploy/setup.py b/operations/deploy/setup.py new file mode 100644 index 0000000000..5cc8aced08 --- /dev/null +++ b/operations/deploy/setup.py @@ -0,0 +1,25 @@ +import os +import importlib.util +from setuptools import setup + +# Boilerplate to load commonalities +spec = importlib.util.spec_from_file_location( + "setup_common", os.path.join(os.path.dirname(__file__), "setup_common.py") +) +common = importlib.util.module_from_spec(spec) +spec.loader.exec_module(common) + +common.KWARGS["entry_points"] = { + "dffml.operation": [ + f"get_url_from_payload = {common.IMPORT_NAME}.operations:get_url_from_payload", + f"check_if_default_branch = {common.IMPORT_NAME}.operations:check_if_default_branch", + f"get_image_tag = {common.IMPORT_NAME}.operations:get_image_tag", + f"get_running_containers = {common.IMPORT_NAME}.operations:get_running_containers", + f"get_status_running_containers = {common.IMPORT_NAME}.operations:get_status_running_containers", + f"parse_docker_commands = {common.IMPORT_NAME}.operations:parse_docker_commands", + f"docker_build_image = {common.IMPORT_NAME}.operations:docker_build_image", + f"restart_running_containers = {common.IMPORT_NAME}.operations:restart_running_containers", + ] +} + +setup(**common.KWARGS) diff --git a/operations/deploy/setup_common.py b/operations/deploy/setup_common.py new file mode 100644 index 0000000000..040b693695 --- /dev/null +++ b/operations/deploy/setup_common.py @@ -0,0 +1,97 @@ +import os +import sys +import ast +from pathlib 
import Path +from setuptools import find_packages + +ORG = "aghinsa" +NAME = "dffml-operations-deploy" +DESCRIPTION = "DFFML operations deploy" +AUTHOR_NAME = "Aghin Shah Alin" +AUTHOR_EMAIL = "aghinsa@gmail.com" +# Install dffml if it is not installed in development mode +INSTALL_REQUIRES = [] + ( + ["dffml>=0.3.7"] + if not any( + list( + map( + os.path.isfile, + list( + map( + lambda syspath: os.path.join( + syspath, "dffml.egg-link" + ), + sys.path, + ) + ), + ) + ) + ) + else [] + + ( + ["dffml-feature-git>=0.2.7"] + if not any( + list( + map( + os.path.isfile, + list( + map( + lambda syspath: os.path.join( + syspath, "dffml_feature_git.egg-link" + ), + sys.path, + ) + ), + ) + ) + ) + else [] + ) +) + +IMPORT_NAME = ( + NAME + if "replace_package_name".upper() != NAME + else "replace_import_package_name".upper() +).replace("-", "_") + +SELF_PATH = Path(sys.argv[0]).parent.resolve() +if not (SELF_PATH / Path(IMPORT_NAME, "version.py")).is_file(): + SELF_PATH = os.path.dirname(os.path.realpath(__file__)) + +VERSION = ast.literal_eval( + Path(SELF_PATH, IMPORT_NAME, "version.py") + .read_text() + .split("=")[-1] + .strip() +) + +README = Path(SELF_PATH, "README.md").read_text() + +KWARGS = dict( + name=NAME, + version=VERSION, + description=DESCRIPTION, + long_description=README, + long_description_content_type="text/markdown", + author=AUTHOR_NAME, + author_email=AUTHOR_EMAIL, + maintainer=AUTHOR_NAME, + maintainer_email=AUTHOR_EMAIL, + url=f"https://github.com/{ORG}/{NAME}", + license="MIT", + keywords=["dffml"], + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + ], + 
install_requires=INSTALL_REQUIRES, + packages=find_packages(), +) diff --git a/operations/deploy/tests/__init__.py b/operations/deploy/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/operations/deploy/tests/test_operations.py b/operations/deploy/tests/test_operations.py new file mode 100644 index 0000000000..d67c11e37e --- /dev/null +++ b/operations/deploy/tests/test_operations.py @@ -0,0 +1,144 @@ +import os +import sys +import uuid +import tempfile +from unittest import mock + +from dffml.df.types import Input, DataFlow +from dffml.df.base import opimp_in +from dffml.df.memory import MemoryOrchestrator +from dffml.util.asynctestcase import AsyncTestCase + +from dffml.base import BaseDataFlowFacilitatorObjectContext + + +from dffml.operation.output import GetSingle +from dffml_operations_deploy.operations import * +from dffml_feature_git.feature.operations import ( + clone_git_repo, + cleanup_git_repo, +) +from dffml_feature_git.util.proc import check_output + + +OPIMPS = opimp_in(sys.modules[__name__]) + +REPO = str(uuid.uuid4()).split("-")[-1] +USER = str(uuid.uuid4()).split("-")[-1] + +DOCKERFILE_CONTENT = ( + "# Usage\n" + + f"# docker run -d -t {USER}/{REPO}\n" + + "# \n" + + "FROM alpine\n" +) + + +class FakeCloneRepoImp(BaseDataFlowFacilitatorObjectContext): + def __init__(self, *args, **kwargs): + super().__init__() + + async def run(*args, **kwargs): + URL = args[1]["URL"] + directory = tempfile.mkdtemp(prefix="test_deploy") + with open(os.path.join(directory, "Dockerfile"), "w+") as dockerfile: + dockerfile.write(DOCKERFILE_CONTENT) + return {"repo": {"URL": URL, "directory": directory}} + + +class TestOperations(AsyncTestCase): + async def setUp(self): + self.dataflow = DataFlow.auto(*OPIMPS) + self.dataflow.seed.append( + Input( + value=[ + restart_running_containers.op.outputs["containers"].name + ], + definition=GetSingle.op.inputs["spec"], + ) + ) + self.test_inputs = { + "TestRun": [ + Input( + value={ + "ref": 
"refs/master", + "repository": { + "clone_url": f"https://github.com/{USER}/{REPO}.git", + "default_branch": "master", + "html_url": f"https://github.com/{USER}/{REPO}", + }, + }, + definition=get_url_from_payload.op.inputs["payload"], + ) + ] + } + self.containers_to_remove = [] + + async def tearDown(self): + for container in self.containers_to_remove: + await check_output("docker", "rm", "-f", container) + await check_output("docker", "rmi", f"{USER}/{REPO}") + + # test are numbered so that the first test (test_0_..) builds an image and starts a container, + # testing that a container will be started if one is not running already. + # (test_1_..) restarts the container started by test_0. + async def test_0_start_container(self): + with mock.patch.object( + clone_git_repo.imp, "CONTEXT", new=FakeCloneRepoImp + ): + tag = f"{USER}/{REPO}" + before = await check_output( + "docker", + "ps", + "--filter", + f"ancestor={tag}", + "--format", + "{{.ID}} {{.RunningFor}}", + ) + async with MemoryOrchestrator.withconfig({}) as orchestrator: + async with orchestrator(self.dataflow) as octx: + async for ctx, results in octx.run(self.test_inputs): + after = await check_output( + "docker", + "ps", + "--filter", + f"ancestor={tag}", + "--format", + "{{.ID}} {{.RunningFor}}", + ) + self.assertNotEqual(before, after) + self.assertIn("docker_restarted_containers", results) + self.containers_to_remove = results[ + "docker_restarted_containers" + ] + + async def test_1_restart_container(self): + tag = f"{USER}/{REPO}" + before = await check_output( + "docker", + "ps", + "--filter", + f"ancestor={tag}", + "--format", + "{{.ID}} {{.RunningFor}}", + ) + with mock.patch.object( + clone_git_repo.imp, "CONTEXT", new=FakeCloneRepoImp + ): + async with MemoryOrchestrator.withconfig({}) as orchestrator: + async with orchestrator(self.dataflow) as octx: + async for ctx, results in octx.run(self.test_inputs): + after = await check_output( + "docker", + "ps", + "--filter", + f"ancestor={tag}", 
+ "--format", + "{{.ID}} {{.RunningFor}}", + ) + self.assertNotEqual(before, after) + self.assertIn("second", after) + self.assertIn("docker_restarted_containers", results) + self.containers_to_remove = results[ + "docker_restarted_containers" + ] diff --git a/scripts/all_tests.sh b/scripts/all_tests.sh index 8bce6f64c3..29f97310ac 100755 --- a/scripts/all_tests.sh +++ b/scripts/all_tests.sh @@ -14,6 +14,7 @@ PLUGINS=("${SRC_ROOT}/" \ "${SRC_ROOT}/model/scratch" \ "${SRC_ROOT}/model/scikit" \ "${SRC_ROOT}/examples/shouldi" \ + "${SRC_ROOT}/operations/deploy" \ "${SRC_ROOT}/feature/git" \ "${SRC_ROOT}/feature/auth" \ "${SRC_ROOT}/service/http" \ diff --git a/scripts/check_literalincludes.py b/scripts/check_literalincludes.py old mode 100644 new mode 100755 diff --git a/service/http/dffml_service_http/routes.py b/service/http/dffml_service_http/routes.py index a249ca3e6f..494f9ca8cf 100644 --- a/service/http/dffml_service_http/routes.py +++ b/service/http/dffml_service_http/routes.py @@ -24,8 +24,9 @@ MemoryInputSetConfig, StringInputSetContext, ) -from dffml.base import MissingConfig from dffml.model import Model +from dffml.base import MissingConfig +from dffml.util.data import traverse_get from dffml.source.source import BaseSource, SourcesContext from dffml.util.entrypoint import EntrypointNotFound, entrypoint @@ -132,10 +133,56 @@ async def get_mctx(self, request): class HTTPChannelConfig(NamedTuple): + """ + Config for channels. + + Parameters + ++++++++++ + path : str + Route in server. + dataflow : DataFlow + Flow to which inputs from request to path is forwarded too. + input_mode : str + Mode according to which input data is passed to the dataflow,default:"default". 
+ - "default" : + Inputs are expected to be mapping of context to list of input + to definition mappings + eg:'{ + "insecure-package": + [ + { + "value":"insecure-package", + "definition":"package" + } + ] + }' + - "preprocess:definition_name" : + Input as whole is treated as value with the given definition. + Supported 'preporcess' tags : [json,text,bytes,stream] + output_mode : str + Mode according to which output from dataflow is treated. + - bytes:content_type:OUTPUT_KEYS : + OUTPUT_KEYS are . seperated string which is used as keys to traverse the + ouput of the flow. + eg: + `results = { + "post_input": + { + "hex":b'speak' + } + }` + then bytes:post_input.hex will return b'speak'. + - text:OUTPUT_KEYS + - json + - output of dataflow (Dict) is passes as json + + """ + path: str - presentation: str - asynchronous: bool dataflow: DataFlow + output_mode: str = "json" + asynchronous: bool = False + input_mode: str = "default" @classmethod def _fromdict(cls, **kwargs): @@ -145,7 +192,7 @@ def _fromdict(cls, **kwargs): @entrypoint("http") class Routes(BaseMultiCommContext): - PRESENTATION_OPTIONS = ["json", "blob", "text"] + IO_MODES = ["json", "text", "bytes", "stream"] async def get_registered_handler(self, request): return self.app["multicomm_routes"].get(request.path, None) @@ -157,37 +204,82 @@ async def multicomm_dataflow(self, config, request): inputs = [] # If data was sent add those inputs if request.method == "POST": - # Accept a list of input data - # TODO validate that input data is dict of list of inputs each item - # has definition and value properties - for ctx, client_inputs in (await request.json()).items(): - for input_data in client_inputs: - if ( - not input_data["definition"] - in config.dataflow.definitions - ): - return web.json_response( - { - "error": f"Missing definition for {input_data['definition']} in dataflow" - }, - status=HTTPStatus.NOT_FOUND, + # Accept a list of input data according to config.input_mode + if config.input_mode == 
"default": + # TODO validate that input data is dict of list of inputs each item + # has definition and value properties + for ctx, client_inputs in (await request.json()).items(): + for input_data in client_inputs: + if ( + not input_data["definition"] + in config.dataflow.definitions + ): + return web.json_response( + { + "error": f"Missing definition for {input_data['definition']} in dataflow" + }, + status=HTTPStatus.NOT_FOUND, + ) + inputs.append( + MemoryInputSet( + MemoryInputSetConfig( + ctx=StringInputSetContext(ctx), + inputs=[ + Input( + value=input_data["value"], + definition=config.dataflow.definitions[ + input_data["definition"] + ], + ) + for input_data in client_inputs + ], + ) ) + ) + elif ":" in config.input_mode: + preprocess_mode, input_def = config.input_mode.split(":") + if input_def not in config.dataflow.definitions: + return web.json_response( + { + "error": f"Missing definition for {input_data['definition']} in dataflow" + }, + status=HTTPStatus.NOT_FOUND, + ) + if preprocess_mode == "json": + value = await request.json() + elif preprocess_mode == "str": + value = await request.text() + elif preprocess_mode == "bytes": + value = await request.read() + elif preprocess == "stream": + value = request.content + else: + return web.json_response( + { + "error": f"preprocess tag must be one of {IO_MODES}, got {preprocess}" + }, + status=HTTPStatus.NOT_FOUND, + ) inputs.append( MemoryInputSet( MemoryInputSetConfig( - ctx=StringInputSetContext(ctx), + ctx=StringInputSetContext("post_input"), inputs=[ Input( - value=input_data["value"], + value=value, definition=config.dataflow.definitions[ - input_data["definition"] + input_def ], ) - for input_data in client_inputs ], ) ) ) + else: + raise NotImplementedError( + "Input modes other than default,preprocess:definition_name not yet implemented" + ) + # Run the operation in an orchestrator # TODO(dfass) Create the orchestrator on startup of the HTTP API itself async with 
MemoryOrchestrator.basic_config() as orchestrator: @@ -196,15 +288,37 @@ async def multicomm_dataflow(self, config, request): results = { str(ctx): result async for ctx, result in octx.run(*inputs) } - # TODO Implement input and presentation stages? - """ - if config.presentation == "blob": - return web.Response(body=results) - elif config.presentation == "text": - return web.Response(text=results) + if config.output_mode == "json": + return web.json_response(results) + + # content_info is a List[str] ([content_type,output_keys]) + # in case of stream,bytes and string in others + postprocess_mode, *content_info = config.output_mode.split(":") + + if postprocess_mode == "stream": + # stream:text/plain:get_single.beef + raise NotImplementedError( + "output mode not yet implemented" + ) + + elif postprocess_mode == "bytes": + content_type, output_keys = content_info + output_data = traverse_get( + results, *output_keys.split(".") + ) + return web.Response(body=output_data) + + elif postprocess_mode == "text": + output_data = traverse_get( + results, *content_info[0].split(".") + ) + return web.Response(text=output_data) + else: - """ - return web.json_response(results) + return web.json_response( + {"error": f"output mode not valid"}, + status=HTTPStatus.NOT_FOUND, + ) async def multicomm_dataflow_asynchronous(self, config, request): # TODO allow list of valid definitions to seed @@ -457,10 +571,10 @@ async def register(self, config: HTTPChannelConfig) -> None: @mcctx_route async def multicomm_register(self, request, mcctx): config = mcctx.register_config()._fromdict(**(await request.json())) - if config.presentation not in self.PRESENTATION_OPTIONS: + if config.output_mode not in self.IO_MODES: return web.json_response( { - "error": f"{config.presentation!r} is not a valid presentation option: {self.PRESENTATION_OPTIONS!r}" + "error": f"{config.output_mode!r} is not a valid output_mode option: {self.IO_MODES!r}" }, status=HTTPStatus.BAD_REQUEST, ) diff --git 
a/service/http/docs/dataflow.rst b/service/http/docs/dataflow.rst new file mode 100644 index 0000000000..b8b09c108a --- /dev/null +++ b/service/http/docs/dataflow.rst @@ -0,0 +1,74 @@ +DataFlow +======== + +The following documentation explains how to configure and deploy DataFlows +via the HTTP service. + +HttpChannelConfig +----------------- + +- ``asynchronous: bool`` + + - Unused right now but will be accessible over a websocket in the future and used for long running flows. + +- ``dataflow : DataFlow`` + + - Flow to which inputs from request to path is forwarded to. + +- ``input_mode : str`` + + - Mode according to which input data is passed to the dataflow, ``default:default``. + + - ``default`` + + Inputs are expected to be mapping of context to list of input + to definition mappings + eg: + + .. code-block:: json + + { + "insecure-package": + [ + { + "value":"insecure-package", + "definition":"package" + } + ] + } + + - ``preprocess:definition_name`` + + Input as whole is treated as value with the given definition after preprocessing. + Supported preprocess tags : [json,text,bytes,stream] + +- ``path : str`` + + - Route in server. + +- ``output_mode : str`` + + - Mode according to which output from dataflow is treated. + + - ``bytes:content_type:OUTPUT_KEYS`` + + - OUTPUT_KEYS are ``.`` separated string which is used as keys to traverse the output of the flow. eg: + + .. code-block:: python + + results = { + "post_input": + { + "hex":b'speak' + } + } + + then ``bytes:post_input.hex`` will return ``b'speak'``. + + - ``text:OUTPUT_KEYS`` + + - `json` + + - output of dataflow (Dict) is passed as json + diff --git a/service/http/docs/index.rst b/service/http/docs/index.rst index 7655b8edc4..e53a2d9845 100644 --- a/service/http/docs/index.rst +++ b/service/http/docs/index.rst @@ -47,6 +47,7 @@ development is probably hosted on another port, you'll need the ``cors`` flag.
:caption: Contents: cli + dataflow api security javascript diff --git a/service/http/tests/test_cli.py b/service/http/tests/test_cli.py index c56a31d63d..9734ad674c 100644 --- a/service/http/tests/test_cli.py +++ b/service/http/tests/test_cli.py @@ -152,7 +152,7 @@ async def test_mc_config(self): json.dumps( { "path": hello_world_url, - "presentation": "json", + "output_mode": "json", "asynchronous": False, }, sort_keys=True, @@ -163,7 +163,7 @@ async def test_mc_config(self): json.dumps( { "path": hello_blank_url, - "presentation": "json", + "output_mode": "json", "asynchronous": False, }, sort_keys=True, diff --git a/service/http/tests/test_routes.py b/service/http/tests/test_routes.py index 41bb9b94ca..229f5d861b 100644 --- a/service/http/tests/test_routes.py +++ b/service/http/tests/test_routes.py @@ -296,7 +296,7 @@ async def test_no_post(self): f"/multicomm/self/register", json={ "path": url, - "presentation": "json", + "output_mode": "json", "asynchronous": False, "dataflow": HELLO_WORLD_DATAFLOW.export(), }, @@ -321,7 +321,7 @@ async def test_post(self): f"/multicomm/self/register", json={ "path": url, - "presentation": "json", + "output_mode": "json", "asynchronous": False, "dataflow": HELLO_BLANK_DATAFLOW.export(), }, diff --git a/tests/operation/test_io.py b/tests/operation/test_io.py index 0fc322c6f5..a9a95a3555 100644 --- a/tests/operation/test_io.py +++ b/tests/operation/test_io.py @@ -50,7 +50,7 @@ async def test_AcceptUserInput(self): async for ctx_str, results in octx.run(test_inputs): self.assertIn("UserInput", results) self.assertEqual( - "Testing AcceptUserInput", results["UserInput"], + "Testing AcceptUserInput", results["UserInput"] ) async def test_print_output(self): diff --git a/tests/source/test_ini.py b/tests/source/test_ini.py index 524e3aa1f5..68065e486f 100644 --- a/tests/source/test_ini.py +++ b/tests/source/test_ini.py @@ -13,13 +13,13 @@ async def test_ini(self): self.testfile = os.path.join(testdir, "testfile.ini") # Create a 
source source = INISource( - filename=self.testfile, allowempty=True, readwrite=True, + filename=self.testfile, allowempty=True, readwrite=True ) # Save some data in the source await save( source, - Record("section1", data={"features": {"A": 1, "B": 2,}}), - Record("section2", data={"features": {"C": 3, "D": 4,}}), + Record("section1", data={"features": {"A": 1, "B": 2}}), + Record("section2", data={"features": {"C": 3, "D": 4}}), ) # Load all the records records = [record async for record in load(source)] diff --git a/tests/test_docstrings.py b/tests/test_docstrings.py index 81b57089ab..cdd5c0f2a6 100644 --- a/tests/test_docstrings.py +++ b/tests/test_docstrings.py @@ -79,7 +79,7 @@ def wrap_operation_io_AcceptUserInput(state): def wrap_high_level_accuracy(state): model = SLRModel( - features=Features(DefFeature("Years", int, 1),), + features=Features(DefFeature("Years", int, 1)), predict=DefFeature("Salary", int, 1), ) @@ -99,7 +99,7 @@ def wrap_high_level_accuracy(state): def wrap_noasync_accuracy(state): model = SLRModel( - features=Features(DefFeature("Years", int, 1),), + features=Features(DefFeature("Years", int, 1)), predict=DefFeature("Salary", int, 1), ) @@ -247,11 +247,7 @@ def testcase(self): + cls.__qualname__ + "." + obj.__qualname__ - ] = ( - import_name, - module, - obj, - ) + ] = (import_name, module, obj) for name, (import_name, module, obj) in to_test.items(): # Check that class or function has an example that could be doctested