
Commit

Caching to the rescue
capyvara committed Nov 4, 2022
1 parent 976125d commit ae1200b
Showing 2 changed files with 104 additions and 62 deletions.
147 changes: 85 additions & 62 deletions tse/utils/grok.py
@@ -9,19 +9,21 @@
import functools
from typing import Callable, Dict, Iterable, Any, Tuple, Union

from .lru import LRU

# https://github.com/garyelephant/pygrok
class GrokProcessor:
Converter = Callable[[str], Any]

class Matcher:
regex: re.Pattern
type_map: dict[str, str]
can_cache: bool
can_cache_format: bool

def __init__(self, regex: re.Pattern, type_map: dict[str, str], can_cache):
def __init__(self, regex: re.Pattern, type_map: dict[str, str], can_cache_format):
self.regex = regex
self.type_map = type_map
self.can_cache = can_cache
self.can_cache_format = can_cache_format

# https://github.com/garyelephant/pygrok/blob/master/pygrok/patterns/grok-patterns
_base_predefined: dict[str, str] = {
@@ -55,7 +57,7 @@ def __init__(self, regex: re.Pattern, type_map: dict[str, str], can_cache):
_matchers: list[Matcher]
_matcher_format_cache: dict[(Matcher, bool), str]
_noMatchCache: set
_groupedMatchers: list[Matcher]
_matchCache: LRU[str, Matcher]

_base_type_converters : dict[str, Converter] = {
"int": lambda x: int(x),
@@ -75,6 +77,8 @@ def __init__(self, predefined: dict[str, str] = {}, type_converters: dict[str, C
def _invalidate_caches(self):
self._matcher_format_cache = {}
self._noMatchCache = set()
self._matchCache = LRU(1024)
GrokProcessor._string_lru_cache.cache_clear()

def add_matchers(self, matchers: Iterable[str], flags: re.RegexFlag = 0) -> GrokProcessor:
self._matchers.extend((self._build_matcher(p, flags) for p in matchers))
@@ -87,20 +91,20 @@ def load_matchers_from_file(self, file: str, flags: re.RegexFlag = 0) -> GrokPro
if line == '':
continue

can_cache = True
can_cache_format = True

if line.startswith(r"\\"):
line = line[1:]
can_cache = False
can_cache_format = False
else:
line = re.sub(r"%\\{([:\w]+)\\}", r"%{\1}", re.escape(line, literal_spaces=True))

self._matchers.append(self._build_matcher(line, can_cache, flags))
self._matchers.append(self._build_matcher(line, can_cache_format, flags))

self._invalidate_caches()
return self

def _build_matcher(self, pattern: str, can_cache = False, flags: re.RegexFlag = 0) -> Matcher:
def _build_matcher(self, pattern: str, can_cache_format = False, flags: re.RegexFlag = 0) -> Matcher:
iterations = 100

type_map = {}
@@ -124,80 +128,99 @@ def _build_matcher(self, pattern: str, can_cache = False, flags: re.RegexFlag =
pattern)

if num_subs > 0:
can_cache = False
can_cache_format = False

if re.search('%{\w+(:\w+)?(:\w+)?}', pattern) is None:
return GrokProcessor.Matcher(re.compile(pattern, flags), type_map, can_cache)
return GrokProcessor.Matcher(re.compile(pattern, flags), type_map, can_cache_format)

@functools.lru_cache(maxsize=512, typed=True)
def _string_lru_cache(self, string: str) -> str:
@staticmethod
@functools.lru_cache(maxsize=512)
def _string_lru_cache(string: str) -> str:
return string

def match(self, text: str, *, fullmatch=True, pos_msg_params=False) -> Tuple[str, Union[dict, list]]:
if text in self._noMatchCache:
return (text, None)

for i in range(0, len(self._matchers)):
matcher = self._matchers[i]

def try_matcher(matcher: GrokProcessor.Matcher):
match = matcher.regex.fullmatch(text, concurrent=True) if fullmatch else matcher.regex.match(text, concurrent=True)
if match:
matcherkey = (matcher, pos_msg_params)
params = match.groupdict()
format = None

if matcher.can_cache and matcherkey in self._matcher_format_cache:
format = self._matcher_format_cache[matcherkey]
for key, value in params.items():
if key == "__del__":
del params[key]
continue

try:
type = matcher.type_map[key]
converter = self._type_converters[type]
params[key] = converter(value)
except KeyError:
if len(value) < 32:
params[key] = self._string_lru_cache(value)
else:
split = []
lbound = 0
for key, value, l, r in [(key, value, *match.span(key)) for key, value in params.items()]:
if key == "__del__":
del params[key]
split.append(text[lbound:l])
lbound = r
continue

try:
type = matcher.type_map[key]
converter = self._type_converters[type]
params[key] = converter(value)
except KeyError:
if len(value) < 32:
params[key] = self._string_lru_cache(value)

if not match:
return (False, (text, None))

matcherkey = (matcher, pos_msg_params)
params = match.groupdict()
format = None

if matcher.can_cache_format and matcherkey in self._matcher_format_cache:
format = self._matcher_format_cache[matcherkey]
for key, value in params.items():
if key == "__del__":
del params[key]
continue

try:
type = matcher.type_map[key]
converter = self._type_converters[type]
params[key] = converter(value)
except KeyError:
if len(value) < 32:
params[key] = GrokProcessor._string_lru_cache(value)
else:
split = []
lbound = 0
for key, value, l, r in [(key, value, *match.span(key)) for key, value in params.items()]:
if key == "__del__":
del params[key]
split.append(text[lbound:l])
lbound = r
continue

try:
type = matcher.type_map[key]
converter = self._type_converters[type]
params[key] = converter(value)
except KeyError:
if len(value) < 32:
params[key] = GrokProcessor._string_lru_cache(value)

if pos_msg_params:
split.append("%s")
else:
split.append(f"%({key})s")
split.append(text[lbound:l])
lbound = r

split.append(text[lbound:])
format = "".join(split)
if pos_msg_params:
split.append("%s")
else:
split.append(f"%({key})s")

if matcher.can_cache:
self._matcher_format_cache[matcherkey] = format
split.append(text[lbound:])
format = "".join(split)

if matcher.can_cache_format:
self._matcher_format_cache[matcherkey] = format

return (True, (format, params if not pos_msg_params else list(params.values())))

cachedMatcher = None
try:
cachedMatcher = self._matchCache[text]
res, ret = try_matcher(cachedMatcher)
if res:
return ret
except KeyError:
pass

for i in range(0, len(self._matchers)):
matcher = self._matchers[i]
if matcher == cachedMatcher:
continue

res, ret = try_matcher(self._matchers[i])
if res:
# Optimization: Bubble up the matched one so most common goes first in the list
if i > 0:
self._matchers[i - 1], self._matchers[i] = self._matchers[i], self._matchers[i - 1]

return (format, params if not pos_msg_params else list(params.values()))
self._matchCache[text] = matcher
return ret

self._noMatchCache.add(text)
return (text, None)
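For orientation, here is a minimal usage sketch of the cached matching path added above. The import path mirrors the file location, but the pattern string, field names, and input lines are illustrative assumptions (they assume the standard grok WORD and INT patterns are among the predefined set), not taken from the repository:

# Illustrative sketch only -- pattern, fields, and input lines are made up for this example.
from tse.utils.grok import GrokProcessor

grok = GrokProcessor().add_matchers([r"level=%{WORD:level} took %{INT:took:int}ms"])

# First call scans _matchers; the winning Matcher is remembered in the
# per-text LRU (_matchCache) and bubbled one slot towards the front of the list.
fmt, params = grok.match("level=info took 42ms")
# Expected, roughly: fmt == "level=%(level)s took %(took)sms", params == {"level": "info", "took": 42}

# A repeated line tries the cached Matcher first and skips the scan entirely;
# lines that never matched are short-circuited through _noMatchCache.
fmt, params = grok.match("level=info took 42ms")

# A line that matches nothing is returned unchanged with params == None.
assert grok.match("garbage") == ("garbage", None)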

19 changes: 19 additions & 0 deletions tse/utils/lru.py
@@ -0,0 +1,19 @@
from collections import OrderedDict

class LRU(OrderedDict):
    def __init__(self, maxsize=128, /, *args, **kwds):
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]
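
As a quick sanity check of the helper class, an illustrative example of its eviction order (keys and values are arbitrary):

# Illustrative only: capacity 2, so the third insert evicts the least recently used key.
from tse.utils.lru import LRU

cache = LRU(2)
cache["a"] = 1
cache["b"] = 2
_ = cache["a"]       # reading "a" moves it to the most recently used end
cache["c"] = 3       # evicts "b", the least recently used entry
assert list(cache) == ["a", "c"]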
