Skip to content

Commit

Permalink
Simpler query streaming. (jtv#634)
Browse files Browse the repository at this point in the history
`stream_query`: like `stream_from` but stronger-typed.

It's becoming clear that `stream_from` was too weakly typed.  Why would
anyone want to read different rows of the same query result as different
column types?  Too many corner cases, too much complication, too much
code that really doesn't deliver any value to the user.

The new code is not only simpler, but faster as well.  In C++20 or C++23
I may turn it into a generator, so that the compiler may be able to elide
the heap-stored stack frame, which would turn some data members into
local variables, which in turn may allow more aggressive optimisation.
  • Loading branch information
jtv committed Mar 11, 2023
1 parent f181755 commit 9f6c3a8
Show file tree
Hide file tree
Showing 25 changed files with 899 additions and 161 deletions.
3 changes: 2 additions & 1 deletion NEWS
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
7.8.0
- Streaming large data sets now benchmarks faster than similar C/libpq code!
- New `array` class for easier parsing of SQL arrays.
- Deprecating `stream_from`. Use `transaction_base::stream()`.
- Use `array_parser` only on comma-separated types, i.e. most of them. (#590)
- Bumping requirements versions: need postgres 10.
- Fix `array_parser` bug when parsing semicolon in an unquoted string.
- Make some `zview` constructors `noexcept` if `string_view` does it.
- Handle result status code for starting streaming replication. (#631)
- Faster text decoding in `stream_from`, and escaping in `stream_to`. (#601)
- Faster text decoding and escaping in data streaming. (#601)
- Deprecate `basic_fieldstream` and `fieldstream`.
- Deprecate `<<` operator inserting a field into an `ostream`.
- Ran `autoupdate` (because the autotools told me to).
Expand Down
2 changes: 1 addition & 1 deletion config/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ am__can_run_installinfo = \
esac
am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
am__DIST_COMMON = $(srcdir)/Makefile.in compile config.guess \
config.sub install-sh ltmain.sh missing mkinstalldirs
config.sub depcomp install-sh ltmain.sh missing mkinstalldirs
DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
ACLOCAL = @ACLOCAL@
AMTAR = @AMTAR@
Expand Down
34 changes: 25 additions & 9 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -17955,17 +17955,11 @@ cat confdefs.h - <<_ACEOF >conftest.$ac_ext

* To try and get close to the situation in the library code itself, we try

* including some standard headers that we don't strictly need here.

*/

* including some standard headers and OS headers that we don't strictly need

* here.

#if __has_include(<ciso646>)

# include <ciso646>

#endif
*/



Expand All @@ -17987,6 +17981,28 @@ cat confdefs.h - <<_ACEOF >conftest.$ac_ext



#if __has_include(<winsock2.h>)

# include <winsock2.h>

#endif

#if __has_include(<ws2tcpip.h>)

# include <ws2tcpip.h>

#endif

#if __has_include(<mstcpip.h>)

# include <mstcpip.h>

#endif





int main()

{
Expand Down
2 changes: 2 additions & 0 deletions include/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ nobase_include_HEADERS= pqxx/pqxx \
pqxx/internal/sql_cursor.hxx \
pqxx/internal/statement_parameters.hxx \
pqxx/internal/stream_iterator.hxx \
pqxx/internal/stream_query.hxx \
pqxx/internal/stream_query_impl.hxx \
pqxx/internal/wait.hxx \
pqxx/internal/gates/connection-errorhandler.hxx \
pqxx/internal/gates/connection-largeobject.hxx \
Expand Down
2 changes: 2 additions & 0 deletions include/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ nobase_include_HEADERS = pqxx/pqxx \
pqxx/internal/sql_cursor.hxx \
pqxx/internal/statement_parameters.hxx \
pqxx/internal/stream_iterator.hxx \
pqxx/internal/stream_query.hxx \
pqxx/internal/stream_query_impl.hxx \
pqxx/internal/wait.hxx \
pqxx/internal/gates/connection-errorhandler.hxx \
pqxx/internal/gates/connection-largeobject.hxx \
Expand Down
6 changes: 3 additions & 3 deletions include/pqxx/connection.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class connection_largeobject;
class connection_notification_receiver;
class connection_pipeline;
class connection_sql_cursor;
class connection_stream_from;
struct connection_stream_from;
class connection_stream_to;
class connection_transaction;
class const_connection_largeobject;
Expand Down Expand Up @@ -1055,9 +1055,9 @@ private:
void PQXX_PRIVATE register_transaction(transaction_base *);
void PQXX_PRIVATE unregister_transaction(transaction_base *) noexcept;

friend class internal::gate::connection_stream_from;
friend struct internal::gate::connection_stream_from;
std::pair<std::unique_ptr<char, std::function<void(char *)>>, std::size_t>
PQXX_PRIVATE read_copy_line();
read_copy_line();

friend class internal::gate::connection_stream_to;
void PQXX_PRIVATE write_copy_line(std::string_view);
Expand Down
2 changes: 1 addition & 1 deletion include/pqxx/internal/array-composite.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ template<encoding_group ENC>
inline std::size_t scan_double_quoted_string(
char const input[], std::size_t size, std::size_t pos)
{
// XXX: find_char<'"', '\\'>().
// TODO: find_char<'"', '\\'>().
using scanner = glyph_scanner<ENC>;
auto next{scanner::call(input, size, pos)};
bool at_quote{false};
Expand Down
2 changes: 1 addition & 1 deletion include/pqxx/internal/conversions.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ inline std::string state_buffer_overrun(HAVE have_bytes, NEED need_bytes)


/// Throw exception for attempt to convert null to given type.
[[noreturn]] PQXX_LIBEXPORT void
[[noreturn]] PQXX_LIBEXPORT PQXX_COLD void
throw_null_conversion(std::string const &type);


Expand Down
2 changes: 1 addition & 1 deletion include/pqxx/internal/encoding_group.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ enum class encoding_group
};


// XXX: Get rid of these. Specialise at higher level.
// TODO: Get rid of these. Specialise at higher level.
/// Function type: "find the end of the current glyph."
/** This type of function takes a text buffer, and a location in that buffer,
* and returns the location one byte past the end of the current glyph.
Expand Down
63 changes: 62 additions & 1 deletion include/pqxx/internal/encodings.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ template<encoding_group> struct glyph_scanner
};


namespace
{
/// Find any of the ASCII characters in `NEEDLE` in `haystack`.
/** Scans through `haystack` until it finds a single-byte character that
* matches any of the values in `NEEDLE`.
Expand All @@ -154,7 +156,7 @@ template<encoding_group> struct glyph_scanner
* otherwise.
*/
template<encoding_group ENC, char... NEEDLE>
PQXX_PURE static std::size_t
PQXX_PURE inline std::size_t
find_ascii_char(std::string_view haystack, std::size_t here)
{
// We only know how to search for ASCII characters. It's an optimisation
Expand Down Expand Up @@ -193,6 +195,31 @@ find_ascii_char(std::string_view haystack, std::size_t here)
}
return sz;
}
} // namespace


/// Find first of `NEEDLE` ASCII chars in `haystack`.
/** @warning This assumes that one of the `NEEDLE` characters is actually
* present. It does not check for buffer overruns, so make sure that there's
* a sentinel.
*/
template<encoding_group ENC, char... NEEDLE>
PQXX_PURE std::size_t
find_s_ascii_char(std::string_view haystack, std::size_t here)
{
// We only know how to search for ASCII characters. It's an optimisation
// assumption in the code below.
static_assert((... and ((NEEDLE >> 7) == 0)));

auto const sz{std::size(haystack)};
auto const data{std::data(haystack)};

// No supported encoding has multibyte characters that start with an
// ASCII-range byte.
while ((... and (data[here] != NEEDLE)))
here = glyph_scanner<ENC>::call(data, sz, here);
return here;
}


template<> struct glyph_scanner<encoding_group::MONOBYTE>
Expand Down Expand Up @@ -804,5 +831,39 @@ get_char_finder(encoding_group enc)
"Unexpected encoding group: ", as_if, " (mapped from ", enc, ").")};
}
}


