When you use a database cluster whose nodes depend entirely on the network, as is the case for a shared-nothing architecture, an essential step in integrating a new application is to design it around the cluster's capabilities so that it plays to the cluster's strengths. The problem with shared-nothing architectures is that queries that run very quickly on a single instance may take a long time on a cluster, especially queries with aggregates that need a global result from all the nodes, or queries that depend on the results of inner sub-queries. The nightmare of most database developers trying to optimize an application is the “SELECT *” type of query, which fetches all the tuples of a table in a single shot. Such queries are already costly on a single database instance, so imagine the load on your application when an additional node layer forces the database to fetch those tuples from multiple nodes.
From experience, I would say there are three guidelines to follow when customizing an application for a cluster:

  • Choose the number of nodes in the cluster, and the servers they run on, to minimize the overall network load.
  • Write queries so that they request results from as few nodes as possible.
  • Design the data distribution strategy of your tables to minimize the amount of data that must be fetched to a single place to evaluate join conditions.

After this short digression: the aim of this post is not to discuss these guidelines in depth, but to highlight a feature that your application should target when you design its database. As the title says, it is called “Fast Query Shipping”. It is not really a feature in itself, more a goal that applications should try to reach as often as possible to improve their performance on database cluster software. Fast query shipping (FQS) is the ability of a cluster to determine whether a query can be shipped in its entirety to a remote node, which turns it into a simple send and receive. This minimizes both the planning cost on the node that plans the query and the data transfer cost, because the data fetched from the remote node is exactly what the application asked for.

To be honest, googling “Fast Query Shipping” does not return any results other than Postgres-XC. The basic implementation of this feature was done in the following commit.
commit 191d55ebf1faf897aed51f1b5fdcd71ec3ccdc6c
Author: Ashutosh Bapat
Date: Thu Feb 2 16:59:04 2012 +0530
 
Add the support for Fast Query Shipping (FQS), a method to identify
whether a query can be sent to the datanode/s as it is for evaluation and do so
if deemed fit. In such cases, we create a plan with a single RemoteQuery node
corresponding to the query and avoid the planning phase on coordinator.
 
A query tree walker analyses all the nodes in the query tree and finds out the
conditions under which the query is shippable and detects presence of
expressions which can not be evaluated on the datanode. It looks at the
relations involved in the query and deducts whether JOINs between these
relations can be evaluated on a single datanode.
 
Adds testcases xc_FQS and xc_FQS_join to test the fast query shipping
functionality and make it independent of cluster configuration.

So, in this case, the PostgreSQL planner has been extended exclusively for Postgres-XC to evaluate whether a query is entirely shippable to remote nodes. This planning step also determines the list of target nodes on which to launch the query. Whether a query can be shipped depends on many conditions, such as the analysis of its clauses, but basically you cannot ship a query if it contains expressions that cannot be evaluated on a remote node. Simple examples are the next value of a sequence, or timestamps; more generally, any volatile or stable function. Other constructs such as window functions, GROUP BY clauses and aggregates generally cannot be shipped either, although a query with an aggregate function can sometimes be shipped entirely when it targets a single node. There are a lot of possible cases, and you may want to look at the code in detail if you are interested in each corner case.
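
To make the idea of a shippability check more concrete, here is a minimal Python sketch of a walker over a toy expression tree. It is not the Postgres-XC code: the `Expr` class, the `VOLATILITY` table and `is_shippable` are hypothetical names invented for this illustration, and the rule is deliberately conservative (it rejects every aggregate, even though some can be shipped when a single node is targeted).

```python
from dataclasses import dataclass, field

# Hypothetical volatility table; a real planner reads this from the catalog.
VOLATILITY = {
    "nextval": "volatile",
    "random": "volatile",
    "now": "stable",
    "abs": "immutable",
    "lower": "immutable",
}


@dataclass
class Expr:
    kind: str  # "const", "column", "func", "op", "aggregate" or "window"
    name: str = ""
    args: list = field(default_factory=list)


def is_shippable(expr: Expr) -> bool:
    """Return True if every node of the expression can be evaluated remotely."""
    if expr.kind in ("aggregate", "window"):
        return False
    # Unknown functions are treated as volatile, the safe default.
    if expr.kind == "func" and VOLATILITY.get(expr.name, "volatile") != "immutable":
        return False
    return all(is_shippable(arg) for arg in expr.args)


# a = 1
const_qual = Expr("op", "=", [Expr("column", "a"), Expr("const", "1")])
# b < now()
time_qual = Expr("op", "<", [Expr("column", "b"), Expr("func", "now")])

print(is_shippable(const_qual))  # True
print(is_shippable(time_qual))   # False
```

A single non-shippable leaf is enough to make the whole expression non-shippable, which is why the walker has to visit every node of the tree.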

Let’s have a look at some simple test cases with replicated and hash tables:
postgres=# create table rep (a int) distribute by replication;
CREATE TABLE
postgres=# create table hash (a int) distribute by hash(a);
CREATE TABLE

For a table replicated on all nodes, shipping the results from a single node is sufficient for a simple “select *”. For a distributed table, all the nodes are targeted and their results are sent back as is. This is the configuration in which queries run most efficiently in a cluster environment.
postgres=# explain select * from rep;
QUERY PLAN
----------------------------------------------------------------------------
Data Node Scan on "__REMOTE_FQS_QUERY__" (cost=0.00..0.00 rows=0 width=0)
Node/s: dn1
(2 rows)
postgres=# explain select * from hash;
QUERY PLAN
----------------------------------------------------------------------------
Data Node Scan on "__REMOTE_FQS_QUERY__" (cost=0.00..0.00 rows=0 width=0)
Node/s: dn1, dn2
(2 rows)
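
The node lists in the two plans above follow a simple rule, sketched here in Python. The `DATANODES` list, the `target_nodes` function and the choice of dn1 as the preferred node are assumptions made for this illustration, not Postgres-XC internals.

```python
# Hypothetical cluster layout matching the EXPLAIN output above.
DATANODES = ["dn1", "dn2"]


def target_nodes(distribution: str, preferred: str = "dn1") -> list:
    """Pick the datanodes a plain 'SELECT *' has to read from."""
    if distribution == "replication":
        # Every node holds a full copy, so one node is enough.
        return [preferred]
    if distribution == "hash":
        # Rows are spread over all nodes, so all of them must be read.
        return list(DATANODES)
    raise ValueError(f"unknown distribution type: {distribution}")


print(target_nodes("replication"))  # ['dn1']
print(target_nodes("hash"))         # ['dn1', 'dn2']
```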

A session parameter called enable_fast_query_shipping is available to switch this feature on or off. Let’s see what happens when it is off.
postgres=# SET enable_fast_query_shipping TO false;
SET
postgres=# explain select * from hash;
QUERY PLAN
-------------------------------------------------------------------
Result (cost=0.00..1.01 rows=1000 width=4)
-> Data Node Scan on hash (cost=0.00..1.01 rows=1000 width=4)
Node/s: dn1, dn2
(3 rows)
postgres=# explain select * from rep;
QUERY PLAN
------------------------------------------------------------------
Result (cost=0.00..1.01 rows=1000 width=4)
-> Data Node Scan on rep (cost=0.00..1.01 rows=1000 width=4)
Node/s: dn1
(3 rows)

Here the query does not use FQS, so the results are not simply fetched from the nodes and passed on: they are also materialized on the Coordinator where the query was launched before being sent back to the client.

This was a small introduction to the fast query shipping feature; do not forget to test it.

This week, I spent a long time working on the following commit. As you can tell from the date, it was committed just before leaving for the weekend :)
commit 8ef0c48acadec3c9888d302888a7d279d82323e5
Author: Michael P
Date: Fri Jan 13 16:05:00 2012 +0900
 
Improve target list selection for remote DML queries
 
This commit makes remote DML planning generally available
for replicated and hash tables. There are still issues
related to node selection for round robin tables though.
The target list of UPDATE and DELETE using coordinator quals
was set to fetch only CTID when generating SELECT in their
inner plan generated by create_remotequery_plan.
 
Their target list is rewritten to include the columns in quals
so as to be able to evaluate those quals correctly on Coordinator.
In addition remote planning for UPDATE has been improved to be
able to target correct node when launching query.
 
A new regression test called xc_remote is added, it uses the
parameter enable_fast_query_shipping to force all the queries
to go through standard planner. Tests are done on replicated,
hash and round robin tables.

All the examples in this article use the following two tables, aa and bb, on a Postgres-XC cluster with this configuration.
db=# select node_name, node_type from pgxc_node; -- 1 Coordinator, 2 Datanodes
node_name | node_type
-----------+-----------
coord1 | C
dn1 | D
dn2 | D
(3 rows)
db=# create table aa (a int, b timestamp) distribute by hash(a);
CREATE TABLE
db=# create table bb (a int, b timestamp) distribute by replication;
CREATE TABLE

For database clusters in general, it is essential to have an efficient and consistent way to manage queries on both local and remote nodes. Efficiency is important to reduce the data load on the system. Consistency is even more important, to avoid dirty data in your database. Let’s take an example. SELECT queries may contain expressions that can be evaluated on remote nodes; a common case is when the expression is a constant.
db=# explain verbose select * from aa where a = 1;
QUERY PLAN
---------------------------------------------------
Data Node Scan (cost=0.00..0.00 rows=0 width=0)
  Output: a, b
  Node/s: dn1
  Remote query: SELECT a, b FROM aa WHERE (a = 1)
(4 rows)

In this case the query can be completely shipped to the remote node, returning correct results.
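
The reason a single node shows up in the plan is that an equality on the distribution key identifies the only node that can hold matching rows. Here is a rough Python sketch of that pruning. The modulo in `node_for_key` is a toy stand-in for the real hash function, so the node it picks for a given value will not necessarily match the plan above; `DATANODES` and both function names are hypothetical.

```python
DATANODES = ["dn1", "dn2"]  # hypothetical layout, as in this article


def node_for_key(value: int) -> str:
    """Toy stand-in for the hash function: plain modulo on the key."""
    return DATANODES[value % len(DATANODES)]


def nodes_for_select(key_constant=None) -> list:
    """Nodes to query for 'SELECT ... FROM aa [WHERE a = <constant>]'."""
    if key_constant is None:
        return list(DATANODES)           # no usable qual: ask every node
    return [node_for_key(key_constant)]  # equality on the key: one node


print(len(nodes_for_select(1)))  # 1
print(len(nodes_for_select()))   # 2
```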

Expressions that cannot be pushed down are those that need to be evaluated on the local node, with all the necessary data fetched from remote nodes. For example, let’s take the replicated table bb and select data from it with a time-based expression. Each node of the cluster (at least in the case of Postgres-XC) is located on a different server, each server having its own clock. So, is the following SQL shippable?
SELECT a from bb where b < now();
The answer is no. What needs to be done is to fetch all the tuples (a, b) from table bb (a is needed to send back the result), and then apply the time-based condition to those tuples on the Coordinator (which is why b is needed).
This results in the following plan.
db=# explain verbose select a from bb where b < now();
QUERY PLAN
-----------------------------------------------------------------
 Result (cost=0.00..1.01 rows=1000 width=4)
 Output: a
 -> Data Node Scan on bb (cost=0.00..1.01 rows=1000 width=4)
   Output: a, b
   Node/s: dn1
   Remote query: SELECT a, b FROM ONLY bb WHERE true
   Coordinator quals: (bb.b < now())
(7 rows)
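
The split between "Remote query" and "Coordinator quals" in this plan can be imitated with a few lines of Python. This is only a sketch: the substring test in `split_quals` is a crude stand-in for a real expression walker, and `NON_SHIPPABLE_FUNCS`, `split_quals` and `remote_query` are hypothetical names, as are the sample rows and the fixed clock value.

```python
from datetime import datetime

# Hypothetical list of functions that must be evaluated on the Coordinator.
NON_SHIPPABLE_FUNCS = ("now(", "nextval(")


def split_quals(quals):
    """Separate quals into (shippable, coordinator-only) lists."""
    remote = [q for q in quals if not any(f in q for f in NON_SHIPPABLE_FUNCS)]
    local = [q for q in quals if any(f in q for f in NON_SHIPPABLE_FUNCS)]
    return remote, local


def remote_query(table, columns, remote_quals):
    """Build the SELECT sent to the datanode; no shippable qual means 'true'."""
    where = " AND ".join(f"({q})" for q in remote_quals) or "true"
    return f"SELECT {', '.join(columns)} FROM ONLY {table} WHERE {where}"


remote, local = split_quals(["b < now()"])
print(remote_query("bb", ["a", "b"], remote))
# SELECT a, b FROM ONLY bb WHERE true
print(local)
# ['b < now()']

# The Coordinator then filters the fetched (a, b) rows with its own clock.
coordinator_now = datetime(2012, 1, 14)  # fixed value for the example
fetched = [(1, datetime(2012, 1, 13)), (2, datetime(2012, 1, 15))]
print([a for a, b in fetched if b < coordinator_now])
# [1]
```

Column b has to be part of the remote target list only because the Coordinator needs it to evaluate the qual; the client never sees it.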

Postgres-XC already has a lot of mechanisms to manage SELECT and INSERT queries. What was missing were the parts related to UPDATE and DELETE. The new functionality committed this week makes it possible to use complex expressions in those queries as well.
For example, an UPDATE that uses sequence and time-based expressions, which need to be evaluated on the local node, now runs consistently.
db=# insert into bb values (1,now());
INSERT 0 1
db=# insert into bb values (2,now());
INSERT 0 1
db=# insert into bb values (3,now());
INSERT 0 1
db=# select * from bb;
a | b
---+---------------------------------
1 | Fri Jan 13 06:26:32.872665 2012
2 | Fri Jan 13 06:26:38.261489 2012
3 | Fri Jan 13 06:26:40.943182 2012
(3 rows)
db=# update bb set a = nextval('seq'), b = now();
UPDATE 3
db=# select * from bb;
a | b
---+---------------------------------
1 | Fri Jan 13 06:28:01.273496 2012
2 | Fri Jan 13 06:28:01.273496 2012
3 | Fri Jan 13 06:28:01.273496 2012
(3 rows)
db=# explain verbose update bb set a = nextval('seq'), b = now();
QUERY PLAN
-----------------------------------------------------------------------
 Update on public.bb (cost=0.00..11.01 rows=1000 width=6)
 Node/s: dn1, dn2
 Remote query: UPDATE public.bb SET a = $1, b = $2 WHERE ctid = $3
 -> Result (cost=0.00..11.01 rows=1000 width=6)
   Output: nextval('seq'::regclass), now(), ctid
   -> Data Node Scan on bb (cost=0.00..1.01 rows=1000 width=6)
      Output: ctid
      Node/s: dn1
      Remote query: SELECT ctid FROM ONLY bb WHERE true
(9 rows)

Here you need to select all the rows to be updated from the remote node, then evaluate the time-based expression (now) and the sequence value (nextval) on the Coordinator, and finally push those values to the target remote nodes.
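
The three steps of this plan can be simulated in Python to show why the replicas stay consistent. Everything here is a toy model under stated assumptions: the dictionaries stand in for the datanodes, the ctids are assumed to be identical on every replica (which the plan above relies on), and the counter starting at 10 stands in for nextval('seq').

```python
from datetime import datetime
from itertools import count


def make_replica():
    """Toy copy of table bb: ctid -> (a, b)."""
    return {"(0,1)": (1, None), "(0,2)": (2, None), "(0,3)": (3, None)}


datanodes = {"dn1": make_replica(), "dn2": make_replica()}

seq = count(start=10)                       # stands in for nextval('seq')
txn_time = datetime(2012, 1, 13, 6, 28, 1)  # now() is fixed for the transaction

# Step 1: inner plan, read the ctids from a single node.
ctids = sorted(datanodes["dn1"])

# Step 2: evaluate the non-shippable expressions once, on the Coordinator.
new_rows = {ctid: (next(seq), txn_time) for ctid in ctids}

# Step 3: push the already-computed values to every replica.
for rows in datanodes.values():
    for ctid, values in new_rows.items():
        rows[ctid] = values

print(datanodes["dn1"] == datanodes["dn2"])  # True
```

If each datanode evaluated nextval and now on its own instead, nothing would guarantee that the copies receive the same values, which is exactly the inconsistency this plan avoids.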

This also works with WHERE clauses that use non-shippable expressions.
db=# explain verbose update bb set a = nextval('seq'), b = now() WHERE b < now();
QUERY PLAN
---------------------------------------------------------------------------------
 Update on public.bb (cost=0.00..11.02 rows=1000 width=14)
  Node/s: dn1, dn2
  Remote query: UPDATE public.bb SET a = $1, b = $2 WHERE b = $3 AND ctid = $4
  -> Result (cost=0.00..11.02 rows=1000 width=14)
    Output: nextval('seq'::regclass), now(), b, ctid
    -> Data Node Scan on bb (cost=0.00..1.01 rows=1000 width=14)
      Output: b, ctid
      Node/s: dn1
      Remote query: SELECT b, ctid FROM ONLY bb WHERE true
      Coordinator quals: (bb.b < now())
(10 rows)

What is added here is a condition that pre-selects a subset of rows. Such an operation is costly, though, because all the rows of the table have to be fetched first in the inner plan.

The same kind of crazy SQL is also possible for DELETE, mixing shippable and non-shippable expressions.
db=# explain verbose delete from bb where a = 2 and b < now();
QUERY PLAN
------------------------------------------------------------------------------
 Delete on public.bb (cost=0.00..1.02 rows=1000 width=18)
 Node/s: dn1, dn2
 Remote query: DELETE FROM public.bb WHERE a = $1 AND b = $2 AND ctid = $3
 -> Result (cost=0.00..1.02 rows=1000 width=18)
   Output: a, b, ctid
   -> Data Node Scan on bb (cost=0.00..1.02 rows=1000 width=18)
     Output: a, b, ctid
     Node/s: dn1
     Remote query: SELECT a, b, ctid FROM ONLY bb WHERE (a = 2)
     Coordinator quals: (bb.b < now())
(10 rows)

Notice that the constant expression "a = 2" is shipped in the innermost plan, so fewer rows have to be fetched and the query is that much more efficient.
A lot of things are now possible, and all this stuff will be included in release 0.9.7!

©2010-2013 Michael Paquier. All content is ©Copyright of Otacoo.com 2010-2013.