Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kwargs to function generated tables #4542

Merged
36 changes: 19 additions & 17 deletions py/server/deephaven/table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
""" This module provides various ways to make a Deephaven table. """

import datetime
from typing import Callable, List, Dict, Any, Union, Sequence
from typing import Callable, List, Dict, Any, Union, Sequence, Tuple

import jpy
import numpy as np
Expand Down Expand Up @@ -342,12 +342,13 @@ def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table:
raise DHError(e, "failed to create a ring table.") from e


def function_generated_table(table_generator: Callable[[], Table],
def function_generated_table(table_generator: Callable[..., Table],
source_tables: Union[Table, List[Table]] = None,
refresh_interval_ms: int = None,
exec_ctx: ExecutionContext = None) -> Table:
"""
Creates an abstract table that is generated by running the table_generator() function. The function will first be
exec_ctx: ExecutionContext = None,
args: Tuple = (),
kwargs: Dict = {}) -> Table:
"""Creates an abstract table that is generated by running the table_generator() function. The function will first be
run to generate the table when this method is called, then subsequently either (a) whenever one of the
'source_tables' ticks or (b) after refresh_interval_ms have elapsed. Either 'refresh_interval_ms' or
'source_tables' must be set (but not both).
Expand All @@ -358,29 +359,30 @@ def function_generated_table(table_generator: Callable[[], Table],

The table definition must not change between invocations of the 'table_generator' function, or an exception will be raised.

Note that the 'table_generator' may access data in the sourceTables but should not perform further table operations
on them without careful handling. Table operations may be memoized, and it is possible that a table operation will
return a table created by a previous invocation of the same operation. Since that result will not have been included
in the 'source_table', it's not automatically treated as a dependency for purposes of determining when it's safe to
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own update
processing. It's best to include all dependencies directly in 'source_table', or only compute on-demand inputs under
a LivenessScope.
Note that the 'table_generator' may access data in the sourceTables but should not perform further table operations
on them without careful handling. Table operations may be memoized, and it is possible that a table operation will
return a table created by a previous invocation of the same operation. Since that result will not have been included
in the 'source_table', it's not automatically treated as a dependency for purposes of determining when it's safe to
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own update
processing. It's best to include all dependencies directly in 'source_table', or only compute on-demand inputs under
a LivenessScope.

Args:
table_generator (Callable[[], Table]): The table generator function. This function must return a Table.
table_generator (Callable[..., Table]): The table generator function. This function must return a Table.
source_tables (Union[Table, List[Table]]): Source tables used by the 'table_generator' function. The
'table_generator' is rerun when any of these tables tick.
refresh_interval_ms (int): Interval (in milliseconds) at which the 'table_generator' function is rerun.
exec_ctx (ExecutionContext) -> Table): A custom execution context. If 'None', the current
exec_ctx (ExecutionContext): A custom execution context. If 'None', the current
execution context is used. If there is no current execution context, a ValueError is raised.
args (Tuple): Optional tuple of positional arguments to pass to table_generator. Defaults to ()
kwargs (Dict): Optional dictionary of keyword arguments to pass to table_generator. Defaults to {}

Returns:
a new table

Raises:
DHError
"""

if refresh_interval_ms is None and source_tables is None:
raise DHError("Either refresh_interval_ms or source_tables must be provided!")

Expand All @@ -396,7 +398,7 @@ def function_generated_table(table_generator: Callable[[], Table],

def table_generator_function():
with exec_ctx:
result = table_generator()
result = table_generator(*args, **kwargs)

if result is None:
raise DHError("table_generator did not return a result")
Expand Down
145 changes: 145 additions & 0 deletions py/server/tests/test_function_generated_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,151 @@ def table_generator_function():
self.assertEqual(result_str, 'test string')
self.assertEqual(result_int, 12345)

def test_generated_table_args(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

with update_graph.exclusive_lock(self.test_update_graph):
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(5, "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"))
self.assertEqual(result_table.size, 5)
first_row_key = get_row_key(0, result_table)
initial_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

if not result_table.await_update(5_000):
raise RuntimeError("Result table did not update within 5 seconds")

first_row_key = get_row_key(0, result_table)
later_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

# Make sure it ticked at least once within 5 seconds. It should have ticked twice,
# but leaving a wider margin to ensure the test passes -- as long as it ticks at all
# we can be confident it's working.
self.assertGreater(later_time, initial_time)

def test_generated_table_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

with update_graph.exclusive_lock(self.test_update_graph):
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
kwargs={'nrows': 5, 'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"})
self.assertEqual(result_table.size, 5)
first_row_key = get_row_key(0, result_table)
initial_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

if not result_table.await_update(5_000):
raise RuntimeError("Result table did not update within 5 seconds")

first_row_key = get_row_key(0, result_table)
later_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

# Make sure it ticked at least once within 5 seconds. It should have ticked twice,
# but leaving a wider margin to ensure the test passes -- as long as it ticks at all
# we can be confident it's working.
self.assertGreater(later_time, initial_time)

def test_generated_table_args_and_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

with update_graph.exclusive_lock(self.test_update_graph):
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(5,), kwargs={'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"})
self.assertEqual(result_table.size, 5)
first_row_key = get_row_key(0, result_table)
initial_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

if not result_table.await_update(5_000):
raise RuntimeError("Result table did not update within 5 seconds")

first_row_key = get_row_key(0, result_table)
later_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

# Make sure it ticked at least once within 5 seconds. It should have ticked twice,
# but leaving a wider margin to ensure the test passes -- as long as it ticks at all
# we can be confident it's working.
self.assertGreater(later_time, initial_time)

def test_generated_table_wrong_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
kwargs={'nrow': 1, 'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"})
self.assertIn("table_generator_function() got an unexpected keyword argument 'nrow'", str(cm.exception))
alexpeters1208 marked this conversation as resolved.
Show resolved Hide resolved

def test_generated_table_too_few_args(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(1,))
self.assertIn("table_generator_function() missing 1 required positional argument: 'query_string'", str(cm.exception))

def test_generated_table_too_few_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
kwargs={'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"})
self.assertIn("table_generator_function() missing 1 required positional argument: 'nrows'", str(cm.exception))

def test_generated_table_too_few_args_and_kwargs(self):
def table_generator_function(nrows, arg2, query_string, arg4):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(5,),
kwargs={'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()"})
self.assertIn("table_generator_function() missing 2 required positional arguments: 'arg2' and 'arg4'",
str(cm.exception))

def test_generated_table_too_many_args(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(1, "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()", "hello"))
self.assertIn("table_generator_function() takes 2 positional arguments but 3 were given", str(cm.exception))

def test_generated_table_too_many_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
kwargs={'nrows': 5, 'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()", 'non_argument': "hello"})
self.assertIn("table_generator_function() got an unexpected keyword argument 'non_argument'", str(cm.exception))
alexpeters1208 marked this conversation as resolved.
Show resolved Hide resolved

def test_generated_table_too_many_args_and_kwargs(self):
def table_generator_function(nrows, query_string):
return empty_table(nrows).update(query_string)

# nrow is not a valid argument, so we expect a type error
with update_graph.exclusive_lock(self.test_update_graph):
with self.assertRaises(Exception) as cm:
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000,
args=(5, "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()", "hello"),
kwargs={'nrows': 5, 'query_string': "Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()", 'non_argument': "hello"})
self.assertIn("table_generator_function() got multiple values for argument 'nrows'", str(cm.exception))

def get_row_key(row_position: int, t: Table) -> Any:
return t.j_table.getRowSet().get(row_position)
Loading