/// Look up a "sentry" character search function for an encoding group.
/** This version returns a finder function that does not check buffer bounds.
* It just assumes that one of the `NEEDLE` characters will be there.
*/
template<char... NEEDLE>
PQXX_PURE constexpr inline char_finder_func *
get_s_char_finder(encoding_group enc)
{
auto const as_if{map_ascii_search_group(enc)};
switch (as_if)
{
case encoding_group::MONOBYTE:
return pqxx::internal::find_s_ascii_char<
encoding_group::MONOBYTE, NEEDLE...>;
case encoding_group::BIG5:
return pqxx::internal::find_s_ascii_char<encoding_group::BIG5, NEEDLE...>;
case encoding_group::GB18030:
return pqxx::internal::find_s_ascii_char<encoding_group::GB18030, NEEDLE...>;
case encoding_group::GBK:
return pqxx::internal::find_s_ascii_char<encoding_group::GBK, NEEDLE...>;
case encoding_group::JOHAB:
return pqxx::internal::find_s_ascii_char<encoding_group::JOHAB, NEEDLE...>;
case encoding_group::SJIS:
return pqxx::internal::find_s_ascii_char<encoding_group::SJIS, NEEDLE...>;
case encoding_group::UHC:
return pqxx::internal::find_s_ascii_char<encoding_group::UHC, NEEDLE...>;

default:
throw pqxx::internal_error{concat(
"Unexpected encoding group: ", as_if, " (mapped from ", enc, ").")};
}
}
} // namespace pqxx::internal
#endif
14 changes: 9 additions & 5 deletions include/pqxx/internal/gates/connection-stream_from.hxx
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
#include <pqxx/internal/callgate.hxx>
#if !defined(PQXX_H_CONNECTION_STREAM_FROM)
# define PQXX_H_CONNECTION_STREAM_FROM

