Skip to content

Commit

Permalink
Fix streaming mode (aio-libs#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov authored Sep 23, 2019
1 parent 23fca26 commit 5a6cbfb
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES/344.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix streaming mode for pull, push, build, stats and events.
27 changes: 17 additions & 10 deletions aiodocker/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import tarfile

from .exceptions import DockerContainerError, DockerError
from .jsonstream import json_stream_result
from .jsonstream import json_stream_list, json_stream_stream
from .logs import DockerLog
from .multiplexed import multiplexed_result_list, multiplexed_result_stream
from .utils import identical, parse_result
Expand Down Expand Up @@ -230,17 +230,24 @@ async def port(self, private_port):

return h_ports

async def stats(self, *, stream=True):
def stats(self, *, stream=True):
cm = self.docker._query(
"containers/{self._id}/stats".format(self=self),
params={"stream": "1" if stream else "0"},
)
if stream:
async with self.docker._query(
"containers/{self._id}/stats".format(self=self), params={"stream": "1"}
) as response:
return await json_stream_result(response)
return self._stats_stream(cm)
else:
data = await self.docker._query_json(
"containers/{self._id}/stats".format(self=self), params={"stream": "0"}
)
return data
return self._stats_list(cm)

async def _stats_stream(self, cm):
async with cm as response:
async for item in json_stream_stream(response):
yield item

async def _stats_list(self, cm):
async with cm as response:
return await json_stream_list(response)

def __getitem__(self, key):
return self._container[key]
Expand Down
7 changes: 2 additions & 5 deletions aiodocker/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from collections import ChainMap

from .channel import Channel
from .jsonstream import json_stream_result
from .utils import human_bool
from .jsonstream import json_stream_stream


class DockerEvents:
Expand Down Expand Up @@ -56,9 +55,7 @@ async def run(self, **params):
async with self.docker._query(
"events", method="GET", params=params, timeout=0
) as response:
self.json_stream = await json_stream_result(
response, self._transform_event, human_bool(params["stream"])
)
self.json_stream = json_stream_stream(response, self._transform_event)
try:
async for data in self.json_stream:
await self.channel.publish(data)
Expand Down
71 changes: 47 additions & 24 deletions aiodocker/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import warnings
from typing import BinaryIO, List, Mapping, MutableMapping, Optional, Union

from .jsonstream import json_stream_result
from .jsonstream import json_stream_list, json_stream_stream
from .utils import clean_map, compose_auth_header


Expand Down Expand Up @@ -42,7 +42,7 @@ async def history(self, name: str) -> Mapping:
)
return response

