diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml
index 9126bc6..bf84473 100644
--- a/.idea/dictionaries/pavel.xml
+++ b/.idea/dictionaries/pavel.xml
@@ -15,6 +15,7 @@
autodoc
autosummary
baudrate
+ booleaness
bools
bufferless
bysource
diff --git a/appveyor.yml b/appveyor.yml
index 37c2c5c..da599d6 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -7,6 +7,10 @@ environment:
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019
PYTHON: "C:\\Python310-x64"
+ - job_group: tests
+ APPVEYOR_BUILD_WORKER_IMAGE: macos
+ PYTHON: "3.9"
+
- job_group: tests
APPVEYOR_BUILD_WORKER_IMAGE: Ubuntu2004
PYTHON: "3.10"
@@ -23,14 +27,6 @@ environment:
APPVEYOR_BUILD_WORKER_IMAGE: Ubuntu2004
PYTHON: "3.7"
- - job_group: tests
- APPVEYOR_BUILD_WORKER_IMAGE: Ubuntu2004
- PYTHON: "3.6"
-
- - job_group: tests
- APPVEYOR_BUILD_WORKER_IMAGE: macos
- PYTHON: "3.9"
-
- job_name: deploy
job_depends_on: tests
APPVEYOR_BUILD_WORKER_IMAGE: Ubuntu2004
diff --git a/noxfile.py b/noxfile.py
index 3cfcb03..2ecd19d 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -9,7 +9,7 @@
import nox
-PYTHONS = ["3.6", "3.7", "3.8", "3.9", "3.10"]
+PYTHONS = ["3.7", "3.8", "3.9", "3.10"]
"""The newest supported Python shall be listed LAST."""
nox.options.error_on_external_run = True
@@ -48,9 +48,9 @@ def test(session):
session.log("Using the newest supported Python: %s", is_latest_python(session))
session.install("-e", ".")
session.install(
- "pytest ~= 6.2",
- "pytest-randomly ~= 3.5",
- "coverage ~= 5.5",
+ "pytest ~= 7.0",
+ "pytest-randomly ~= 3.11",
+ "coverage ~= 6.3",
)
session.run("coverage", "run", "-m", "pytest")
session.run("coverage", "report", "--fail-under=95")
@@ -65,7 +65,7 @@ def lint(session):
session.log("Using the newest supported Python: %s", is_latest_python(session))
session.install(
"mypy == 0.931",
- "pylint == 2.7.2",
+ "pylint == 2.12.*",
)
session.run(
"mypy",
@@ -84,7 +84,7 @@ def lint(session):
},
)
if is_latest_python(session):
- session.install("black == 21.12b0")
+ session.install("black == 22.*")
session.run("black", "--check", ".")
diff --git a/pydsdl/__init__.py b/pydsdl/__init__.py
index 5c6ab59..21aa2e7 100644
--- a/pydsdl/__init__.py
+++ b/pydsdl/__init__.py
@@ -7,7 +7,7 @@
import os as _os
import sys as _sys
-__version__ = "1.13.0"
+__version__ = "1.14.0"
__version_info__ = tuple(map(int, __version__.split(".")[:3]))
__license__ = "MIT"
__author__ = "UAVCAN Consortium"
diff --git a/pydsdl/_data_schema_builder.py b/pydsdl/_data_schema_builder.py
index f4967f2..84c13b8 100644
--- a/pydsdl/_data_schema_builder.py
+++ b/pydsdl/_data_schema_builder.py
@@ -2,7 +2,7 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
-import typing
+from __future__ import annotations
from . import _error
from . import _serializable
from . import _bit_length_set
@@ -34,26 +34,26 @@ def __str__(self) -> str:
class DataSchemaBuilder:
def __init__(self) -> None:
- self._fields = [] # type: typing.List[_serializable.Field]
- self._constants = [] # type: typing.List[_serializable.Constant]
- self._serialization_mode = None # type: typing.Optional[SerializationMode]
+ self._fields: list[_serializable.Field] = []
+ self._constants: list[_serializable.Constant] = []
+ self._serialization_mode: SerializationMode | None = None
self._is_union = False
self._bit_length_computed_at_least_once = False
self._doc = ""
@property
- def fields(self) -> typing.List[_serializable.Field]:
+ def fields(self) -> list[_serializable.Field]:
assert all(map(lambda x: isinstance(x, _serializable.Field), self._fields))
return self._fields
@property
- def constants(self) -> typing.List[_serializable.Constant]:
+ def constants(self) -> list[_serializable.Constant]:
assert all(map(lambda x: isinstance(x, _serializable.Constant), self._constants))
return self._constants
@property
- def attributes(self) -> typing.List[_serializable.Attribute]: # noinspection PyTypeChecker
- out = [] # type: typing.List[_serializable.Attribute]
+ def attributes(self) -> list[_serializable.Attribute]: # noinspection PyTypeChecker
+ out = [] # type: list[_serializable.Attribute]
out += self.fields
out += self.constants
return out
@@ -63,7 +63,7 @@ def doc(self) -> str:
return self._doc
@property
- def serialization_mode(self) -> typing.Optional[SerializationMode]:
+ def serialization_mode(self) -> SerializationMode | None:
return self._serialization_mode
@property
diff --git a/pydsdl/_data_type_builder.py b/pydsdl/_data_type_builder.py
index c6f81ac..f458f45 100644
--- a/pydsdl/_data_type_builder.py
+++ b/pydsdl/_data_type_builder.py
@@ -2,9 +2,9 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
-import os
-import typing
+from typing import Optional, Callable, Iterable
import logging
+from pathlib import Path
from . import _serializable
from . import _expression
from . import _error
@@ -45,15 +45,15 @@ class DataTypeBuilder(_parser.StatementStreamProcessor):
def __init__(
self,
definition: _dsdl_definition.DSDLDefinition,
- lookup_definitions: typing.Iterable[_dsdl_definition.DSDLDefinition],
- print_output_handler: typing.Callable[[int, str], None],
+ lookup_definitions: Iterable[_dsdl_definition.DSDLDefinition],
+ print_output_handler: Callable[[int, str], None],
allow_unregulated_fixed_port_id: bool,
):
self._definition = definition
self._lookup_definitions = list(lookup_definitions)
self._print_output_handler = print_output_handler
self._allow_unregulated_fixed_port_id = allow_unregulated_fixed_port_id
- self._element_callback = None # type: typing.Optional[typing.Callable[[str], None]]
+ self._element_callback = None # type: Optional[Callable[[str], None]]
assert isinstance(self._definition, _dsdl_definition.DSDLDefinition)
assert all(map(lambda x: isinstance(x, _dsdl_definition.DSDLDefinition), lookup_definitions))
@@ -148,7 +148,7 @@ def on_padding_field(self, padding_field_type: _serializable.VoidType) -> None:
)
def on_directive(
- self, line_number: int, directive_name: str, associated_expression_value: typing.Optional[_expression.Any]
+ self, line_number: int, directive_name: str, associated_expression_value: Optional[_expression.Any]
) -> None:
try:
handler = {
@@ -209,8 +209,8 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version)
lookup_nss or "(empty set)",
)
if requested_ns not in lookup_nss and requested_ns == subroot_ns:
- error_description += " Did you mean to use the directory %r instead of %r?" % (
- os.path.join(self._definition.root_namespace_path, subroot_ns),
+ error_description += " Did you mean to use the directory %s instead of %s?" % (
+ self._definition.root_namespace_path / subroot_ns,
self._definition.root_namespace_path,
)
else:
@@ -231,7 +231,7 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version)
allow_unregulated_fixed_port_id=self._allow_unregulated_fixed_port_id,
)
- def _queue_attribute(self, element_callback: typing.Callable[[str], None]) -> None:
+ def _queue_attribute(self, element_callback: Callable[[str], None]) -> None:
self._flush_attribute("")
self._element_callback = element_callback
@@ -247,7 +247,7 @@ def _on_attribute(self) -> None:
"This is to prevent errors if the extent is dependent on the bit length set of the data schema."
)
- def _on_print_directive(self, line_number: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_print_directive(self, line_number: int, value: Optional[_expression.Any]) -> None:
_logger.info(
"Print directive at %s:%d%s",
self._definition.file_path,
@@ -256,7 +256,7 @@ def _on_print_directive(self, line_number: int, value: typing.Optional[_expressi
)
self._print_output_handler(line_number, str(value if value is not None else ""))
- def _on_assert_directive(self, line_number: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any]) -> None:
if isinstance(value, _expression.Boolean):
if not value.native_value:
raise AssertionCheckFailureError(
@@ -268,7 +268,7 @@ def _on_assert_directive(self, line_number: int, value: typing.Optional[_express
else:
raise InvalidDirectiveError("The assertion check expression must yield a boolean, not %s" % value.TYPE_NAME)
- def _on_extent_directive(self, line_number: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any]) -> None:
if self._structs[-1].serialization_mode is not None:
raise InvalidDirectiveError(
"Misplaced extent directive. The serialization mode is already set to %s"
@@ -284,7 +284,7 @@ def _on_extent_directive(self, line_number: int, value: typing.Optional[_express
else:
raise InvalidDirectiveError("The extent directive expects a rational, not %s" % value.TYPE_NAME)
- def _on_sealed_directive(self, _ln: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_sealed_directive(self, _ln: int, value: Optional[_expression.Any]) -> None:
if self._structs[-1].serialization_mode is not None:
raise InvalidDirectiveError(
"Misplaced sealing directive. The serialization mode is already set to %s"
@@ -294,7 +294,7 @@ def _on_sealed_directive(self, _ln: int, value: typing.Optional[_expression.Any]
raise InvalidDirectiveError("The sealed directive does not expect an expression")
self._structs[-1].set_serialization_mode(_data_schema_builder.SealedSerializationMode())
- def _on_union_directive(self, _ln: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> None:
if value is not None:
raise InvalidDirectiveError("The union directive does not expect an expression")
if self._structs[-1].union:
@@ -303,7 +303,7 @@ def _on_union_directive(self, _ln: int, value: typing.Optional[_expression.Any])
raise InvalidDirectiveError("The union directive must be placed before the first " "attribute definition")
self._structs[-1].make_union()
- def _on_deprecated_directive(self, _ln: int, value: typing.Optional[_expression.Any]) -> None:
+ def _on_deprecated_directive(self, _ln: int, value: Optional[_expression.Any]) -> None:
if value is not None:
raise InvalidDirectiveError("The deprecated directive does not expect an expression")
if self._is_deprecated:
@@ -322,8 +322,8 @@ def _make_composite( # pylint: disable=too-many-arguments
name: str,
version: _serializable.Version,
deprecated: bool,
- fixed_port_id: typing.Optional[int],
- source_file_path: str,
+ fixed_port_id: Optional[int],
+ source_file_path: Path,
has_parent_service: bool,
) -> _serializable.CompositeType:
ty = _serializable.UnionType if builder.union else _serializable.StructureType
diff --git a/pydsdl/_dsdl_definition.py b/pydsdl/_dsdl_definition.py
index af68661..010e973 100644
--- a/pydsdl/_dsdl_definition.py
+++ b/pydsdl/_dsdl_definition.py
@@ -2,25 +2,27 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
+from __future__ import annotations
import os
import time
-import typing
+from typing import Iterable, Callable
import logging
-from . import _error
-from . import _serializable
+from pathlib import Path
+from ._error import FrontendError, InvalidDefinitionError, InternalError
+from ._serializable import CompositeType, Version
from . import _parser
_logger = logging.getLogger(__name__)
-class FileNameFormatError(_error.InvalidDefinitionError):
+class FileNameFormatError(InvalidDefinitionError):
"""
Raised when a DSDL definition file is named incorrectly.
"""
- def __init__(self, text: str, path: str):
- super().__init__(text=text, path=str(path))
+ def __init__(self, text: str, path: Path):
+ super().__init__(text=text, path=Path(path))
class DSDLDefinition:
@@ -30,17 +32,17 @@ class DSDLDefinition:
Upper layers that operate on top of this abstraction do not concern themselves with the file system at all.
"""
- def __init__(self, file_path: str, root_namespace_path: str):
+ def __init__(self, file_path: Path, root_namespace_path: Path):
# Normalizing the path and reading the definition text
- self._file_path = os.path.abspath(file_path)
+ self._file_path = Path(file_path).resolve()
del file_path
- self._root_namespace_path = os.path.abspath(root_namespace_path)
+ self._root_namespace_path = Path(root_namespace_path).resolve()
del root_namespace_path
with open(self._file_path) as f:
self._text = str(f.read())
# Checking the sanity of the root directory path - can't contain separators
- if _serializable.CompositeType.NAME_COMPONENT_SEPARATOR in os.path.split(self._root_namespace_path)[-1]:
+ if CompositeType.NAME_COMPONENT_SEPARATOR in os.path.split(self._root_namespace_path)[-1]:
raise FileNameFormatError("Invalid namespace name", path=self._root_namespace_path)
# Determining the relative path within the root namespace directory
@@ -55,7 +57,7 @@ def __init__(self, file_path: str, root_namespace_path: str):
# Parsing the basename, e.g., 434.GetTransportStatistics.0.1.uavcan
basename_components = basename.split(".")[:-1]
- str_fixed_port_id = None # type: typing.Optional[str]
+ str_fixed_port_id: str | None = None
if len(basename_components) == 4:
str_fixed_port_id, short_name, str_major_version, str_minor_version = basename_components
elif len(basename_components) == 3:
@@ -66,10 +68,10 @@ def __init__(self, file_path: str, root_namespace_path: str):
# Parsing the fixed port ID, if specified; None if not
if str_fixed_port_id is not None:
try:
- self._fixed_port_id = int(str_fixed_port_id) # type: typing.Optional[int]
+ self._fixed_port_id: int | None = int(str_fixed_port_id)
except ValueError:
raise FileNameFormatError(
- "Not a valid fixed port-ID: %r. "
+ "Not a valid fixed port-ID: %s. "
"Namespaces are defined as directories; putting the namespace name in the file name will not work. "
'For example: "foo/Bar.1.0.uavcan" is OK (where "foo" is a directory); "foo.Bar.1.0.uavcan" is not.'
% str_fixed_port_id,
@@ -80,28 +82,26 @@ def __init__(self, file_path: str, root_namespace_path: str):
# Parsing the version numbers
try:
- self._version = _serializable.Version(major=int(str_major_version), minor=int(str_minor_version))
+ self._version = Version(major=int(str_major_version), minor=int(str_minor_version))
except ValueError:
raise FileNameFormatError("Could not parse the version numbers", path=self._file_path) from None
# Finally, constructing the name
namespace_components = list(relative_directory.strip(os.sep).split(os.sep))
for nc in namespace_components:
- if _serializable.CompositeType.NAME_COMPONENT_SEPARATOR in nc:
- raise FileNameFormatError("Invalid name for namespace component", path=self._file_path)
+ if CompositeType.NAME_COMPONENT_SEPARATOR in nc:
+ raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self._file_path)
- self._name = _serializable.CompositeType.NAME_COMPONENT_SEPARATOR.join(
- namespace_components + [str(short_name)]
- ) # type: str
+ self._name: str = CompositeType.NAME_COMPONENT_SEPARATOR.join(namespace_components + [str(short_name)])
- self._cached_type = None # type: typing.Optional[_serializable.CompositeType]
+ self._cached_type: CompositeType | None = None
def read(
self,
- lookup_definitions: typing.Iterable["DSDLDefinition"],
- print_output_handler: typing.Callable[[int, str], None],
+ lookup_definitions: Iterable["DSDLDefinition"],
+ print_output_handler: Callable[[int, str], None],
allow_unregulated_fixed_port_id: bool,
- ) -> _serializable.CompositeType:
+ ) -> CompositeType:
"""
Reads the data type definition and returns its high-level data type representation.
The output is cached; all following invocations will read from the cache.
@@ -153,13 +153,13 @@ def read(
self._cached_type.fixed_port_id,
)
return self._cached_type
- except _error.FrontendError as ex: # pragma: no cover
+ except FrontendError as ex: # pragma: no cover
ex.set_error_location_if_unknown(path=self.file_path)
raise ex
except (MemoryError, SystemError): # pragma: no cover
raise
except Exception as ex: # pragma: no cover
- raise _error.InternalError(culprit=ex, path=self.file_path)
+ raise InternalError(culprit=ex, path=self.file_path) from ex
@property
def full_name(self) -> str:
@@ -167,9 +167,9 @@ def full_name(self) -> str:
return self._name
@property
- def name_components(self) -> typing.List[str]:
+ def name_components(self) -> list[str]:
"""Components of the full name as a list, e.g., ['uavcan', 'node', 'Heartbeat']"""
- return self._name.split(_serializable.CompositeType.NAME_COMPONENT_SEPARATOR)
+ return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR)
@property
def short_name(self) -> str:
@@ -179,7 +179,7 @@ def short_name(self) -> str:
@property
def full_namespace(self) -> str:
"""The full name without the short name, e.g., uavcan.node for uavcan.node.Heartbeat"""
- return str(_serializable.CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1]))
+ return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1]))
@property
def root_namespace(self) -> str:
@@ -192,11 +192,11 @@ def text(self) -> str:
return self._text
@property
- def version(self) -> _serializable.Version:
+ def version(self) -> Version:
return self._version
@property
- def fixed_port_id(self) -> typing.Optional[int]:
+ def fixed_port_id(self) -> int | None:
"""Either the fixed port ID as integer, or None if not defined for this type."""
return self._fixed_port_id
@@ -205,11 +205,11 @@ def has_fixed_port_id(self) -> bool:
return self.fixed_port_id is not None
@property
- def file_path(self) -> str:
+ def file_path(self) -> Path:
return self._file_path
@property
- def root_namespace_path(self) -> str:
+ def root_namespace_path(self) -> Path:
return self._root_namespace_path
def __eq__(self, other: object) -> bool:
@@ -222,7 +222,7 @@ def __eq__(self, other: object) -> bool:
return NotImplemented # pragma: no cover
def __str__(self) -> str:
- return "DSDLDefinition(full_name=%r, version=%r, fixed_port_id=%r, file_path=%r)" % (
+ return "DSDLDefinition(full_name=%r, version=%r, fixed_port_id=%r, file_path=%s)" % (
self.full_name,
self.version,
self.fixed_port_id,
diff --git a/pydsdl/_error.py b/pydsdl/_error.py
index d7098dd..d331a2d 100644
--- a/pydsdl/_error.py
+++ b/pydsdl/_error.py
@@ -5,6 +5,7 @@
# pylint: disable=broad-except
import typing
+from pathlib import Path
import urllib.parse
@@ -15,13 +16,13 @@ class FrontendError(Exception): # PEP8 says that the "Exception" suffix is redu
please refer to the direct descendants instead.
"""
- def __init__(self, text: str, path: typing.Optional[str] = None, line: typing.Optional[int] = None):
+ def __init__(self, text: str, path: typing.Optional[Path] = None, line: typing.Optional[int] = None):
Exception.__init__(self, text)
self._path = path
self._line = line
def set_error_location_if_unknown(
- self, path: typing.Optional[str] = None, line: typing.Optional[int] = None
+ self, path: typing.Optional[Path] = None, line: typing.Optional[int] = None
) -> None:
"""
Entries that are already known will be left unchanged.
@@ -35,7 +36,7 @@ def set_error_location_if_unknown(
self._line = line
@property
- def path(self) -> typing.Optional[str]:
+ def path(self) -> typing.Optional[Path]:
"""Source file path where the error has occurred, if known."""
return self._path
@@ -59,10 +60,10 @@ def __str__(self) -> str:
uavcan/internet/udp/500.HandleIncomingPacket.1.0.uavcan:33: Error such and such
"""
if self.path and self.line:
- return "%s:%d: %s" % (self.path, self.line, self.text)
+ return "%s:%d: %s" % (self.path.as_posix(), self.line, self.text)
if self.path:
- return "%s: %s" % (self.path, self.text)
+ return "%s: %s" % (self.path.as_posix(), self.text)
return self.text
@@ -79,7 +80,7 @@ class InternalError(FrontendError):
def __init__(
self,
text: typing.Optional[str] = None,
- path: typing.Optional[str] = None,
+ path: typing.Optional[Path] = None,
line: typing.Optional[int] = None,
culprit: typing.Optional[Exception] = None,
):
@@ -114,13 +115,13 @@ def _unittest_error() -> None:
assert repr(ex) == "FrontendError: 'Hello world!'"
try:
- raise FrontendError("Hello world!", path="path/to/file.uavcan", line=123)
+ raise FrontendError("Hello world!", path=Path("path/to/file.uavcan"), line=123)
except Exception as ex:
assert str(ex) == "path/to/file.uavcan:123: Hello world!"
assert repr(ex) == "FrontendError: 'path/to/file.uavcan:123: Hello world!'"
try:
- raise FrontendError("Hello world!", path="path/to/file.uavcan")
+ raise FrontendError("Hello world!", path=Path("path/to/file.uavcan"))
except Exception as ex:
assert str(ex) == "path/to/file.uavcan: Hello world!"
assert repr(ex) == "FrontendError: 'path/to/file.uavcan: Hello world!'"
@@ -128,9 +129,9 @@ def _unittest_error() -> None:
def _unittest_internal_error_github_reporting() -> None:
try:
- raise InternalError(path="FILE_PATH", line=42)
+ raise InternalError(path=Path("FILE_PATH"), line=42)
except FrontendError as ex:
- assert ex.path == "FILE_PATH"
+ assert ex.path == Path("FILE_PATH")
assert ex.line == 42
assert str(ex) == "FILE_PATH:42: "
@@ -139,14 +140,14 @@ def _unittest_internal_error_github_reporting() -> None:
try: # TRY HARDER
raise InternalError(text="BASE TEXT", culprit=Exception("ERROR TEXT"))
except FrontendError as ex:
- ex.set_error_location_if_unknown(path="FILE_PATH")
+ ex.set_error_location_if_unknown(path=Path("FILE_PATH"))
raise
except FrontendError as ex:
ex.set_error_location_if_unknown(line=42)
raise
except FrontendError as ex:
print(ex)
- assert ex.path == "FILE_PATH"
+ assert ex.path == Path("FILE_PATH")
assert ex.line == 42
# We have to ignore the last couple of characters because Python before 3.7 reprs Exceptions like this:
# Exception('ERROR TEXT',)
@@ -158,6 +159,6 @@ def _unittest_internal_error_github_reporting() -> None:
)
try:
- raise InternalError(text="BASE TEXT", path="FILE_PATH")
+ raise InternalError(text="BASE TEXT", path=Path("FILE_PATH"))
except FrontendError as ex:
assert str(ex) == "FILE_PATH: BASE TEXT"
diff --git a/pydsdl/_namespace.py b/pydsdl/_namespace.py
index de58077..9c4eda6 100644
--- a/pydsdl/_namespace.py
+++ b/pydsdl/_namespace.py
@@ -4,11 +4,13 @@
# pylint: disable=logging-not-lazy
+from __future__ import annotations
import os
-import typing
+from typing import Iterable, Callable, DefaultDict
import logging
import fnmatch
import collections
+from pathlib import Path
from . import _serializable
from . import _dsdl_definition
from . import _error
@@ -74,16 +76,16 @@ class SealingConsistencyError(_error.InvalidDefinitionError):
"""
-PrintOutputHandler = typing.Callable[[str, int, str], None]
+PrintOutputHandler = Callable[[Path, int, str], None]
"""Invoked when the frontend encounters a print directive or needs to output a generic diagnostic."""
def read_namespace(
- root_namespace_directory: str,
- lookup_directories: typing.Optional[typing.Union[str, typing.Iterable[str]]] = None,
- print_output_handler: typing.Optional[PrintOutputHandler] = None,
+ root_namespace_directory: Path | str,
+ lookup_directories: None | Path | str | Iterable[Path | str] = None,
+ print_output_handler: PrintOutputHandler | None = None,
allow_unregulated_fixed_port_id: bool = False,
-) -> typing.List[_serializable.CompositeType]:
+) -> list[_serializable.CompositeType]:
"""
This function is the main entry point of the library.
It reads all DSDL definitions from the specified root namespace directory and produces the annotated AST.
@@ -120,24 +122,24 @@ def read_namespace(
# Add the own root namespace to the set of lookup directories, sort lexicographically, remove duplicates.
# We'd like this to be an iterable list of strings but we handle the common practice of passing in a single path.
if lookup_directories is None:
- lookup_directories_path_list = [] # type: typing.List[str]
- elif isinstance(lookup_directories, (str, bytes)):
- lookup_directories_path_list = [lookup_directories]
+ lookup_directories_path_list: list[Path] = []
+ elif isinstance(lookup_directories, (str, bytes, Path)):
+ lookup_directories_path_list = [Path(lookup_directories)]
else:
- lookup_directories_path_list = list(lookup_directories)
+ lookup_directories_path_list = list(map(Path, lookup_directories))
for a in lookup_directories_path_list:
- if not isinstance(a, str): # non-string paths
- raise TypeError("Lookup directories shall be an iterable of strings. Found in list: " + type(a).__name__)
- _logger.debug(_LOG_LIST_ITEM_PREFIX + a)
+ if not isinstance(a, (str, Path)):
+ raise TypeError("Lookup directories shall be an iterable of paths. Found in list: " + type(a).__name__)
+ _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a))
- # Normalize paths and remove duplicates.
- root_namespace_directory = os.path.abspath(root_namespace_directory)
+ # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities.
+ root_namespace_directory = Path(root_namespace_directory).resolve()
lookup_directories_path_list.append(root_namespace_directory)
- lookup_directories_path_list = list(sorted({os.path.abspath(x) for x in lookup_directories_path_list}))
+ lookup_directories_path_list = list(sorted({x.resolve() for x in lookup_directories_path_list}))
_logger.debug("Lookup directories are listed below:")
for a in lookup_directories_path_list:
- _logger.debug(_LOG_LIST_ITEM_PREFIX + a)
+ _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a))
# Check for common usage errors and warn the user if anything looks suspicious.
_ensure_no_common_usage_errors(root_namespace_directory, lookup_directories_path_list, _logger.warning)
@@ -149,13 +151,13 @@ def read_namespace(
# Construct DSDL definitions from the target and the lookup dirs.
target_dsdl_definitions = _construct_dsdl_definitions_from_namespace(root_namespace_directory)
if not target_dsdl_definitions:
- _logger.info("The namespace at %r is empty", root_namespace_directory)
+ _logger.info("The namespace at %s is empty", root_namespace_directory)
return []
_logger.debug("Target DSDL definitions are listed below:")
for x in target_dsdl_definitions:
_logger.debug(_LOG_LIST_ITEM_PREFIX + str(x))
- lookup_dsdl_definitions = [] # type: typing.List[_dsdl_definition.DSDLDefinition]
+ lookup_dsdl_definitions = [] # type: list[_dsdl_definition.DSDLDefinition]
for ld in lookup_directories_path_list:
lookup_dsdl_definitions += _construct_dsdl_definitions_from_namespace(ld)
@@ -167,7 +169,7 @@ def read_namespace(
_logger.debug(_LOG_LIST_ITEM_PREFIX + str(x))
_logger.info(
- "Reading %d definitions from the root namespace %r, "
+ "Reading %d definitions from the root namespace %s, "
"with %d lookup definitions located in root namespaces: %s",
len(target_dsdl_definitions),
list(set(map(lambda t: t.root_namespace, target_dsdl_definitions)))[0],
@@ -199,11 +201,11 @@ def read_namespace(
def _read_namespace_definitions(
- target_definitions: typing.List[_dsdl_definition.DSDLDefinition],
- lookup_definitions: typing.List[_dsdl_definition.DSDLDefinition],
- print_output_handler: typing.Optional[PrintOutputHandler] = None,
+ target_definitions: list[_dsdl_definition.DSDLDefinition],
+ lookup_definitions: list[_dsdl_definition.DSDLDefinition],
+ print_output_handler: PrintOutputHandler | None = None,
allow_unregulated_fixed_port_id: bool = False,
-) -> typing.List[_serializable.CompositeType]:
+) -> list[_serializable.CompositeType]:
"""
Construct type descriptors from the specified target definitions.
Allow the target definitions to use the lookup definitions within themselves.
@@ -212,7 +214,7 @@ def _read_namespace_definitions(
:return: A list of types.
"""
- def make_print_handler(definition: _dsdl_definition.DSDLDefinition) -> typing.Callable[[int, str], None]:
+ def make_print_handler(definition: _dsdl_definition.DSDLDefinition) -> Callable[[int, str], None]:
def handler(line_number: int, text: str) -> None:
if print_output_handler: # pragma: no branch
assert isinstance(line_number, int) and isinstance(text, str)
@@ -221,7 +223,7 @@ def handler(line_number: int, text: str) -> None:
return handler
- types = [] # type: typing.List[_serializable.CompositeType]
+ types = [] # type: list[_serializable.CompositeType]
for tdd in target_definitions:
try:
dt = tdd.read(lookup_definitions, make_print_handler(tdd), allow_unregulated_fixed_port_id)
@@ -239,30 +241,30 @@ def handler(line_number: int, text: str) -> None:
def _ensure_no_name_collisions(
- target_definitions: typing.List[_dsdl_definition.DSDLDefinition],
- lookup_definitions: typing.List[_dsdl_definition.DSDLDefinition],
+ target_definitions: list[_dsdl_definition.DSDLDefinition],
+ lookup_definitions: list[_dsdl_definition.DSDLDefinition],
) -> None:
for tg in target_definitions:
for lu in lookup_definitions:
if tg.full_name != lu.full_name and tg.full_name.lower() == lu.full_name.lower():
raise DataTypeNameCollisionError(
- "Full name of this definition differs from %r only by letter case, "
+ "Full name of this definition differs from %s only by letter case, "
"which is not permitted" % lu.file_path,
path=tg.file_path,
)
if tg.full_namespace.lower().startswith(lu.full_name.lower()): # pragma: no cover
raise DataTypeNameCollisionError(
- "The namespace of this type conflicts with %r" % lu.file_path, path=tg.file_path
+ "The namespace of this type conflicts with %s" % lu.file_path, path=tg.file_path
)
if lu.full_namespace.lower().startswith(tg.full_name.lower()):
raise DataTypeNameCollisionError(
- "This type conflicts with the namespace of %r" % lu.file_path, path=tg.file_path
+ "This type conflicts with the namespace of %s" % lu.file_path, path=tg.file_path
)
-def _ensure_no_fixed_port_id_collisions(types: typing.List[_serializable.CompositeType]) -> None:
+def _ensure_no_fixed_port_id_collisions(types: list[_serializable.CompositeType]) -> None:
for a in types:
for b in types:
different_names = a.full_name != b.full_name
@@ -278,20 +280,18 @@ def _ensure_no_fixed_port_id_collisions(types: typing.List[_serializable.Composi
if a.has_fixed_port_id and b.has_fixed_port_id:
if a.fixed_port_id == b.fixed_port_id:
raise FixedPortIDCollisionError(
- "The fixed port ID of this definition is also used in %r" % b.source_file_path,
+ "The fixed port ID of this definition is also used in %s" % b.source_file_path,
path=a.source_file_path,
)
-def _ensure_minor_version_compatibility(types: typing.List[_serializable.CompositeType]) -> None:
- by_name = collections.defaultdict(list) # type: typing.DefaultDict[str, typing.List[_serializable.CompositeType]]
+def _ensure_minor_version_compatibility(types: list[_serializable.CompositeType]) -> None:
+ by_name = collections.defaultdict(list) # type: DefaultDict[str, list[_serializable.CompositeType]]
for t in types:
by_name[t.full_name].append(t)
for definitions in by_name.values():
- by_major = collections.defaultdict(
- list
- ) # type: typing.DefaultDict[int, typing.List[_serializable.CompositeType]]
+ by_major = collections.defaultdict(list) # type: DefaultDict[int, list[_serializable.CompositeType]]
for t in definitions:
by_major[t.version.major].append(t)
@@ -313,20 +313,20 @@ def _ensure_minor_version_compatibility_pairwise(
# Version collision
if a.version.minor == b.version.minor:
raise MultipleDefinitionsUnderSameVersionError(
- "This definition shares its version number with %r" % b.source_file_path, path=a.source_file_path
+ "This definition shares its version number with %s" % b.source_file_path, path=a.source_file_path
)
# Must be of the same kind: both messages or both services
if isinstance(a, _serializable.ServiceType) != isinstance(b, _serializable.ServiceType):
raise VersionsOfDifferentKindError(
- "This definition is not of the same kind as %r" % b.source_file_path, path=a.source_file_path
+ "This definition is not of the same kind as %s" % b.source_file_path, path=a.source_file_path
)
# Must use either the same RPID, or the older one should not have an RPID
if a.has_fixed_port_id == b.has_fixed_port_id:
if a.fixed_port_id != b.fixed_port_id:
raise MinorVersionFixedPortIDError(
- "Different fixed port ID values under the same version %r" % b.source_file_path, path=a.source_file_path
+ "Different fixed port ID values under the same version %s" % b.source_file_path, path=a.source_file_path
)
else:
must_have = a if a.version.minor > b.version.minor else b
@@ -365,15 +365,15 @@ def _ensure_minor_version_compatibility_pairwise(
def _ensure_no_common_usage_errors(
- root_namespace_directory: str, lookup_directories: typing.Iterable[str], reporter: typing.Callable[[str], None]
+ root_namespace_directory: Path, lookup_directories: Iterable[Path], reporter: Callable[[str], None]
) -> None:
suspicious_base_names = [
"public_regulated_data_types",
"dsdl",
]
- def base(s: str) -> str:
- return os.path.basename(os.path.normpath(s))
+ def base(s: Path) -> str:
+ return str(os.path.basename(os.path.normpath(s)))
def is_valid_name(s: str) -> bool:
try:
@@ -385,14 +385,14 @@ def is_valid_name(s: str) -> bool:
all_paths = set([root_namespace_directory] + list(lookup_directories))
for p in all_paths:
- p = os.path.normcase(os.path.abspath(p))
+ p = Path(os.path.normcase(p.resolve()))
try:
- candidates = [x for x in os.listdir(p) if os.path.isdir(os.path.join(p, x)) and is_valid_name(x)]
+ candidates = [x for x in os.listdir(p) if os.path.isdir(os.path.join(p, x)) and is_valid_name(str(x))]
except OSError: # pragma: no cover
candidates = []
if candidates and base(p) in suspicious_base_names:
report = (
- "Possibly incorrect usage detected: input path %r is likely incorrect because the last path component "
+ "Possibly incorrect usage detected: input path %s is likely incorrect because the last path component "
"should be the root namespace name rather than its parent directory. You probably meant:\n%s"
) % (
p,
@@ -401,30 +401,30 @@ def is_valid_name(s: str) -> bool:
reporter(report)
-def _ensure_no_nested_root_namespaces(directories: typing.Iterable[str]) -> None:
- directories = list(sorted([str(os.path.join(os.path.abspath(x), "")) for x in set(directories)]))
- for a in directories:
- for b in directories:
+def _ensure_no_nested_root_namespaces(directories: Iterable[Path]) -> None:
+ dir_str = list(sorted([os.path.join(os.path.abspath(x), "") for x in set(directories)]))
+ for a in dir_str:
+ for b in dir_str:
if (a != b) and a.startswith(b):
raise NestedRootNamespaceError(
- "The following namespace is nested inside this one, which is not permitted: %r" % a, path=b
+ "The following namespace is nested inside this one, which is not permitted: %s" % a, path=Path(b)
)
-def _ensure_no_namespace_name_collisions(directories: typing.Iterable[str]) -> None:
- def get_namespace_name(d: str) -> str:
- return os.path.split(d)[-1]
+def _ensure_no_namespace_name_collisions(directories: Iterable[Path]) -> None:
+ def get_namespace_name(d: Path) -> str:
+ return str(os.path.split(d))[-1]
- directories = list(sorted([str(os.path.abspath(x)) for x in set(directories)]))
+ directories = list(sorted([x.resolve() for x in set(directories)]))
for a in directories:
for b in directories:
if (a != b) and get_namespace_name(a).lower() == get_namespace_name(b).lower():
- raise RootNamespaceNameCollisionError("The name of this namespace conflicts with %r" % b, path=a)
+ raise RootNamespaceNameCollisionError("The name of this namespace conflicts with %s" % b, path=a)
def _construct_dsdl_definitions_from_namespace(
- root_namespace_path: str,
-) -> typing.List[_dsdl_definition.DSDLDefinition]:
+ root_namespace_path: Path,
+) -> list[_dsdl_definition.DSDLDefinition]:
"""
Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later.
The definitions are sorted by name lexicographically, then by major version (greatest version first),
@@ -436,17 +436,13 @@ def on_walk_error(os_ex: Exception) -> None:
walker = os.walk(root_namespace_path, onerror=on_walk_error, followlinks=True)
- source_file_paths = [] # type: typing.List[str]
+ source_file_paths: set[Path] = set()
for root, _dirnames, filenames in walker:
for filename in fnmatch.filter(filenames, _DSDL_FILE_GLOB):
- source_file_paths.append(os.path.join(root, filename))
-
- _logger.debug("DSDL files in the namespace dir %r are listed below:", root_namespace_path)
- for a in source_file_paths:
- _logger.debug(_LOG_LIST_ITEM_PREFIX + a)
+ source_file_paths.add(Path(root, filename).resolve())
- output = [] # type: typing.List[_dsdl_definition.DSDLDefinition]
- for fp in source_file_paths:
+ output = [] # type: list[_dsdl_definition.DSDLDefinition]
+ for fp in sorted(source_file_paths):
dsdl_def = _dsdl_definition.DSDLDefinition(fp, root_namespace_path)
output.append(dsdl_def)
@@ -458,11 +454,9 @@ def _unittest_dsdl_definition_constructor() -> None:
import tempfile
from ._dsdl_definition import FileNameFormatError
- directory = tempfile.TemporaryDirectory()
- root_ns_dir = os.path.join(directory.name, "foo")
-
- os.mkdir(root_ns_dir)
- os.mkdir(os.path.join(root_ns_dir, "nested"))
+ directory = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
+ root_ns_dir = Path(directory.name, "foo").resolve()
+ (root_ns_dir / "nested").mkdir(parents=True)
def touchy(relative_path: str) -> None:
p = os.path.join(root_ns_dir, relative_path.replace("/", os.path.sep))
@@ -479,24 +473,24 @@ def discard(relative_path: str) -> None:
dsdl_defs = _construct_dsdl_definitions_from_namespace(root_ns_dir)
print(dsdl_defs)
- lut = {x.full_name: x for x in dsdl_defs} # type: typing.Dict[str, _dsdl_definition.DSDLDefinition]
+ lut = {x.full_name: x for x in dsdl_defs} # type: dict[str, _dsdl_definition.DSDLDefinition]
assert len(lut) == 3
assert str(lut["foo.Qwerty"]) == repr(lut["foo.Qwerty"])
assert (
str(lut["foo.Qwerty"])
== "DSDLDefinition(full_name='foo.Qwerty', version=Version(major=123, minor=234), fixed_port_id=123, "
- "file_path=%r)" % lut["foo.Qwerty"].file_path
+ "file_path=%s)" % lut["foo.Qwerty"].file_path
)
assert (
str(lut["foo.nested.Foo"])
== "DSDLDefinition(full_name='foo.nested.Foo', version=Version(major=32, minor=43), fixed_port_id=None, "
- "file_path=%r)" % lut["foo.nested.Foo"].file_path
+ "file_path=%s)" % lut["foo.nested.Foo"].file_path
)
t = lut["foo.Qwerty"]
- assert t.file_path == os.path.join(root_ns_dir, "123.Qwerty.123.234.uavcan")
+ assert t.file_path == root_ns_dir / "123.Qwerty.123.234.uavcan"
assert t.has_fixed_port_id
assert t.fixed_port_id == 123
assert t.text == "# TEST TEXT"
@@ -508,7 +502,7 @@ def discard(relative_path: str) -> None:
assert t.full_namespace == "foo"
t = lut["foo.nested.Asd"]
- assert t.file_path == os.path.join(root_ns_dir, "nested", "2.Asd.21.32.uavcan")
+ assert t.file_path == root_ns_dir / "nested" / "2.Asd.21.32.uavcan"
assert t.has_fixed_port_id
assert t.fixed_port_id == 2
assert t.text == "# TEST TEXT"
@@ -520,7 +514,7 @@ def discard(relative_path: str) -> None:
assert t.full_namespace == "foo.nested"
t = lut["foo.nested.Foo"]
- assert t.file_path == os.path.join(root_ns_dir, "nested", "Foo.32.43.uavcan")
+ assert t.file_path == root_ns_dir / "nested" / "Foo.32.43.uavcan"
assert not t.has_fixed_port_id
assert t.fixed_port_id is None
assert t.text == "# TEST TEXT"
@@ -569,7 +563,7 @@ def discard(relative_path: str) -> None:
assert False
try:
- _construct_dsdl_definitions_from_namespace(root_ns_dir + "/nested/super.bad")
+ _construct_dsdl_definitions_from_namespace(root_ns_dir / "nested/super.bad")
except FileNameFormatError as ex:
print(ex)
else: # pragma: no cover
@@ -581,30 +575,30 @@ def discard(relative_path: str) -> None:
def _unittest_common_usage_errors() -> None:
import tempfile
- directory = tempfile.TemporaryDirectory()
- root_ns_dir = os.path.join(directory.name, "foo")
+ directory = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
+ root_ns_dir = Path(os.path.join(directory.name, "foo"))
os.mkdir(root_ns_dir)
- reports = [] # type: typing.List[str]
+ reports = [] # type: list[str]
_ensure_no_common_usage_errors(root_ns_dir, [], reports.append)
assert not reports
- _ensure_no_common_usage_errors(root_ns_dir, ["/baz"], reports.append)
+ _ensure_no_common_usage_errors(root_ns_dir, [Path("/baz")], reports.append)
assert not reports
- dir_dsdl = os.path.join(root_ns_dir, "dsdl")
+ dir_dsdl = root_ns_dir / "dsdl"
os.mkdir(dir_dsdl)
- _ensure_no_common_usage_errors(dir_dsdl, ["/baz"], reports.append)
+ _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append)
assert not reports # Because empty.
dir_dsdl_vscode = os.path.join(dir_dsdl, ".vscode")
os.mkdir(dir_dsdl_vscode)
- _ensure_no_common_usage_errors(dir_dsdl, ["/baz"], reports.append)
+ _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append)
assert not reports # Because the name is not valid.
dir_dsdl_uavcan = os.path.join(dir_dsdl, "uavcan")
os.mkdir(dir_dsdl_uavcan)
- _ensure_no_common_usage_errors(dir_dsdl, ["/baz"], reports.append)
+ _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append)
(rep,) = reports
reports.clear()
assert os.path.normcase(dir_dsdl_uavcan) in rep
@@ -614,9 +608,22 @@ def _unittest_nested_roots() -> None:
from pytest import raises
_ensure_no_nested_root_namespaces([])
- _ensure_no_nested_root_namespaces(["a"])
- _ensure_no_nested_root_namespaces(["a/b", "a/c"])
+ _ensure_no_nested_root_namespaces([Path("a")])
+ _ensure_no_nested_root_namespaces([Path("a/b"), Path("a/c")])
with raises(NestedRootNamespaceError):
- _ensure_no_nested_root_namespaces(["a/b", "a"])
- _ensure_no_nested_root_namespaces(["aa/b", "a"])
- _ensure_no_nested_root_namespaces(["a/b", "aa"])
+ _ensure_no_nested_root_namespaces([Path("a/b"), Path("a")])
+ _ensure_no_nested_root_namespaces([Path("aa/b"), Path("a")])
+ _ensure_no_nested_root_namespaces([Path("a/b"), Path("aa")])
+
+
+def _unittest_issue_71() -> None: # https://github.com/UAVCAN/pydsdl/issues/71
+ import tempfile
+
+ with tempfile.TemporaryDirectory() as directory:
+ real = Path(directory, "real", "nested")
+ real.mkdir(parents=True)
+ link = Path(directory, "link")
+ link.symlink_to(real, target_is_directory=True)
+ (real / "Msg.0.1.uavcan").write_text("@sealed")
+ assert len(read_namespace(real, [real, link])) == 1
+ assert len(read_namespace(link, [real, link])) == 1
diff --git a/pydsdl/_parser.py b/pydsdl/_parser.py
index 91432b7..c7f776f 100644
--- a/pydsdl/_parser.py
+++ b/pydsdl/_parser.py
@@ -2,6 +2,7 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
+from __future__ import annotations
import os
import typing
import logging
@@ -155,9 +156,9 @@ def _flush_comment(self) -> None:
self._comment_is_header = False
self._comment = ""
- def generic_visit(self, node: _Node, children: typing.Sequence[typing.Any]) -> typing.Any:
+ def generic_visit(self, node: _Node, visited_children: typing.Sequence[typing.Any]) -> typing.Any:
"""If the node has children, replace the node with them."""
- return tuple(children) or node
+ return tuple(visited_children) or node
def visit_line(self, node: _Node, children: _Children) -> None:
if len(node.text) == 0:
@@ -301,8 +302,8 @@ def visit_type_bit_length_suffix(self, node: _Node, _c: _Children) -> int:
visit_op2_mul = parsimonious.NodeVisitor.lift_child
visit_op2_exp = parsimonious.NodeVisitor.lift_child
- def visit_expression_list(self, _n: _Node, children: _Children) -> typing.Tuple[_expression.Any, ...]:
- out = [] # type: typing.List[_expression.Any]
+ def visit_expression_list(self, _n: _Node, children: _Children) -> tuple[_expression.Any, ...]:
+ out = [] # type: list[_expression.Any]
if children:
children = children[0]
assert len(children) == 2
diff --git a/pydsdl/_serializable/_composite.py b/pydsdl/_serializable/_composite.py
index e5d93c1..eff5e8e 100644
--- a/pydsdl/_serializable/_composite.py
+++ b/pydsdl/_serializable/_composite.py
@@ -2,10 +2,12 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
+from __future__ import annotations
import abc
import math
import typing
import itertools
+from pathlib import Path
from .. import _expression
from .. import _port_id_ranges
from .._bit_length_set import BitLengthSet
@@ -61,7 +63,7 @@ def __init__( # pylint: disable=too-many-arguments
attributes: typing.Iterable[Attribute],
deprecated: bool,
fixed_port_id: typing.Optional[int],
- source_file_path: str,
+ source_file_path: Path,
has_parent_service: bool,
doc: str = "",
):
@@ -73,7 +75,7 @@ def __init__( # pylint: disable=too-many-arguments
self._attributes_by_name = {a.name: a for a in self._attributes if not isinstance(a, PaddingField)}
self._deprecated = bool(deprecated)
self._fixed_port_id = None if fixed_port_id is None else int(fixed_port_id)
- self._source_file_path = str(source_file_path)
+ self._source_file_path = Path(source_file_path)
self._has_parent_service = bool(has_parent_service)
self._doc = doc
@@ -236,7 +238,7 @@ def has_fixed_port_id(self) -> bool:
return self.fixed_port_id is not None
@property
- def source_file_path(self) -> str:
+ def source_file_path(self) -> Path:
"""
For synthesized types such as service request/response sections, this property is defined as an empty string.
"""
@@ -348,7 +350,7 @@ def __init__( # pylint: disable=too-many-arguments
attributes: typing.Iterable[Attribute],
deprecated: bool,
fixed_port_id: typing.Optional[int],
- source_file_path: str,
+ source_file_path: Path,
has_parent_service: bool,
doc: str = "",
):
@@ -455,7 +457,7 @@ def __init__( # pylint: disable=too-many-arguments
attributes: typing.Iterable[Attribute],
deprecated: bool,
fixed_port_id: typing.Optional[int],
- source_file_path: str,
+ source_file_path: Path,
has_parent_service: bool,
doc: str = "",
):
@@ -692,7 +694,7 @@ def try_name(name: str) -> CompositeType:
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -732,7 +734,7 @@ def try_name(name: str) -> CompositeType:
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -747,7 +749,7 @@ def try_name(name: str) -> CompositeType:
],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -761,7 +763,7 @@ def try_name(name: str) -> CompositeType:
],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
assert u["a"].name == "a"
@@ -787,7 +789,7 @@ def try_name(name: str) -> CompositeType:
],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
assert s["a"].name == "a"
@@ -846,7 +848,7 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType:
attributes=atr,
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -871,27 +873,21 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType:
assert DelimitedType(u, 800).inner_type is u
assert DelimitedType(u, 800).inner_type.inner_type is u
- assert (
- try_union_fields(
- [
- UnsignedIntegerType(16, PrimitiveType.CastMode.TRUNCATED),
- SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
- ]
- * 257
- ).bit_length_set
- == {16 + 16}
- )
+ assert try_union_fields(
+ [
+ UnsignedIntegerType(16, PrimitiveType.CastMode.TRUNCATED),
+ SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
+ ]
+ * 257
+ ).bit_length_set == {16 + 16}
- assert (
- try_union_fields(
- [
- UnsignedIntegerType(16, PrimitiveType.CastMode.TRUNCATED),
- SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
- ]
- * 32769
- ).bit_length_set
- == {32 + 16}
- )
+ assert try_union_fields(
+ [
+ UnsignedIntegerType(16, PrimitiveType.CastMode.TRUNCATED),
+ SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
+ ]
+ * 32769
+ ).bit_length_set == {32 + 16}
# The reference values for the following test are explained in the array tests above
tu8 = UnsignedIntegerType(8, cast_mode=PrimitiveType.CastMode.TRUNCATED)
@@ -899,15 +895,12 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType:
outer = FixedLengthArrayType(small, 2) # unpadded bit length values: {4, 12, 20, 28, 36}
# Above plus one bit to each, plus 16-bit for the unsigned integer field
- assert (
- try_union_fields(
- [
- outer,
- SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
- ]
- ).bit_length_set
- == {24, 32, 40, 48, 56}
- )
+ assert try_union_fields(
+ [
+ outer,
+ SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
+ ]
+ ).bit_length_set == {24, 32, 40, 48, 56}
def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureType:
atr = []
@@ -920,7 +913,7 @@ def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureTy
attributes=atr,
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -941,15 +934,12 @@ def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureTy
assert try_struct_fields([]).bit_length_set == {0} # Empty sets forbidden
- assert (
- try_struct_fields(
- [
- outer,
- SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
- ]
- ).bit_length_set
- == {16 + 16, 24 + 16, 32 + 16, 40 + 16, 48 + 16}
- )
+ assert try_struct_fields(
+ [
+ outer,
+ SignedIntegerType(16, PrimitiveType.CastMode.SATURATED),
+ ]
+ ).bit_length_set == {16 + 16, 24 + 16, 32 + 16, 40 + 16, 48 + 16}
assert try_struct_fields([outer]).bit_length_set == {16, 24, 32, 40, 48}
@@ -971,7 +961,7 @@ def make_type(meta: typing.Type[CompositeType], attributes: typing.Iterable[Attr
attributes=attributes,
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
@@ -1239,7 +1229,7 @@ def validate_iterator(
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=True,
),
response=StructureType(
@@ -1248,7 +1238,7 @@ def validate_iterator(
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=True,
),
fixed_port_id=None,
@@ -1262,7 +1252,7 @@ def validate_iterator(
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=True,
),
response=StructureType(
@@ -1271,7 +1261,7 @@ def validate_iterator(
attributes=[],
deprecated=True,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
),
fixed_port_id=None,
@@ -1284,7 +1274,7 @@ def validate_iterator(
attributes=[],
deprecated=False,
fixed_port_id=None,
- source_file_path="",
+ source_file_path=Path(),
has_parent_service=False,
)
validate_iterator(e, [])
diff --git a/pydsdl/_serializable/_name.py b/pydsdl/_serializable/_name.py
index 9f53086..5198cf9 100644
--- a/pydsdl/_serializable/_name.py
+++ b/pydsdl/_serializable/_name.py
@@ -2,9 +2,10 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
+from __future__ import annotations
import re
import string
-import typing # pylint: disable=W0611
+from typing import Pattern
from ._serializable import TypeParameterError
@@ -42,7 +43,7 @@ def check_name(name: str) -> None:
# Disallowed name patterns apply to any part of any name, e.g., an attribute name, a namespace component,
# type name, etc. The pattern must produce an exact match to trigger a name error. All patterns are case-insensitive.
-_DISALLOWED_NAME_PATTERNS = [
+_DISALLOWED_NAME_PATTERNS: list[str | Pattern[str]] = [
"truncated",
"saturated",
"true",
@@ -72,7 +73,7 @@ def check_name(name: str) -> None:
re.compile(r"com\d$"),
re.compile(r"lpt\d$"),
re.compile(r"_.*_$"),
-] # type: typing.List[typing.Union[str, typing.Pattern[str]]]
+]
def _unittest_check_name() -> None:
diff --git a/pydsdl/_serializable/_primitive.py b/pydsdl/_serializable/_primitive.py
index 95f8501..863a7ea 100644
--- a/pydsdl/_serializable/_primitive.py
+++ b/pydsdl/_serializable/_primitive.py
@@ -174,9 +174,9 @@ def __init__(self, bit_length: int, cast_mode: PrimitiveType.CastMode):
# The limits are exact
self._magnitude = fractions.Fraction(
{
- 16: (2 ** 0x00F) * (2 - frac(2) ** frac(-10)), # IEEE 754 binary16
- 32: (2 ** 0x07F) * (2 - frac(2) ** frac(-23)), # IEEE 754 binary32
- 64: (2 ** 0x3FF) * (2 - frac(2) ** frac(-52)), # IEEE 754 binary64
+ 16: (2**0x00F) * (2 - frac(2) ** frac(-10)), # IEEE 754 binary16
+ 32: (2**0x07F) * (2 - frac(2) ** frac(-23)), # IEEE 754 binary32
+ 64: (2**0x3FF) * (2 - frac(2) ** frac(-52)), # IEEE 754 binary64
}[self.bit_length]
)
except KeyError:
diff --git a/pydsdl/_test.py b/pydsdl/_test.py
index 5cc4942..13c7651 100644
--- a/pydsdl/_test.py
+++ b/pydsdl/_test.py
@@ -2,11 +2,13 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
-# pylint: disable=global-statement,protected-access,too-many-statements
+# pylint: disable=global-statement,protected-access,too-many-statements,consider-using-with
+from __future__ import annotations
import os
import typing
import tempfile
+from pathlib import Path
from textwrap import dedent
from . import _expression
from . import _error
@@ -34,15 +36,15 @@ def _parse_definition(
)
-def _define(rel_path: str, text: str) -> _dsdl_definition.DSDLDefinition:
- rel_path = rel_path.replace("/", os.sep) # Windows compatibility
+def _define(rel_path: str | Path, text: str) -> _dsdl_definition.DSDLDefinition:
+ rel_path = str(rel_path).replace("/", os.sep) # Windows compatibility
assert _DIRECTORY
- path = os.path.join(_DIRECTORY.name, rel_path)
+ path = Path(_DIRECTORY.name, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf8") as f:
f.write(text)
- root_namespace_path = os.path.join(_DIRECTORY.name, rel_path.strip(os.sep).split(os.sep)[0])
+ root_namespace_path = Path(_DIRECTORY.name, rel_path.strip(os.sep).split(os.sep, maxsplit=1)[0])
out = _dsdl_definition.DSDLDefinition(path, root_namespace_path)
print("New definition:", out, "Root NS:", root_namespace_path)
return out
@@ -68,8 +70,8 @@ def _unittest_define() -> None:
assert d.full_name == "uavcan.test.Message"
assert d.version == (1, 2)
assert d.fixed_port_id == 5000
- assert d.file_path == os.path.join(_DIRECTORY.name, "uavcan", "test", "5000.Message.1.2.uavcan")
- assert d.root_namespace_path == os.path.join(_DIRECTORY.name, "uavcan")
+ assert d.file_path == Path(_DIRECTORY.name, "uavcan", "test", "5000.Message.1.2.uavcan").resolve()
+ assert d.root_namespace_path == Path(_DIRECTORY.name, "uavcan").resolve()
assert open(d.file_path).read() == "# empty"
# BUT WHEN I DO, I WRITE UNIT TESTS FOR MY UNIT TESTS
@@ -77,8 +79,8 @@ def _unittest_define() -> None:
assert d.full_name == "uavcan.Service"
assert d.version == (255, 254)
assert d.fixed_port_id is None
- assert d.file_path == os.path.join(_DIRECTORY.name, "uavcan", "Service.255.254.uavcan")
- assert d.root_namespace_path == os.path.join(_DIRECTORY.name, "uavcan")
+ assert d.file_path == Path(_DIRECTORY.name, "uavcan", "Service.255.254.uavcan").resolve()
+ assert d.root_namespace_path == Path(_DIRECTORY.name, "uavcan").resolve()
assert open(d.file_path).read() == "# empty 2"
@@ -105,7 +107,7 @@ def _unittest_simple() -> None:
assert isinstance(p, _serializable.DelimitedType)
assert isinstance(p.inner_type, _serializable.StructureType)
assert p.full_name == "vendor.nested.Abc"
- assert p.source_file_path.endswith(os.path.join("vendor", "nested", "7000.Abc.1.2.uavcan"))
+ assert str(p.source_file_path).endswith(os.path.join("vendor", "nested", "7000.Abc.1.2.uavcan"))
assert p.source_file_path == abc.file_path
assert p.fixed_port_id == 7000
assert p.deprecated
@@ -525,7 +527,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False)
with raises(_data_type_builder.UndefinedDataTypeError, match=r"(?i).*nonexistent.TypeName.*1\.0.*"):
standalone("vendor/types/A.1.0.uavcan", "nonexistent.TypeName.1.0 field\n@sealed")
- with raises(_data_type_builder.UndefinedDataTypeError, match=r"(?i).*vendor[/\\]+types' instead of .*vendor'.*"):
+ with raises(_data_type_builder.UndefinedDataTypeError, match=r"(?i).*vendor[/\\]+types instead of .*vendor.*"):
standalone("vendor/types/A.1.0.uavcan", "types.Nonexistent.1.0 field\n@sealed")
with raises(_error.InvalidDefinitionError, match=r"(?i).*not defined for.*"):
@@ -630,7 +632,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False)
),
)
except _error.FrontendError as ex:
- assert ex.path and ex.path.endswith(os.path.join("vendor", "types", "A.1.0.uavcan"))
+ assert ex.path and str(ex.path).endswith(os.path.join("vendor", "types", "A.1.0.uavcan"))
assert ex.line and ex.line == 4
else: # pragma: no cover
assert False
@@ -737,7 +739,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False)
@_in_n_out
def _unittest_print() -> None:
- printed_items = None # type: typing.Optional[typing.Tuple[int, str]]
+ printed_items = None # type: None | tuple[int, str]
def print_handler(line_number: int, text: str) -> None:
nonlocal printed_items
@@ -980,15 +982,15 @@ def _unittest_parse_namespace() -> None:
directory = tempfile.TemporaryDirectory()
- print_output = None # type: typing.Optional[typing.Tuple[str, int, str]]
+ print_output = None # type: None | tuple[str, int, str]
- def print_handler(d: str, line: int, text: str) -> None:
+ def print_handler(d: Path, line: int, text: str) -> None:
nonlocal print_output
- print_output = d, line, text
+ print_output = str(d), line, text
# noinspection PyShadowingNames
def _define(rel_path: str, text: str) -> None:
- path = os.path.join(directory.name, rel_path)
+ path = Path(directory.name, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(text)
@@ -1041,8 +1043,8 @@ def _define(rel_path: str, text: str) -> None:
_define("zubax/300.Spartans.30.0", "completely unrelated stuff")
parsed = _namespace.read_namespace(
- os.path.join(directory.name, "zubax"),
- [os.path.join(directory.name, "zubax", ".")], # Intentional duplicate
+ Path(directory.name, "zubax"),
+ [Path(directory.name, "zubax", ".")], # Intentional duplicate
print_handler,
)
print(parsed)
@@ -1052,7 +1054,7 @@ def _define(rel_path: str, text: str) -> None:
assert "zubax.nested.Spartans" in [x.full_name for x in parsed]
# try again with minimal arguments to read_namespace
- parsed_minimal_args = _namespace.read_namespace(os.path.join(directory.name, "zubax"))
+ parsed_minimal_args = _namespace.read_namespace(Path(directory.name, "zubax"))
assert len(parsed_minimal_args) == 3
_define(
@@ -1067,14 +1069,14 @@ def _define(rel_path: str, text: str) -> None:
)
with raises(_namespace.FixedPortIDCollisionError):
- _namespace.read_namespace(os.path.join(directory.name, "zubax"), [], print_handler)
+ _namespace.read_namespace(Path(directory.name, "zubax"), [], print_handler)
with raises(TypeError): # Invalid usage: expected path-like object, not bytes.
- _namespace.read_namespace(os.path.join(directory.name, "zubax"), b"/my/path") # type: ignore
+ _namespace.read_namespace(Path(directory.name, "zubax"), b"/my/path") # type: ignore
with raises(TypeError): # Invalid usage: expected path-like object, not bytes.
# noinspection PyTypeChecker
- _namespace.read_namespace(os.path.join(directory.name, "zubax"), [b"/my/path"]) # type: ignore
+ _namespace.read_namespace(Path(directory.name, "zubax"), [b"/my/path"]) # type: ignore
assert print_output is not None
assert "300.Spartans" in print_output[0]
@@ -1093,18 +1095,18 @@ def _define(rel_path: str, text: str) -> None:
)
with raises(_namespace.DataTypeNameCollisionError):
_namespace.read_namespace(
- os.path.join(directory.name, "zubax"),
+ Path(directory.name, "zubax"),
[
- os.path.join(directory.name, "zubax"),
+ Path(directory.name, "zubax"),
],
)
# Do again to test single lookup-directory override
with raises(_namespace.DataTypeNameCollisionError):
- _namespace.read_namespace(os.path.join(directory.name, "zubax"), os.path.join(directory.name, "zubax"))
+ _namespace.read_namespace(Path(directory.name, "zubax"), Path(directory.name, "zubax"))
try:
- os.unlink(os.path.join(directory.name, "zubax/colliding/iceberg/300.Ice.30.0.uavcan"))
+ os.unlink(Path(directory.name, "zubax/colliding/iceberg/300.Ice.30.0.uavcan"))
_define(
"zubax/COLLIDING/300.Iceberg.30.0.uavcan",
dedent(
@@ -1117,9 +1119,9 @@ def _define(rel_path: str, text: str) -> None:
)
with raises(_namespace.DataTypeNameCollisionError, match=".*letter case.*"):
_namespace.read_namespace(
- os.path.join(directory.name, "zubax"),
+ Path(directory.name, "zubax"),
[
- os.path.join(directory.name, "zubax"),
+ Path(directory.name, "zubax"),
],
)
except _namespace.FixedPortIDCollisionError: # pragma: no cover
@@ -1134,14 +1136,13 @@ def _unittest_parse_namespace_versioning() -> None:
# noinspection PyShadowingNames
def _define(rel_path: str, text: str) -> None:
- path = os.path.join(directory.name, rel_path)
+ path = Path(directory.name, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(text)
def _undefine_glob(rel_path_glob: str) -> None:
- path = os.path.join(directory.name, rel_path_glob)
- for g in glob.glob(path):
+ for g in glob.glob(str(Path(directory.name, rel_path_glob))):
os.remove(g)
_define(
@@ -1176,7 +1177,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), [])
print(parsed)
assert len(parsed) == 2
@@ -1195,7 +1196,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
)
with raises(_namespace.VersionsOfDifferentKindError):
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
_undefine_glob("ns/Spartans.30.[01].uavcan")
@@ -1213,7 +1214,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), [])
print(parsed)
assert len(parsed) == 2
@@ -1246,11 +1247,11 @@ def _undefine_glob(rel_path_glob: str) -> None:
)
with raises(_namespace.MultipleDefinitionsUnderSameVersionError):
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
_undefine_glob("ns/Spartans.30.2.uavcan")
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), [])
assert len(parsed) == 3
_undefine_glob("ns/Spartans.30.0.uavcan")
@@ -1269,7 +1270,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
)
with raises(_namespace.MinorVersionFixedPortIDError):
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
_undefine_glob("ns/Spartans.30.1.uavcan")
_define(
@@ -1286,7 +1287,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), [])
assert len(parsed) == 3
_undefine_glob("ns/6700.Spartans.30.1.uavcan")
@@ -1305,7 +1306,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
)
with raises(_namespace.MinorVersionFixedPortIDError):
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
# Adding new major version under the same FPID
_undefine_glob("ns/6701.Spartans.30.1.uavcan")
@@ -1324,7 +1325,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
)
with raises(_namespace.FixedPortIDCollisionError):
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
# Major version zero allows us to re-use the same FPID under a different (non-zero) major version
_undefine_glob("ns/6700.Spartans.31.0.uavcan")
@@ -1349,7 +1350,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
_define("ns/6800.Empty.3.0.uavcan", "@extent 0")
_define("ns/6801.Empty.4.0.uavcan", "@extent 0")
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 8
# Check ordering - the definitions must be sorted properly by name (lexicographically) and version (newest first).
@@ -1367,24 +1368,24 @@ def _undefine_glob(rel_path_glob: str) -> None:
# Extent consistency -- non-service type
_define("ns/Consistency.1.0.uavcan", "uint8 a\n@extent 128")
_define("ns/Consistency.1.1.uavcan", "uint8 a\nuint8 b\n@extent 128")
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define("ns/Consistency.1.2.uavcan", "uint8 a\nuint8 b\nuint8 c\n@extent 256")
with raises(
_namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency\.1\.2 is 256 bits.*"
) as ei_extent:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_extent.value)
- assert ei_extent.value.path and "Consistency.1" in ei_extent.value.path
+ assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path)
_undefine_glob("ns/Consistency*")
# Extent consistency -- non-service type, zero major version
_define("ns/Consistency.0.1.uavcan", "uint8 a\n@extent 128")
_define("ns/Consistency.0.2.uavcan", "uint8 a\nuint8 b\n@extent 128")
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define("ns/Consistency.0.3.uavcan", "uint8 a\nuint8 b\nuint8 c\n@extent 256") # no error
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
_undefine_glob("ns/Consistency*")
# Extent consistency -- request
@@ -1413,7 +1414,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
"""
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define(
"ns/Consistency.1.2.uavcan",
@@ -1431,9 +1432,9 @@ def _undefine_glob(rel_path_glob: str) -> None:
with raises(
_namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency.* is 256 bits.*"
) as ei_extent:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_extent.value)
- assert ei_extent.value.path and "Consistency.1" in ei_extent.value.path
+ assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path)
_undefine_glob("ns/Consistency*")
# Extent consistency -- response
@@ -1462,7 +1463,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
"""
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define(
"ns/Consistency.1.2.uavcan",
@@ -1479,21 +1480,21 @@ def _undefine_glob(rel_path_glob: str) -> None:
with raises(
_namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency.* is 256 bits.*"
) as ei_extent:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_extent.value)
- assert ei_extent.value.path and "Consistency.1" in ei_extent.value.path
+ assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path)
_undefine_glob("ns/Consistency*")
# Sealing consistency -- non-service type
_define("ns/Consistency.1.0.uavcan", "uint64 a\n@extent 64")
_define("ns/Consistency.1.1.uavcan", "uint64 a\n@extent 64")
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define("ns/Consistency.1.2.uavcan", "uint64 a\n@sealed")
with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency\.1\.2 is sealed.*") as ei_sealing:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_sealing.value)
- assert ei_sealing.value.path and "Consistency.1" in ei_sealing.value.path
+ assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path)
_undefine_glob("ns/Consistency*")
# Sealing consistency -- request
@@ -1521,7 +1522,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
"""
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define(
"ns/Consistency.1.2.uavcan",
@@ -1536,9 +1537,9 @@ def _undefine_glob(rel_path_glob: str) -> None:
),
)
with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency.* is sealed.*") as ei_sealing:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_sealing.value)
- assert ei_sealing.value.path and "Consistency.1" in ei_sealing.value.path
+ assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path)
_undefine_glob("ns/Consistency*")
# Sealing consistency -- response
@@ -1566,7 +1567,7 @@ def _undefine_glob(rel_path_glob: str) -> None:
"""
),
)
- parsed = _namespace.read_namespace(os.path.join(directory.name, "ns"), []) # no error
+ parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error
assert len(parsed) == 10
_define(
"ns/Consistency.1.2.uavcan",
@@ -1581,9 +1582,9 @@ def _undefine_glob(rel_path_glob: str) -> None:
),
)
with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency.* is sealed.*") as ei_sealing:
- _namespace.read_namespace(os.path.join(directory.name, "ns"), [])
+ _namespace.read_namespace(Path(directory.name, "ns"), [])
print(ei_sealing.value)
- assert ei_sealing.value.path and "Consistency.1" in ei_sealing.value.path
+ assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path)
_undefine_glob("ns/Consistency*")
diff --git a/setup.cfg b/setup.cfg
index bef7b4a..8033a12 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -116,7 +116,10 @@ disable=
too-many-statements,
useless-super-delegation,
too-many-instance-attributes,
- too-many-public-methods
+ too-many-public-methods,
+ consider-using-f-string,
+ unspecified-encoding,
+ use-implicit-booleaness-not-comparison
[pylint.REPORTS]
output-format=colorized