Postgres-XC, a write-scalable, multi-master sharding cluster based on PostgreSQL, currently has a major limitation in the way tables are distributed.
As a reminder, a table can be replicated or distributed by round robin, hash or modulo. Hash and modulo distribution are based on the values of a single column. The distribution type is defined through an extension of CREATE TABLE:
CREATE TABLE...
[ DISTRIBUTE BY { REPLICATION | ROUND ROBIN | { [HASH | MODULO ] ( column_name ) } } ]
[ TO ( GROUP groupname | NODE nodename [, ... ] ) ]
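
To make the four distribution types more concrete, here is a minimal Python sketch of how a row could be routed to nodes under each of them. The node names, the function name target_nodes and the use of crc32 are assumptions made for illustration; Postgres-XC uses its own hash function and internal locator code.

```python
import zlib

NODES = ["dn1", "dn2", "dn3"]  # hypothetical node list


def target_nodes(strategy, row, row_index, column=None, nodes=NODES):
    """Return the list of nodes that should store `row` (a dict)."""
    if strategy == "replication":
        # Every node keeps a full copy of the table
        return list(nodes)
    if strategy == "round robin":
        # Rows are dealt out to the nodes in turn
        return [nodes[row_index % len(nodes)]]
    if strategy == "modulo":
        # The integer column value picks the node directly
        return [nodes[row[column] % len(nodes)]]
    if strategy == "hash":
        # crc32 is a stand-in, NOT the hash function Postgres-XC uses
        h = zlib.crc32(str(row[column]).encode())
        return [nodes[h % len(nodes)]]
    raise ValueError(f"unknown strategy: {strategy}")


row = {"a": 5}
print(target_nodes("replication", row, 0))         # ['dn1', 'dn2', 'dn3']
print(target_nodes("modulo", row, 0, column="a"))  # ['dn3']
```

Only hash and modulo look at a column value, which is why those two types take a column_name in the syntax above.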

However, once defined, the distribution cannot be changed while the cluster is running. A workaround exists: use CREATE TABLE AS to copy all the data of the table into an intermediate one, drop the old table, then rename the intermediate table to the old name. This is enough for 1.0, but the table's Oid is lost in the process.

One of the features I have been working on these days is a simple SQL interface that lets applications change a table's distribution on the fly, meaning that all the data is transferred automatically between nodes with a single SQL statement.
This feature uses an extension of ALTER TABLE as follows:
ALTER TABLE
DISTRIBUTE BY { REPLICATION | ROUND ROBIN | { [HASH | MODULO ] ( column_name ) } }
TO { GROUP groupname | NODE ( nodename [, ... ] ) }
ADD NODE ( nodename [, ... ] )
DELETE NODE ( nodename [, ... ] )

This basically means that you can change the distribution type of a table and the subset of nodes where data is located. The node list where data is distributed can be reset, increased or reduced at will.
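
As a rough illustration of how the three node clauses affect a table's node list, here is a small Python sketch. The function name apply_node_clause is hypothetical, and the handling of duplicate or unknown node names (silently ignored here) is an assumption, not something Postgres-XC is stated to do.

```python
def apply_node_clause(current, clause, names):
    """Return the new node list after TO NODE / ADD NODE / DELETE NODE."""
    if clause == "TO NODE":
        # Reset: the given list replaces the current one
        return list(names)
    if clause == "ADD NODE":
        # Increase: append the nodes that are not already present
        return list(current) + [n for n in names if n not in current]
    if clause == "DELETE NODE":
        # Reduce: drop the given nodes from the list
        return [n for n in current if n not in names]
    raise ValueError(f"unknown clause: {clause}")


nodes = ["dn1", "dn2", "dn3"]
print(apply_node_clause(nodes, "DELETE NODE", ["dn2"]))   # ['dn1', 'dn3']
print(apply_node_clause(["dn1"], "ADD NODE", ["dn3"]))    # ['dn1', 'dn3']
print(apply_node_clause(nodes, "TO NODE", ["dn2"]))       # ['dn2']
```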

The redistribution functionality is still pretty basic. What it does is simply:

  1. Fetch all the data of the table to be redistributed on the Coordinator
  2. Truncate the table
  3. Update the catalogs to the new distribution type
  4. Redistribute the data cached on Coordinator
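
The four steps above can be sketched in Python with plain dictionaries standing in for the Datanodes and the catalog. This is only a toy model under stated assumptions: the names cluster, catalog and redistribute_by_hash are invented for the example, and crc32 replaces the real Postgres-XC hash function.

```python
import zlib


def hash_node(value, nodes):
    # crc32 is only a stand-in; Postgres-XC uses its own hash function
    return nodes[zlib.crc32(str(value).encode()) % len(nodes)]


def redistribute_by_hash(cluster, catalog, table, column, new_nodes):
    old = catalog[table]
    # 1. Fetch all the data on the "Coordinator" (a plain list here).
    #    One copy is enough when the table was replicated.
    if old["type"] == "replication":
        cached = list(cluster[old["nodes"][0]][table])
    else:
        cached = [row for n in old["nodes"] for row in cluster[n][table]]
    # 2. Truncate the table on every node that held it
    for n in old["nodes"]:
        cluster[n][table] = []
    # 3. Update the catalog to the new distribution
    catalog[table] = {"type": "hash", "nodes": list(new_nodes)}
    # 4. Redistribute the cached rows
    for row in cached:
        target = hash_node(row[column], new_nodes)
        cluster[target].setdefault(table, []).append(row)


nodes = ["dn1", "dn2", "dn3"]
rows = [{"a": i} for i in range(1, 10001)]
cluster = {n: {"aa": list(rows)} for n in nodes}            # replicated table
catalog = {"aa": {"type": "replication", "nodes": list(nodes)}}

redistribute_by_hash(cluster, catalog, "aa", "a", nodes)     # replicated -> hash
counts_hash = {n: len(cluster[n]["aa"]) for n in nodes}

redistribute_by_hash(cluster, catalog, "aa", "a", ["dn1", "dn3"])  # drop dn2
counts_reduced = {n: len(cluster[n]["aa"]) for n in nodes}

print(sum(counts_hash.values()), sum(counts_reduced.values()), counts_reduced["dn2"])
# 10000 10000 0
```

Because everything passes through the cache in step 1, the same code path handles a change of distribution type and a change of node list.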

A tuple store is used to cache the data on the Coordinator during step 1, and its memory usage can be tuned with work_mem. The COPY protocol is used to exchange the data between nodes as fast as possible. This functionality also includes new code to materialize in a tuple slot the data received through the COPY protocol (the reverse operation is also implemented), which is essential when a tuple has to be redirected to a given node based on a hash value. It looks like such a materialization mechanism could also be a stepping stone toward a more complex mechanism for global constraints and triggers in XC.
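
To show why materialization matters, here is a hedged Python sketch that turns one line of COPY text format into a list of field values, picks the target node from the distribution column, and converts the fields back into a COPY line. It only handles the \N null marker and the \t, \n, \r and \\ escapes; the function names are invented for the example and crc32 is a stand-in for the real hash function.

```python
import zlib

_UNESCAPE = {"t": "\t", "n": "\n", "r": "\r", "\\": "\\"}
_ESCAPE = {"\\": "\\\\", "\t": "\\t", "\n": "\\n", "\r": "\\r"}


def copy_line_to_fields(line):
    """Split one COPY text-format line into field values (None for NULL)."""
    fields = []
    for raw in line.rstrip("\n").split("\t"):
        if raw == r"\N":
            fields.append(None)
            continue
        out, i = [], 0
        while i < len(raw):
            if raw[i] == "\\" and i + 1 < len(raw):
                # Decode a backslash escape sequence
                out.append(_UNESCAPE.get(raw[i + 1], raw[i + 1]))
                i += 2
            else:
                out.append(raw[i])
                i += 1
        fields.append("".join(out))
    return fields


def fields_to_copy_line(fields):
    """Reverse operation: build a COPY text-format line from field values."""
    parts = [r"\N" if f is None else "".join(_ESCAPE.get(c, c) for c in f)
             for f in fields]
    return "\t".join(parts) + "\n"


def route(line, dist_col_index, nodes):
    """Pick the node for a COPY line from the value of its distribution column."""
    key = copy_line_to_fields(line)[dist_col_index]
    return nodes[zlib.crc32(str(key).encode()) % len(nodes)]


line = "42\t\\N\thello\\tworld\n"
print(copy_line_to_fields(line))   # ['42', None, 'hello\tworld']
```

The raw line cannot be hashed as a whole: the value of the distribution column has to be extracted first, which is what the materialization step provides.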
This is still a basic implementation, and the following improvements are planned once the basic stuff is committed:

  • Skip materialization when it is not necessary (new distribution set to round robin or replication)
  • Truncate the table on a portion of nodes if a replicated table has its subset of nodes reduced
  • COPY only necessary data for a replicated table to new nodes if its subset of nodes is increased
  • And a couple of other things
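
The first three planned improvements boil down to simple decisions, sketched below in Python. This describes planned behavior only, not something the current implementation does, and the function names are hypothetical.

```python
def needs_materialization(new_type):
    """Only hash and modulo need the column value to pick a target node."""
    return new_type in ("hash", "modulo")


def replicated_change_plan(old_nodes, new_nodes):
    """For a table that stays replicated, work out the minimal actions."""
    old, new = set(old_nodes), set(new_nodes)
    return {
        "truncate_on": sorted(old - new),  # nodes removed from the subset
        "copy_to": sorted(new - old),      # nodes added to the subset
        "untouched": sorted(old & new),    # nodes that already hold the data
    }


print(needs_materialization("round robin"))
# False
print(replicated_change_plan(["dn1", "dn2", "dn3"], ["dn1", "dn3"]))
# {'truncate_on': ['dn2'], 'copy_to': [], 'untouched': ['dn1', 'dn3']}
```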

So how does it work? Let’s take an example with this simple cluster, 1 Coordinator and 3 Datanodes:
postgres=# select node_name, node_type from pgxc_node;
node_name | node_type
-----------+-----------
coord1 | C
dn1 | D
dn2 | D
dn3 | D
(4 rows)

A table aa is created as replicated with 10,000 rows on all the nodes.
postgres=# CREATE TABLE aa (a int) DISTRIBUTE BY REPLICATION;
CREATE TABLE
postgres=# INSERT INTO aa VALUES (generate_series(1,10000));
INSERT 0 10000
postgres=# EXECUTE DIRECT ON (dn1) 'SELECT count(*) FROM aa';
count
-------
10000
(1 row)
postgres=# EXECUTE DIRECT ON (dn2) 'SELECT count(*) FROM aa';
count
-------
10000
(1 row)
postgres=# EXECUTE DIRECT ON (dn3) 'SELECT count(*) FROM aa';
count
-------
10000
(1 row)

So here there are 10,000 tuples on each node, nothing fancy for a replicated table.

Let’s change it to a hash-based distribution…
postgres=# ALTER TABLE aa DISTRIBUTE BY HASH(a);
NOTICE: Copying data for relation "public.aa"
NOTICE: Truncating data for relation "public.aa"
NOTICE: Redistributing data for relation "public.aa"
ALTER TABLE
postgres=# EXECUTE DIRECT ON (dn1) 'SELECT count(*) FROM aa';
count
-------
3235
(1 row)
postgres=# EXECUTE DIRECT ON (dn2) 'SELECT count(*) FROM aa';
count
-------
3375
(1 row)
postgres=# EXECUTE DIRECT ON (dn3) 'SELECT count(*) FROM aa';
count
-------
3390
(1 row)

Now roughly one third of the data is on each node.

What happens if the set of nodes is reduced? Let’s now remove the data on node dn2.
postgres=# ALTER TABLE aa DELETE NODE (dn2);
NOTICE: Copying data for relation "public.aa"
NOTICE: Truncating data for relation "public.aa"
NOTICE: Redistributing data for relation "public.aa"
ALTER TABLE
postgres=# EXECUTE DIRECT ON (dn1) 'SELECT count(*) FROM aa';
count
-------
5039
(1 row)
postgres=# EXECUTE DIRECT ON (dn2) 'SELECT count(*) FROM aa';
count
-------
0
(1 row)
postgres=# EXECUTE DIRECT ON (dn3) 'SELECT count(*) FROM aa';
count
-------
4961
(1 row)

The data is now hashed on nodes dn1 and dn3. There is no more data on dn2.

This implementation is still pretty basic, but opens a couple of possibilities for clustering applications, no?

This week, a great feature was added by commit 8a05756, completed by commit caf1554, in the Postgres-XC Git repository.
commit 8a05756a702051d55a35ec3f4953f381f977b53a
Author: Pavan Deolasee
Date: Wed Dec 14 09:35:53 2011 +0530
 
Implement support for CREATE TABLE AS, SELECT INTO and INSERT INTO
statements. We start by fixing the INSERT INTO support. For every result
relation, we now build a corresponding RemoteQuery node so that the
inserts can be carried out at the remote datanodes. Subsequently, at
the coordinator at execution time, instead of inserting the resulting tuples
in a local heap, we invoke remote execution and insert the rows in the
remote datanodes. This works nicely even for prepared queries, multiple
values clause for insert as well as any other mechanism of generating
tuples.
 
We use this infrastructure to then support CREATE TABLE AS SELECT (CTAS).
The query is transformed into a CREATE TABLE statement followed by
INSERT INTO statement and then run through normal planning/execution.
 
There are many regression cases that need fixing because these statements
now work correctly. This patch fixes many of them. Few might still be
failing, but they seem unrelated to the work itself and might be a
side-effect. We will fix them once this patch gets in.

Put simply, this adds support for CREATE TABLE AS and SELECT INTO. All the combinations of INSERT SELECT are also possible, whatever the type of table used.
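
The commit message says that CREATE TABLE AS is transformed into a CREATE TABLE followed by an INSERT INTO. The equivalence of the two forms can be checked with a small Python sketch using SQLite as a stand-in. SQLite is a single-node database with no notion of distribution, so this only illustrates the statement rewrite, not the remote execution on Datanodes; the table names are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE b (x INTEGER)")
conn.executemany("INSERT INTO b VALUES (?)", [(i,) for i in range(1, 101)])

# One-step form: CREATE TABLE AS SELECT
conn.execute("CREATE TABLE c_ctas AS SELECT x FROM b")

# Two-step form: CREATE TABLE, then INSERT INTO ... SELECT
conn.execute("CREATE TABLE c_split (x INTEGER)")
conn.execute("INSERT INTO c_split SELECT x FROM b")

ctas_rows = conn.execute("SELECT x FROM c_ctas ORDER BY x").fetchall()
split_rows = conn.execute("SELECT x FROM c_split ORDER BY x").fetchall()
print(len(ctas_rows), ctas_rows == split_rows)  # 100 True
```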

Let’s go through a couple of examples with this cluster of 1 Coordinator and 4 Datanodes.
postgres=# select oid,node_name,node_type from pgxc_node;
oid | node_name | node_type
-------+-----------+-----------
11133 | coord1 | C
16384 | dn1 | D
16385 | dn2 | D
16386 | dn3 | D
16387 | dn4 | D
(5 rows)

Let’s create a table and populate it with some data.
postgres=# create table a as select generate_series(1,100);
INSERT 0 100
postgres=# select count(*) from a;
count
-------
100
(1 row)

The data is distributed across the 4 Datanodes of the cluster.
postgres=# execute direct on node dn4 'select count(*) from a';
count
-------
27
(1 row)
postgres=# execute direct on node dn3 'select count(*) from a';
count
-------
19
(1 row)
postgres=# execute direct on node dn2 'select count(*) from a';
count
-------
31
(1 row)
postgres=# execute direct on node dn1 'select count(*) from a';
count
-------
23
(1 row)

CREATE TABLE AS is not limited to global tables: you can also define a distribution type and a subset of nodes, and of course the table can be unlogged or temporary. Here the table is distributed by round robin on Datanodes dn1 and dn2.
postgres=# create table c distribute by round robin to node dn1,dn2 as select * from b;
INSERT 0 100
postgres=# execute direct on node dn1 'select count(*) from c';
count
-------
50
(1 row)
postgres=# execute direct on node dn2 'select count(*) from c';
count
-------
50
(1 row)
postgres=# execute direct on node dn3 'select count(*) from c';
count
-------
0
(1 row)
postgres=# execute direct on node dn4 'select count(*) from c';
count
-------
0
(1 row)

However, SELECT INTO has no extension for the distribution type or node subsets. The reason is that SELECT INTO is fundamentally a SELECT query, whereas CREATE TABLE AS is DDL. So in this case the created table is distributed by hash on all the nodes.
postgres=# select * into d from b;
INSERT 0 100
postgres=# select pclocatortype,nodeoids from pgxc_class where pcrelid = 'd'::regclass;
-[ RECORD 1 ]-+------------------------
pclocatortype | H
nodeoids | 16384 16385 16386 16387
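
To read this record, the Oids in nodeoids have to be matched against the pgxc_node listing shown earlier. A tiny Python sketch of that lookup follows, with the values copied from the two outputs above. Only the locator code H appears in this post, so it is the only one mapped; the function name describe is hypothetical.

```python
# Oid -> node name, copied from the pgxc_node listing above
pgxc_node = {11133: "coord1", 16384: "dn1", 16385: "dn2", 16386: "dn3", 16387: "dn4"}

# Only the code seen in this post; other codes are left untranslated
LOCATOR_TYPES = {"H": "hash"}


def describe(pclocatortype, nodeoids):
    """Translate a pgxc_class record into a readable distribution summary."""
    names = [pgxc_node[int(oid)] for oid in nodeoids.split()]
    return LOCATOR_TYPES.get(pclocatortype, pclocatortype), names


print(describe("H", "16384 16385 16386 16387"))
# ('hash', ['dn1', 'dn2', 'dn3', 'dn4'])
```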

Yeah, that rocks.

Sometimes you come across archive formats that are not supported by default in Ubuntu environments.
This is a problem when you need to extract such archives.

Fortunately, free applications for these formats are provided with your distribution.
If you are not using Ubuntu, you can easily find Debian or RPM packages.
To do so, there is this useful RPM package searcher
or this Debian package searcher.

Ubuntu is Debian based and, like Debian, uses APT to manage its packages.
Here is how to install those packages from the command line (geek mode).
To install the rar format manager:
sudo apt-get install unrar-free
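To install the 7z format manager (the 7z command is shipped in the p7zip-full package; the plain p7zip package only provides 7zr):
sudo apt-get install p7zip-full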

Beginners can instead find the packages in the software package center.
For that, just search for p7zip and unrar-free in the Ubuntu Software Center, found in the Application tab of your menu bar.

To decompress a file with p7zip, run the following command:
7z x "$FILE_NAME"
$FILE_NAME being the name of your 7z file.

To decompress a file with unrar-free, run the following command:
unrar-free x "$FILE_NAME"
$FILE_NAME being the name of your rar file.
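
If you extract archives from a script, the choice between the two commands above can be automated. Here is a minimal Python sketch that only builds the command line from the file extension. The function name extract_command is hypothetical, and the sketch assumes the 7z and unrar-free programs are installed and found in your PATH.

```python
import shlex
from pathlib import Path

# Extension -> extraction command, taken from the two commands above
EXTRACTORS = {
    ".7z": ["7z", "x"],
    ".rar": ["unrar-free", "x"],
}


def extract_command(file_name):
    """Return the argument list used to extract `file_name`."""
    suffix = Path(file_name).suffix.lower()
    if suffix not in EXTRACTORS:
        raise ValueError(f"no extractor known for {file_name!r}")
    return EXTRACTORS[suffix] + [file_name]


print(shlex.join(extract_command("my archive.7z")))  # 7z x 'my archive.7z'
```

To actually run the extraction, pass the returned list to subprocess.run(cmd, check=True). Passing a list avoids quoting problems with file names that contain spaces.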

©2010-2013 Michael Paquier. All content is ©Copyright of Otacoo.com 2010-2013.