Tuesday, May 26, 2015

[Python Script] Get all tables for an schema in Redshift

Every now and then I need to write an script to fetch data from Redshift. This is just a boilerplate code, which takes care of connection creation and running a query on Redshift. I am using PG8000 driver, as its created in pure python.

Code given below:


Friday, May 22, 2015

Data Engineering Design Pattern - Divide and conquer to speed up Data transformation process

Divide and conquer to speed up Data transformation process
----------------------------------------------------------------------
Every once in a while we face this situation where an ETL process finishes by X hour, while we have to deliver reports by X-2 hour or so. And we have to go back and tune our ETL pipeline to gain back those hours.

And more often than not we already have our bases covered( good table design, check, latest stats, good query plan). What if everything is fine and we still have to bring down the ETL time.

People will always say that you have to use a haddop/map reduce problem for such scenario but what if we you have to manage with only the resources that are available to you. In such situation the most commong thing to do is to divide the problem into smaller pieces. And the way I do it is by logical partitioning the data transformation layer.

Here is an example which will help you understand better:


Lets say, we have a basic web sites visits table with billions of records in it and we are running aggregation on this to generate a daily metric. Below is our simplistic transform query, which takes 2 hours to run but we have to finish the job in half an hour:

Original Query:
Select user_agent, count(1) as total_vists, sum(case when behavior='click' then 1 else 0 end) than total_clicks from website_visitors where visit_day=to_date('20150501');

To divide the problem into smaller pieces we can use any inbuilt hashing function provided by the database. For example in case of Oracle we can use Ora_hash, and in case of Redshift we can sue MD5 or Checksum function, although checksum gives a better performance.

Well, a modified query at Oracle will look like this:

Select user_agent, count(1) as total_vists, sum(case when behavior='click' then 1 else 0 end) than total_clicks from website_visitors where visit_day=to_date('20150501') and ora_hash(user_agent,7)=0;

What this query is essentially doing is that it is looking only 1/8 of the data and if everything is fine it will run in about 1/8th of the original time (although in practical cases its takes more than that, still will be much better than 2 hour).

Now for this to work much more effectively, we have to pick up a column which has a good distribution and also we need to ensure that the logical partitions that we are creating doesnt generate a duplicate metric.

And that's all it is :) In the next article, I will tell about how to speed up the loads using divide and conquer.

Tuesday, April 28, 2015

Oracle vs Amazon Redshift – The basic commands

Here are the most often used command on Oracle and their equivalent in Redshift
1. Show Schema
Oracle Way:
1
2
3
4
SELECT
username
FROM
all_users;
Redshift Way:
1
2
SELECT *
FROM pg_namespace;
2. Describe a table and see the field names, types, encoding etc.
Oracle Way:
1
2
Describe SchemName.TableName
Redshift Way:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SELECT DISTINCT n.nspname AS schemaname
,c.relname AS tablename
,a.attname AS COLUMN
,a.attnum AS column_position
,pg_catalog.format_type(a.atttypid, a.atttypmod) AS TYPE
,pg_catalog.format_encoding(a.attencodingtype) AS encoding
,a.attisdistkey AS distkey
,a.attsortkeyord AS sortkey
,a.attnotnull AS notnull
,a.attencodingtype AS compression
,con.conkey AS primary_key_column_ids
,con.contype AS con_type
FROM pg_catalog.pg_namespace n
,pg_catalog.pg_class c
,pg_catalog.pg_attribute a
,pg_constraint con
,pg_catalog.pg_stats stats
WHERE n.oid = c.relnamespace
AND c.oid = a.attrelid
AND a.attnum > 0
AND c.relname NOT LIKE '%pkey'
AND lower(c.relname) = ''
AND n.nspname = ''
AND c.oid = con.conrelid(+)
ORDER BY A.ATTNUM
;
3. Find Disk Usage Per Table 

Oracle Way:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
SELECT
owner,
table_name,
TRUNC(SUM(bytes)/1024/1024) Meg
FROM
(
SELECT
segment_name table_name,
owner,
bytes
FROM
dba_segments
WHERE
segment_type = 'TABLE'
UNION ALL
SELECT
i.table_name,
i.owner,
s.bytes
FROM
dba_indexes i,
dba_segments s
WHERE
s.segment_name = i.index_name
AND s.owner        = i.owner
AND s.segment_type = 'INDEX'
UNION ALL
SELECT
l.table_name,
l.owner,
s.bytes
FROM
dba_lobs l,
dba_segments s
WHERE
s.segment_name = l.segment_name
AND s.owner        = l.owner
AND s.segment_type = 'LOBSEGMENT'
UNION ALL
SELECT
l.table_name,
l.owner,
s.bytes
FROM
dba_lobs l,
dba_segments s
WHERE
s.segment_name = l.index_name
AND s.owner        = l.owner
AND s.segment_type = 'LOBINDEX'
)
WHERE
owner IN UPPER('BIC_DDL') -- PUT YOUR SCHEMANAME HERE
GROUP BY
table_name,
owner
ORDER BY
SUM(bytes) DESC
Redshift Way :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SELECT DISTINCT n.nspname AS schemaname
 ,c.relname AS tablename
 ,a.attname AS COLUMN
 ,a.attnum AS column_position
 ,pg_catalog.format_type(a.atttypid, a.atttypmod) AS TYPE
 ,pg_catalog.format_encoding(a.attencodingtype) AS encoding
  ,a.attisdistkey AS distkey
 ,a.attsortkeyord AS sortkey
 ,a.attnotnull AS notnull
 ,a.attencodingtype AS compression
 ,con.conkey AS primary_key_column_ids
 ,con.contype AS con_type
FROM pg_catalog.pg_namespace n
 ,pg_catalog.pg_class c
 ,pg_catalog.pg_attribute a
 ,pg_constraint con
 ,pg_catalog.pg_stats stats
WHERE n.oid = c.relnamespace
 AND c.oid = a.attrelid
 AND a.attnum > 0
 AND c.relname NOT LIKE '%pkey'
 AND lower(c.relname) = ''
 AND n.nspname = ''
 AND c.oid = con.conrelid(+)
ORDER BY A.ATTNUM
;