#include "pqxx/connection.hxx"
# include <pqxx/internal/callgate.hxx>

# include "pqxx/connection.hxx"

namespace pqxx::internal::gate
{
class PQXX_PRIVATE connection_stream_from : callgate<connection>
// Not publicising this call gate to specific classes. We also use it in
// stream_query, which is a template.
struct PQXX_PRIVATE connection_stream_from : callgate<connection>
{
friend class pqxx::stream_from;

connection_stream_from(reference x) : super{x} {}

auto read_copy_line() { return home().read_copy_line(); }
};
} // namespace pqxx::internal::gate
#endif
50 changes: 18 additions & 32 deletions include/pqxx/internal/stream_iterator.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,34 @@

namespace pqxx
{
class stream_from;
template<typename... TYPE> class stream_query;
}


namespace pqxx::internal
{
// C++20: Replace with generator?
/// Input iterator for stream_from.
/** Just barely enough to support range-based "for" loops. Don't assume that
* any of the usual behaviour works beyond that.
/** Just barely enough to support range-based "for" loops on stream_from.
* Don't assume that any of the usual behaviour works beyond that.
*/
template<typename... TYPE> class stream_input_iterator
template<typename... TYPE> class stream_from_input_iterator
{
using stream_t = stream_from;

public:
using value_type = std::tuple<TYPE...>;

/// Construct an "end" iterator.
stream_input_iterator() = default;
stream_from_input_iterator() = default;

explicit stream_input_iterator(stream_from &home) : m_home(&home)
explicit stream_from_input_iterator(stream_t &home) : m_home(&home)
{
advance();
}
stream_input_iterator(stream_input_iterator const &) = default;
stream_from_input_iterator(stream_from_input_iterator const &) = default;

stream_input_iterator &operator++()
stream_from_input_iterator &operator++()
{
advance();
return *this;
Expand All @@ -47,12 +49,12 @@ public:
value_type const &operator*() const { return m_value; }

/// Comparison only works for comparing to end().
bool operator==(stream_input_iterator const &rhs) const
bool operator==(stream_from_input_iterator const &rhs) const
{
return m_home == rhs.m_home;
}
/// Comparison only works for comparing to end().
bool operator!=(stream_input_iterator const &rhs) const
bool operator!=(stream_from_input_iterator const &rhs) const
{
return not(*this == rhs);
}
Expand All @@ -66,40 +68,24 @@ private:
m_home = nullptr;
}

stream_from *m_home{nullptr};
stream_t *m_home{nullptr};
value_type m_value;
};


// C++20: Replace with generator?
/// Iteration over a @ref stream_from.
/// Iteration over a @ref stream_query.
template<typename... TYPE> class stream_input_iteration
{
public:
using iterator = stream_input_iterator<TYPE...>;
explicit stream_input_iteration(stream_from &home) : m_home{home} {}
using stream_t = stream_from;
using iterator = stream_from_input_iterator<TYPE...>;
explicit stream_input_iteration(stream_t &home) : m_home{home} {}
iterator begin() const { return iterator{m_home}; }
iterator end() const { return {}; }

private:
stream_from &m_home;
};


// C++20: Replace with generator?
/// Iteration over a @ref stream_from, deleting it once done.
template<typename... TYPE> class owning_stream_input_iteration
{
public:
using iterator = stream_input_iterator<TYPE...>;
explicit owning_stream_input_iteration(std::unique_ptr<stream_from> &&home) :
m_home{std::move(home)}
{}
iterator begin() const { return iterator{*m_home.get()}; }
iterator end() const { return {}; }

private:
std::unique_ptr<stream_from> m_home;
stream_t &m_home;
};
} // namespace pqxx::internal
#endif
Loading

0 comments on commit 9f6c3a8

Please sign in to comment.