Replies: 5 comments 11 replies
-
PostgreSQL streaming is designed to minimize memory usage (keeping one row in memory at a time). From a network-protocol perspective, it is roughly equivalent to using a cursor with a fetch size of 1 (though you only need to issue one statement in total, rather than one statement per cursor fetch). So it doesn't surprise me that it is slower from a throughput perspective, though 10x slower is more than I would have expected. To clarify whether this is a Sequel issue, could you do similar testing using ruby-pg directly and see if you get a similar result?
Beta Was this translation helpful? Give feedback.
-
I think this is a performance regression in ruby-pg. With current sequel_pg and ruby-pg 1.2.3, streaming is only about 2x slower. With current sequel_pg and ruby-pg 1.3.0 and higher, it is orders of magnitude slower: about 300 records/second for me. Can you see if you can replicate these results with pg 1.2.3 and pg 1.3.0 in your environment (using your ruby-pg test, without Sequel)? If you can replicate them, we should probably file a bug with ruby-pg. I checked that. FWIW, with some small modifications to ruby-pg, I can get sequel_pg to use ruby-pg's streaming support. That results in about 4.72 seconds. This is still longer than the 4.01 seconds I was getting previously with older versions of ruby-pg, but no longer crazy long. It still looks like we should disable the use of streaming in `paged_each` by default.
Beta Was this translation helpful? Give feedback.
-
Here's the diff to ruby-pg and sequel_pg if you want to play around with it.

ruby-pg:

