Skip to content

Commit

Permalink
service: http: Add support for immediate response
Browse files Browse the repository at this point in the history
Fixes: intel#657
  • Loading branch information
aghinsa authored Jul 24, 2020
1 parent 1a9590f commit ad22864
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
core plugins
- HTTP service got a `-redirect` flag which allows for URL redirection via a
HTTP 307 response
- Support for immediate response in HTTP service
- Daal4py example usage.
### Changed
- Renamed `-seed` to `-inputs` in `dataflow create` command
Expand Down
69 changes: 62 additions & 7 deletions service/http/dffml_service_http/routes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import json
import asyncio
import secrets
import inspect
import pathlib
Expand All @@ -10,10 +11,11 @@
from functools import partial
from dataclasses import dataclass
from contextlib import AsyncExitStack
from typing import List, Union, AsyncIterator, Type, NamedTuple, Dict
from typing import List, Union, AsyncIterator, Type, NamedTuple, Dict, Any

from aiohttp import web
import aiohttp_cors
from aiohttp import web


from dffml import Sources, MemorySource
from dffml.record import Record
Expand Down Expand Up @@ -176,7 +178,21 @@ class HTTPChannelConfig(NamedTuple):
- text:OUTPUT_KEYS
- json
- output of dataflow (Dict) is passes as json
immediate_response: Dict[str,Any]
If provided with a reponse, server responds immediatly with
it, whilst scheduling to run the dataflow.
Expected keys:
- status: HTTP status code for the response
- content_type: MIME type.If not given, determined
from the presence of body/text/json
- body/text/json: One of this according to content_type
- headers
eg:
{
"status": 200,
"content_type": "application/json",
"data": {"text": "ok"},
}
"""

path: str
Expand All @@ -185,6 +201,7 @@ class HTTPChannelConfig(NamedTuple):
asynchronous: bool = False
input_mode: str = "default"
forward_headers: str = None
immediate_response: Dict[str, Any] = None

@classmethod
def _fromdict(cls, **kwargs):
Expand All @@ -199,7 +216,7 @@ class Routes(BaseMultiCommContext):
async def get_registered_handler(self, request):
return self.app["multicomm_routes"].get(request.path, None)

async def multicomm_dataflow(self, config, request):
async def _multicomm_dataflow(self, config, request):
# Seed the network with inputs given by caller
# TODO(p0,security) allowlist of valid definitions to seed (set
# Input.origin to something other than seed)
Expand All @@ -222,6 +239,7 @@ async def multicomm_dataflow(self, config, request):
},
status=HTTPStatus.NOT_FOUND,
)

inputs.append(
MemoryInputSet(
MemoryInputSetConfig(
Expand Down Expand Up @@ -251,17 +269,19 @@ async def multicomm_dataflow(self, config, request):
)
)
elif ":" in config.input_mode:
preprocess_mode, input_def = config.input_mode.split(":")
preprocess_mode, *input_def = config.input_mode.split(":")
input_def = ":".join(input_def)
if input_def not in config.dataflow.definitions:
return web.json_response(
{
"error": f"Missing definition for {input_def} in dataflow"
},
status=HTTPStatus.NOT_FOUND,
)

if preprocess_mode == "json":
value = await request.json()
elif preprocess_mode == "str":
elif preprocess_mode == "text":
value = await request.text()
elif preprocess_mode == "bytes":
value = await request.read()
Expand All @@ -270,10 +290,11 @@ async def multicomm_dataflow(self, config, request):
else:
return web.json_response(
{
"error": f"preprocess tag must be one of {IO_MODES}, got {preprocess_mode}"
"error": f"preprocess tag must be one of {self.IO_MODES}, got {preprocess_mode}"
},
status=HTTPStatus.NOT_FOUND,
)

inputs.append(
MemoryInputSet(
MemoryInputSetConfig(
Expand Down Expand Up @@ -301,6 +322,7 @@ async def multicomm_dataflow(self, config, request):
)
)
)

else:
raise NotImplementedError(
"Input modes other than default,preprocess:definition_name not yet implemented"
Expand All @@ -314,6 +336,7 @@ async def multicomm_dataflow(self, config, request):
results = {
str(ctx): result async for ctx, result in octx.run(*inputs)
}

if config.output_mode == "json":
return web.json_response(results)

Expand Down Expand Up @@ -342,6 +365,38 @@ async def multicomm_dataflow(self, config, request):
status=HTTPStatus.NOT_FOUND,
)

async def multicomm_dataflow(self, config, request):
if config.immediate_response:
asyncio.create_task(self._multicomm_dataflow(config, request))
ir = config.immediate_response
content_type = None
if "content_type" in ir:
content_type = ir["content_type"]
else:
if "data" in ir:
content_type = "application/json"
elif "text" in ir:
content_type = "text/plain"
elif "body" in ir:
content_type = "application/octet-stream"

if content_type == "application/json":
return web.json_response(
data={} if not "data" in ir else ir["data"],
status=200 if not "status" in ir else ir["status"],
headers=None if not "headers" in ir else ir["headers"],
)
else:
return web.Response(
body=None if not "body" in ir else ir["body"],
text=None if not "text" in ir else ir["text"],
status=200 if not "status" in ir else ir["status"],
headers=None if not "headers" in ir else ir["headers"],
content_type=content_type,
)
else:
return await self._multicomm_dataflow(config, request)

async def multicomm_dataflow_asynchronous(self, config, request):
# TODO allow list of valid definitions to seed
raise NotImplementedError(
Expand Down
19 changes: 19 additions & 0 deletions service/http/docs/dataflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,22 @@ HttpChannelConfig

- output of dataflow (Dict) is passes as json

- ``immediate_response: Dict[str,Any]``

- If provided with a reponse, server responds immediatly with
it, whilst scheduling to run the dataflow.
Expected keys:
- status: HTTP status code for the response
- content_type: MIME type, if not given, determined
from the presence of body/text/json
- body/text/json: One of this according to content_type
- headers
eg:

.. code-block:: python
{
"status": 200,
"content_type": "application/json",
"data": {"text": "ok"},
}
34 changes: 31 additions & 3 deletions service/http/tests/test_routes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import os
import io
import aiohttp
import asyncio
import pathlib
import tempfile
from unittest.mock import patch
from contextlib import asynccontextmanager, ExitStack, AsyncExitStack
from typing import AsyncIterator, Dict

import aiohttp

from dffml import DataFlow
from dffml.base import config
from dffml.record import Record
from dffml.df.base import BaseConfig
from dffml.df.base import BaseConfig, op
from dffml.operation.output import GetSingle
from dffml.util.entrypoint import EntrypointNotFound
from dffml.model.model import ModelContext, Model
Expand All @@ -28,6 +29,7 @@
SOURCE_NOT_LOADED,
MODEL_NOT_LOADED,
MODEL_NO_SOURCES,
HTTPChannelConfig,
)
from dffml_service_http.util.testing import (
ServerRunner,
Expand Down Expand Up @@ -315,6 +317,32 @@ async def test_post(self):
{"Feedface": {"response": message}}, await response.json()
)

async def test_immediate_response(self):
url: str = "/some/url"
event = asyncio.Event()

@op()
async def my_event_setter(trigger: int) -> None:
event.set()

# Register the data flow
await self.cli.register(
HTTPChannelConfig(
path=url,
dataflow=DataFlow.auto(my_event_setter),
input_mode=f"text:{my_event_setter.op.inputs['trigger'].name}",
output_mode="json",
immediate_response={
"status": 200,
"content_type": "application/json",
"data": {"immediate": "response"},
},
)
)
async with self.post(url, json="trigger") as response:
self.assertEqual(await response.json(), {"immediate": "response"})
await asyncio.wait_for(event.wait(), timeout=2)


class TestRoutesSource(TestRoutesRunning, AsyncTestCase):
async def setUp(self):
Expand Down

0 comments on commit ad22864

Please sign in to comment.