Skip to content

Commit

Permalink
2006-09-27 Alexander Larsson <[email protected]>
Browse files Browse the repository at this point in the history
	* ginputstream.c:
	* goutputstream.c:
	Wrap async callback to automatically handle
	setting of pending, closed and ref/unref of the stream.

Original git commit by alex <alex> at 1159365103 +0000


git-svn-id: svn+ssh://svn.gnome.org/svn/gvfs/trunk@7 0feb0166-6c39-0410-8ede-fcc449a06167
  • Loading branch information
alexl committed Sep 13, 2007
1 parent 768ebcc commit 5ed6bbd
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 38 deletions.
76 changes: 56 additions & 20 deletions ginputstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct _GInputStreamPrivate {
guint cancelled : 1;
GMainContext *context;
gint io_job_id;
gpointer outstanding_callback;
};

static gssize g_input_stream_real_skip (GInputStream *stream,
Expand Down Expand Up @@ -421,6 +422,21 @@ queue_read_async_result (GInputStream *stream,
g_source_unref (source);
}

static void
read_async_callback_wrapper (GInputStream *stream,
void *buffer,
gsize count_requested,
gssize count_read,
gpointer data,
GError *error)
{
GAsyncReadCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
(*real_callback) (stream, buffer, count_requested, count_read, data, error);
g_object_unref (stream);
}

/**
* g_input_stream_read_async:
* @stream: A #GInputStream.
Expand Down Expand Up @@ -513,7 +529,9 @@ g_input_stream_read_async (GInputStream *stream,
class = G_INPUT_STREAM_GET_CLASS (stream);

stream->priv->pending = TRUE;
class->read_async (stream, buffer, count, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->read_async (stream, buffer, count, io_priority, read_async_callback_wrapper, data, notify);
}

typedef struct {
Expand Down Expand Up @@ -586,6 +604,20 @@ queue_skip_async_result (GInputStream *stream,
g_source_unref (source);
}

static void
skip_async_callback_wrapper (GInputStream *stream,
gsize count_requested,
gssize count_skipped,
gpointer data,
GError *error)
{
GAsyncSkipCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
(*real_callback) (stream, count_requested, count_skipped, data, error);
g_object_unref (stream);
}

/**
* g_input_stream_skip_async:
* @stream: A #GInputStream.
Expand Down Expand Up @@ -673,7 +705,9 @@ g_input_stream_skip_async (GInputStream *stream,

class = G_INPUT_STREAM_GET_CLASS (stream);
stream->priv->pending = TRUE;
class->skip_async (stream, count, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->skip_async (stream, count, io_priority, skip_async_callback_wrapper, data, notify);
}


Expand Down Expand Up @@ -743,6 +777,20 @@ queue_close_async_result (GInputStream *stream,
g_source_unref (source);
}

static void
close_async_callback_wrapper (GInputStream *stream,
gboolean result,
gpointer data,
GError *error)
{
GAsyncCloseInputCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
stream->priv->closed = TRUE;
(*real_callback) (stream, result, data, error);
g_object_unref (stream);
}

/**
* g_input_stream_close_async:
* @stream: A #GInputStream.
Expand Down Expand Up @@ -794,10 +842,11 @@ g_input_stream_close_async (GInputStream *stream,

class = G_INPUT_STREAM_GET_CLASS (stream);
stream->priv->pending = TRUE;
class->close_async (stream, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->close_async (stream, io_priority, close_async_callback_wrapper, data, notify);
}


/**
* g_input_stream_cancel:
* @stream: A #GInputStream.
Expand Down Expand Up @@ -840,7 +889,6 @@ g_input_stream_is_cancelled (GInputStream *stream)
return stream->priv->cancelled;
}


/********************************************
* Default implementation of async ops *
********************************************/
Expand All @@ -862,8 +910,6 @@ read_op_report (gpointer data)
{
ReadAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->buffer,
op->count_requested,
Expand All @@ -878,8 +924,6 @@ read_op_free (gpointer data)
{
ReadAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -941,7 +985,7 @@ g_input_stream_real_read_async (GInputStream *stream,

op = g_new0 (ReadAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->buffer = buffer;
op->count_requested = count;
op->callback = callback;
Expand Down Expand Up @@ -971,8 +1015,6 @@ skip_op_report (gpointer data)
{
SkipAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->count_requested,
op->count_skipped,
Expand All @@ -986,8 +1028,6 @@ skip_op_free (gpointer data)
{
SkipAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -1048,7 +1088,7 @@ g_input_stream_real_skip_async (GInputStream *stream,

op = g_new0 (SkipAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->count_requested = count;
op->callback = callback;
op->data = data;
Expand Down Expand Up @@ -1077,8 +1117,6 @@ close_op_report (gpointer data)
{
CloseAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->res,
op->data,
Expand All @@ -1090,8 +1128,6 @@ close_op_free (gpointer data)
{
CloseAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -1151,7 +1187,7 @@ g_input_stream_real_close_async (GInputStream *stream,

op = g_new0 (CloseAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->callback = callback;
op->data = data;
op->notify = notify;
Expand Down
75 changes: 57 additions & 18 deletions goutputstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct _GOutputStreamPrivate {
guint cancelled : 1;
GMainContext *context;
gint io_job_id;
gpointer outstanding_callback;
};

static void g_output_stream_real_write_async (GOutputStream *stream,
Expand Down Expand Up @@ -401,6 +402,22 @@ queue_write_async_result (GOutputStream *stream,
g_source_unref (source);
}

static void
write_async_callback_wrapper (GOutputStream *stream,
void *buffer,
gsize count_requested,
gssize count_written,
gpointer data,
GError *error)
{
GAsyncWriteCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
(*real_callback) (stream, buffer, count_requested, count_written, data, error);
g_object_unref (stream);
}


/**
* g_output_stream_write_async:
* @stream: A #GOutputStream.
Expand Down Expand Up @@ -490,7 +507,9 @@ g_output_stream_write_async (GOutputStream *stream,
class = G_OUTPUT_STREAM_GET_CLASS (stream);

stream->priv->pending = TRUE;
return class->write_async (stream, buffer, count, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->write_async (stream, buffer, count, io_priority, write_async_callback_wrapper, data, notify);
}

typedef struct {
Expand Down Expand Up @@ -559,6 +578,20 @@ queue_flush_async_result (GOutputStream *stream,
g_source_unref (source);
}

static void
flush_async_callback_wrapper (GOutputStream *stream,
gboolean result,
gpointer data,
GError *error)
{
GAsyncFlushCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
(*real_callback) (stream, result, data, error);
g_object_unref (stream);
}


void
g_output_stream_flush_async (GOutputStream *stream,
int io_priority,
Expand Down Expand Up @@ -597,7 +630,9 @@ g_output_stream_flush_async (GOutputStream *stream,
class = G_OUTPUT_STREAM_GET_CLASS (stream);

stream->priv->pending = TRUE;
return class->flush_async (stream, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->flush_async (stream, io_priority, flush_async_callback_wrapper, data, notify);
}

typedef struct {
Expand Down Expand Up @@ -666,6 +701,20 @@ queue_close_async_result (GOutputStream *stream,
g_source_unref (source);
}

static void
close_async_callback_wrapper (GOutputStream *stream,
gboolean result,
gpointer data,
GError *error)
{
GAsyncCloseOutputCallback real_callback = stream->priv->outstanding_callback;

stream->priv->pending = FALSE;
stream->priv->closed = TRUE;
(*real_callback) (stream, result, data, error);
g_object_unref (stream);
}

/**
* g_output_stream_close_async:
* @stream: A #GOutputStream.
Expand Down Expand Up @@ -716,7 +765,9 @@ g_output_stream_close_async (GOutputStream *stream,

class = G_OUTPUT_STREAM_GET_CLASS (stream);
stream->priv->pending = TRUE;
return class->close_async (stream, io_priority, callback, data, notify);
stream->priv->outstanding_callback = callback;
g_object_ref (stream);
class->close_async (stream, io_priority, close_async_callback_wrapper, data, notify);
}

/**
Expand Down Expand Up @@ -782,8 +833,6 @@ write_op_report (gpointer data)
{
WriteAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->buffer,
op->count_requested,
Expand All @@ -798,8 +847,6 @@ write_op_free (gpointer data)
{
WriteAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -861,7 +908,7 @@ g_output_stream_real_write_async (GOutputStream *stream,

op = g_new0 (WriteAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->buffer = buffer;
op->count_requested = count;
op->callback = callback;
Expand Down Expand Up @@ -890,8 +937,6 @@ flush_op_report (gpointer data)
{
FlushAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->result,
op->data,
Expand All @@ -904,8 +949,6 @@ flush_op_free (gpointer data)
{
FlushAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -967,7 +1010,7 @@ g_output_stream_real_flush_async (GOutputStream *stream,

op = g_new0 (FlushAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->callback = callback;
op->data = data;
op->notify = notify;
Expand All @@ -994,8 +1037,6 @@ close_op_report (gpointer data)
{
CloseAsyncOp *op = data;

op->stream->priv->pending = FALSE;

op->callback (op->stream,
op->res,
op->data,
Expand All @@ -1007,8 +1048,6 @@ close_op_free (gpointer data)
{
CloseAsyncOp *op = data;

g_object_unref (op->stream);

if (op->error)
g_error_free (op->error);

Expand Down Expand Up @@ -1068,7 +1107,7 @@ g_output_stream_real_close_async (GOutputStream *stream,

op = g_new0 (CloseAsyncOp, 1);

op->stream = g_object_ref (stream);
op->stream = stream;
op->callback = callback;
op->data = data;
op->notify = notify;
Expand Down

0 comments on commit 5ed6bbd

Please sign in to comment.