diff --git a/ext/pg_result.c b/ext/pg_result.c
index 8306be1..c996be5 100644
--- a/ext/pg_result.c
+++ b/ext/pg_result.c
@@ -1383,7 +1383,7 @@ pgresult_type_map_get(VALUE self)
static void
-yield_hash(VALUE self, int ntuples, int nfields)
+yield_hash(VALUE self, int ntuples, int nfields, void *data)
{
int tuple_num;
t_pg_result *this = pgresult_get_this(self);
@@ -1397,7 +1397,7 @@ yield_hash(VALUE self, int ntuples, int nfields)
}
static void
-yield_array(VALUE self, int ntuples, int nfields)
+yield_array(VALUE self, int ntuples, int nfields, void *data)
{
int row;
t_pg_result *this = pgresult_get_this(self);
@@ -1417,7 +1417,7 @@ yield_array(VALUE self, int ntuples, int nfields)
}
static void
-yield_tuple(VALUE self, int ntuples, int nfields)
+yield_tuple(VALUE self, int ntuples, int nfields, void *data)
{
int tuple_num;
t_pg_result *this = pgresult_get_this(self);
@@ -1436,8 +1436,8 @@ yield_tuple(VALUE self, int ntuples, int nfields)
}
}
-static VALUE
-pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int))
+VALUE
+pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int, void*), void* data)
{
t_pg_result *this;
int nfields;
@@ -1465,7 +1465,7 @@ pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int))
pg_result_check( self );
}
- yielder( self, ntuples, nfields );
+ yielder( self, ntuples, nfields, data );
pgresult = gvl_PQgetResult(pgconn);
if( pgresult == NULL )
@@ -1516,7 +1516,7 @@ pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int))
static VALUE
pgresult_stream_each(VALUE self)
{
- return pgresult_stream_any(self, yield_hash);
+ return pgresult_stream_any(self, yield_hash, NULL);
}
/*
@@ -1532,7 +1532,7 @@ pgresult_stream_each(VALUE self)
static VALUE
pgresult_stream_each_row(VALUE self)
{
- return pgresult_stream_any(self, yield_array);
+ return pgresult_stream_any(self, yield_array, NULL);
}
/*
@@ -1549,7 +1549,7 @@ pgresult_stream_each_tuple(VALUE self)
/* allocate VALUEs that are shared between all streamed tuples */
ensure_init_for_tuple(self);
- return pgresult_stream_any(self, yield_tuple);
+ return pgresult_stream_any(self, yield_tuple, NULL);
}
/*

sequel_pg:

diff --git a/ext/sequel_pg/sequel_pg.c b/ext/sequel_pg/sequel_pg.c
index d6c1503..41dfdd3 100644
--- a/ext/sequel_pg/sequel_pg.c
+++ b/ext/sequel_pg/sequel_pg.c
@@ -70,6 +70,7 @@
PGconn* pg_get_pgconn(VALUE);
PGresult* pgresult_get(VALUE);
int pg_get_result_enc_idx(VALUE);
+VALUE pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int, void*), void* data);
static int spg_use_ipaddr_alloc;
static int spg_use_pg_get_result_enc_idx;
@@ -1659,6 +1660,39 @@ static VALUE spg_set_single_row_mode(VALUE self) {
return Qnil;
}
+struct spg__yield_each_row_stream_data {
+ VALUE self;
+ VALUE *colsyms;
+ VALUE *colconvert;
+ VALUE pg_value;
+ PGresult *res;
+ int enc_index;
+ char type;
+};
+
+static void spg__yield_each_row_stream(VALUE rres, int ntuples, int nfields, void *rdata) {
+ struct spg__yield_each_row_stream_data* data = (struct spg__yield_each_row_stream_data *)rdata;
+ VALUE h = rb_hash_new();
+ VALUE self = data->self;
+ VALUE *colsyms = data->colsyms;
+ VALUE *colconvert= data->colconvert;
+ PGresult *res = data->res;
+ int enc_index = data->enc_index;
+ long j;
+
+ for(j=0; j<nfields; j++) {
+ rb_hash_aset(h, colsyms[j], spg__col_value(self, res, 0, j, colconvert , enc_index));
+ }
+
+ if(data->type == SPG_YIELD_MODEL) {
+ VALUE model = rb_obj_alloc(data->pg_value);
+ rb_ivar_set(model, spg_id_values, h);
+ rb_yield(model);
+ } else {
+ rb_yield(h);
+ }
+}
+
static VALUE spg__yield_each_row_internal(VALUE self, VALUE rconn, VALUE rres, PGresult *res, int enc_index, VALUE *colsyms, VALUE *colconvert) {
long nfields;
long j;
@@ -1667,6 +1701,7 @@ static VALUE spg__yield_each_row_internal(VALUE self, VALUE rconn, VALUE rres, P
VALUE pg_type;
VALUE pg_value = Qnil;
char type = SPG_YIELD_NORMAL;
+ struct spg__yield_each_row_stream_data data;
nfields = PQnfields(res);
@@ -1684,6 +1719,17 @@ static VALUE spg__yield_each_row_internal(VALUE self, VALUE rconn, VALUE rres, P
spg_set_column_info(self, res, colsyms, colconvert, enc_index);
+ data.self = self;
+ data.colsyms = colsyms;
+ data.colconvert = colconvert;
+ data.pg_value = pg_value;
+ data.res = res;
+ data.enc_index = enc_index;
+ data.type = type;
+
+ pgresult_stream_any(rres, spg__yield_each_row_stream, &data);
+ return self;
+
while (PQntuples(res) != 0) {
h = rb_hash_new();
for(j=0; j<nfields; j++) {
|
Beta Was this translation helpful? Give feedback.
-
I updated sequel_pg to not use streaming by default for `paged_each`: jeremyevans/sequel_pg@d48a7ee
Beta Was this translation helpful? Give feedback.
-
Thank you for helping ruby-pg patch the performance regression, and for finding a way for sequel_pg to utilize ruby-pg's streaming support.
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I was curious about measuring the memory usage & speed difference between classic cursor-based pagination and Postgres streaming. So, I created the following script:
With cursors, this takes around 2 seconds. However, when I enable the `pg_streaming` extension, which changes `#paged_each` to use streaming, the ETA for completion was > 25 minutes. Am I missing something? Or am I misunderstanding Postgres streaming in expecting it to be comparably fast to the cursor implementation? I'm using the latest Sequel (5.54.0), sequel_pg (1.14.0), and Postgres (14.2) versions (installed via Homebrew).
Beta Was this translation helpful? Give feedback.
All reactions