async def pull(
def pull(
self,
from_image: str,
*,
Expand Down Expand Up @@ -77,12 +77,25 @@ async def pull(
)
# TODO: assert registry == repo?
headers["X-Registry-Auth"] = compose_auth_header(auth, registry)
async with self.docker._query(
"images/create", "POST", params=params, headers=headers
) as response:
return await json_stream_result(response, stream=stream)
cm = self.docker._query("images/create", "POST", params=params, headers=headers)
return self._handle_response(cm, stream)

def _handle_response(self, cm, stream):
if stream:
return self._handle_stream(cm)
else:
return self._handle_list(cm)

async def _handle_stream(self, cm):
async with cm as response:
async for item in json_stream_stream(response):
yield item

async def _handle_list(self, cm):
async with cm as response:
return await json_stream_list(response)

async def push(
def push(
self,
name: str,
*,
Expand All @@ -105,13 +118,13 @@ async def push(
"when auth information is provided"
)
headers["X-Registry-Auth"] = compose_auth_header(auth, registry)
async with self.docker._query(
cm = self.docker._query(
"images/{name}/push".format(name=name),
"POST",
params=params,
headers=headers,
) as response:
return await json_stream_result(response, stream=stream)
)
return self._handle_response(cm, stream)

async def tag(self, name: str, repo: str, *, tag: str = None) -> bool:
"""
Expand Down Expand Up @@ -155,7 +168,7 @@ async def delete(
"images/{name}".format(name=name), "DELETE", params=params
)

async def build(
def build(
self,
*,
remote: str = None,
Expand Down Expand Up @@ -229,17 +242,16 @@ async def build(
if labels:
params.update({"labels": json.dumps(labels)})

async with self.docker._query(
cm = self.docker._query(
"build",
"POST",
params=clean_map(params),
headers=headers,
data=local_context,
) as response:

return await json_stream_result(response, stream=stream)
)
return self._handle_response(cm, stream)

async def export_image(self, name: str):
def export_image(self, name: str):
"""
Get a tarball of an image by name or id.
Expand All @@ -249,12 +261,11 @@ async def export_image(self, name: str):
Returns:
Streamreader of tarball image
"""
async with self.docker._query(
"images/{name}/get".format(name=name), "GET"
) as response:
return response.content
return _ExportCM(
self.docker._query("images/{name}/get".format(name=name), "GET")
)

async def import_image(self, data, stream: bool = False):
def import_image(self, data, stream: bool = False):
"""
Import tarball of image to docker.
Expand All @@ -265,7 +276,19 @@ async def import_image(self, data, stream: bool = False):
Tarball of the image
"""
headers = {"Content-Type": "application/x-tar"}
async with self.docker._query_chunked_post(
cm = self.docker._query_chunked_post(
"images/load", "POST", data=data, headers=headers
) as response:
return await json_stream_result(response, stream=stream)
)
return self._handle_response(cm, stream)


class _ExportCM:
def __init__(self, cm):
self._cm = cm

async def __aenter__(self):
resp = await self._cm.__aenter__()
return resp.content

async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self._cm.__aexit__(exc_type, exc_val, exc_tb)
8 changes: 5 additions & 3 deletions aiodocker/jsonstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ async def _close(self):
self._response.close()


async def json_stream_result(response, transform=None, stream=True):
def json_stream_stream(response, transform=None):
json_stream = _JsonStreamResult(response, transform)
return json_stream

if stream:
return json_stream

async def json_stream_list(response, transform=None):
json_stream = _JsonStreamResult(response, transform)

data = []
async for obj in json_stream:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,41 @@ async def test_restart(docker):
await container.stop()
finally:
await container.delete(force=True)


@pytest.mark.asyncio
async def test_container_stats_list(docker):
name = "python:latest"
container = await docker.containers.run(
config={"Cmd": ["-c", "print('hello')"], "Entrypoint": "python", "Image": name}
)

try:
await container.start()
response = await container.wait()
assert response["StatusCode"] == 0
stats = await container.stats(stream=False)
assert "cpu_stats" in stats[0]
finally:
await container.delete(force=True)


@pytest.mark.asyncio
async def test_container_stats_stream(docker):
name = "python:latest"
container = await docker.containers.run(
config={"Cmd": ["-c", "print('hello')"], "Entrypoint": "python", "Image": name}
)

try:
await container.start()
response = await container.wait()
assert response["StatusCode"] == 0
count = 0
async for stat in container.stats():
assert "cpu_stats" in stat
count += 1
if count > 3:
break
finally:
await container.delete(force=True)
45 changes: 42 additions & 3 deletions tests/test_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ async def test_push_image(docker):
await docker.images.push(name=repository)


@pytest.mark.asyncio
async def test_push_image_stream(docker):
name = "python:latest"
repository = "localhost:5000/image"
await docker.images.tag(name=name, repo=repository)
async for item in docker.images.push(name=repository, stream=True):
pass


@pytest.mark.asyncio
async def test_delete_image(docker):
name = "python:latest"
Expand Down Expand Up @@ -113,7 +122,17 @@ async def test_pull_image(docker):

with pytest.warns(DeprecationWarning):
image = await docker.images.get(name=name)
assert image
assert "Architecture" in image


@pytest.mark.asyncio
async def test_pull_image_stream(docker):
name = "python:latest"
image = await docker.images.inspect(name=name)
assert image

async for item in docker.images.pull(name, stream=True):
pass


@pytest.mark.asyncio
Expand All @@ -131,11 +150,31 @@ async def test_build_from_tar(docker, random_name):
assert image


@pytest.mark.asyncio
async def test_build_from_tar_stream(docker, random_name):
name = "{}:latest".format(random_name())
dockerfile = """
# Shared Volume
FROM python:latest
"""
f = BytesIO(dockerfile.encode("utf-8"))
tar_obj = utils.mktar_from_dockerfile(f)
async for item in docker.images.build(
fileobj=tar_obj, encoding="gzip", tag=name, stream=True
):
pass
tar_obj.close()
image = await docker.images.inspect(name=name)
assert image


@pytest.mark.asyncio
async def test_export_image(docker):
name = "python:latest"
exported_image = await docker.images.export_image(name=name)
assert exported_image
async with docker.images.export_image(name=name) as exported_image:
assert exported_image
async for chunk in exported_image.iter_chunks():
pass


@pytest.mark.asyncio
Expand Down

0 comments on commit 5a6cbfb

Please sign in to comment.