Business Intelligence Network business intelligence resources

Blog: Dan E. Linstedt


ETL/ELT: VLDW and Multi-passing data

In my last entry I got caught up explaining how to reduce the data set and increase parallelism, which should provide a huge bang for the buck for most installations. Again, when we deal with very large volumes of data we must go against the grain of what we have learned: very large data sets require a specific architecture. In this entry we'll discuss processing data sets in multiple passes rather than in a single pass, and why this makes a difference in performance.

The rumor, the un-truth, the fallacy:
"Run all your data through a single data flow, in a single pass - it's the fastest way to deal with it if you only touch it once."

Well, this isn't always true. It often leads to:
a) Poor design
b) Sacrificed performance
c) Complex data flow diagrams

These elements cause:
1) Complex data flows
2) Multi-targets
3) (Often) multi-sources that are inter-dependent
4) Extreme amounts of caching within a single process

Now, we all know that the shortest distance between two points is a straight line (in the standard space-time continuum, with today's mathematics). Let's assume for a minute that our data flow diagram is a straight line (at least, that's how it starts). If we introduce too many transformations, too much complexity, too much caching, and too much processing, we interrupt the straight line and it becomes crooked and jagged. As a result, maintenance costs increase, performance drops, and eventually we reach a point where adding more processing is not feasible without re-architecting or simplifying the process.

Well, the real truth (on today's hardware) is as follows:
A single pass over the data is the fastest approach ONLY WHEN:
a) all the data that must be cached actually can be cached
b) all the data (including the cached data) can be processed in parallel and in blocks
c) the entire flow can be split end-to-end into parallel streams
d) _all_ the source data and _all_ the target data can be cached in RAM before interacting with the process itself or the target database

This usually requires a monster 64-bit box with 80+ GB of RAM and 64+ high-speed CPUs, and even then the data set is likely to outgrow the hardware, so that one or more of the "caches" spills to disk. Once that happens, all bets that this is the fastest architecture are off.

Now, back to the analogy: what is the simplest and fastest data flow diagram we can ever build?
You guessed it: a straight line, a "copy map". Pick up data at point "A", pass it through a double-buffering mechanism within the ETL engine, and insert it into point "B". This is the fastest mechanism (all data sets and structures being equal).
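To make the "copy map" idea concrete, here is a minimal Python sketch of a straight-line copy with double buffering. It is not how any particular ETL engine implements its buffers; the in-memory source and sink stand in for points "A" and "B", and the names copy_map and block_rows are made up for illustration. A reader thread fills the next block while the main thread writes the current one, and a one-slot queue hands blocks across.

```python
import queue
import threading
from typing import Iterable, List, Optional

def copy_map(source: Iterable[bytes], sink: List[bytes], block_rows: int = 4) -> None:
    """Straight-line copy from source ("A") to sink ("B") in fixed-size blocks.

    While the writer drains one block, the reader thread is already filling
    the next one; the one-slot queue limits how far the reader can run ahead.
    """
    handoff: "queue.Queue[Optional[List[bytes]]]" = queue.Queue(maxsize=1)

    def reader() -> None:
        block: List[bytes] = []
        for row in source:
            block.append(row)
            if len(block) == block_rows:
                handoff.put(block)  # waits while the previous block is still queued
                block = []
        if block:
            handoff.put(block)      # last, partial block
        handoff.put(None)           # end-of-stream marker

    t = threading.Thread(target=reader)
    t.start()
    while (block := handoff.get()) is not None:
        sink.extend(block)          # "insert into B", one block at a time
    t.join()

rows = [f"row{i}".encode() for i in range(10)]
target: List[bytes] = []
copy_map(rows, target, block_rows=3)
print(target == rows)  # True
```

The block is the unit of work on both sides, which is what keeps a straight-line flow fast: no lookups, no caches, just reading and writing blocks as quickly as each end allows.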

Here's another question: in real life, what do we do to solve BIG PROBLEMS?
We break them into bite-sized, manageable chunks, pieces that we can solve one at a time. Common sense says we should do the same for VLDW data integration loads.

How do we get to multi-passing?
Suppose we have a mainframe table "MF-CUST-SOMETHING-ELSE" with 360 fields; the row size in the mainframe copybook comes to around 4k. This source table contains 5.5 billion records and grows by 1.5 billion rows a year (average over 365 days is about 41 million records a day). (I know, the table I used was not really a customer table, but I can't disclose the real table names I've worked with.)

Now also suppose that this table did not, and never would, have any sort of "trigger" to tell us which rows are new and which (out of the existing 5.5 billion) have changed on a daily basis. Also suppose that there was no hope of putting any sort of CDC engine on the mainframe. Is this beginning to sound like your situation (except perhaps for the volume)? The source table also did NOT have any sort of date, but it did have a sequencer column.

OK, this is a real-life case that we had to deal with. The questions are:
a) How do we identify the NEW rows, so that we pull only those rows from the source and compare them against the target? In other words, which 41 million rows are actually new, the ones we didn't get on yesterday's feed?
b) How do we identify the "changed" rows and separate them from the existing 5.5 billion rows? We can't possibly pull all 5.5 billion rows across to the target for comparison, nor can we "copy" the target back to the source for comparison.

Using the sequencer, we were able to determine the last sequence number pulled by the last successful feed and pull only the brand-new rows beyond that point. The updates, however, were a nightmare. Why? We didn't have a date to work with, and even with a date on the rows, updates can still be an issue.
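The sequencer-based pull for new rows is essentially a high-water-mark extract. Here is a minimal Python sketch with sqlite3 standing in for the source; the table src, its columns, and the function name are hypothetical. It assumes sequence numbers only ever increase and that no row becomes visible after a higher-numbered row, otherwise a late-arriving row below the mark would be skipped.

```python
import sqlite3
from typing import List, Tuple

def pull_new_rows(conn: sqlite3.Connection, last_seq: int) -> Tuple[List[tuple], int]:
    """Return rows above the last loaded sequence number and the new high-water mark.

    The caller should persist the new mark only after the load succeeds, so a
    failed run simply re-pulls the same rows next time.
    """
    rows = conn.execute(
        "SELECT seq, payload FROM src WHERE seq > ? ORDER BY seq", (last_seq,)
    ).fetchall()
    new_mark = rows[-1][0] if rows else last_seq
    return rows, new_mark

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (seq INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO src VALUES (?, ?)", [(i, f"r{i}") for i in range(1, 11)])
rows, mark = pull_new_rows(conn, last_seq=7)
print(rows, mark)  # [(8, 'r8'), (9, 'r9'), (10, 'r10')] 10
```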

Let's make it a little easier and put an "updated_date" on the source table. Now suppose this "updated_date" is really an "applied_date", the point in time the data applies to: when someone updates a row, they may dictate that the change really "applies" to 2 years or 5 years ago. Now we have a problem, because a filter that pulls only rows dated since the last load will miss those back-dated changes. Even if you don't have these volumes, you might have these issues.
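A small illustration of why an "applied_date" breaks date-based change detection. The rows and dates below are invented for the example: row 2 is changed today but back-dated two years, so a filter on "dated since the last run" never sees it.

```python
from datetime import date
from typing import List, Tuple

def naive_changes(rows: List[Tuple[int, date]], since: date) -> List[int]:
    """Pick 'changed' rows by date alone, the way a simple incremental filter would."""
    return [seq for seq, applied in rows if applied >= since]

rows = [
    (1, date(2007, 9, 20)),  # unchanged
    (2, date(2005, 9, 25)),  # updated today, but applied to two years ago
    (3, date(2007, 9, 25)),  # new today
]
print(naive_changes(rows, since=date(2007, 9, 25)))  # [3]: row 2's update is missed
```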

Now suppose that updates touched about 20 million rows of historical data. How do we best:
a) identify the updates
b) pull only the updates over the network
c) run the updates through the complex logic
d) separate the "inserts" from the "updates"?

OK, first:
1) we separate inserts from updates
2) we set up the insert "flow" to be partitioned and parallel, although at the rates I usually get for a 4k row (around 20,000 to 40,000 rows per second) a single stream can still handle all 41 million inserts in about 17 minutes at the upper rate (about 34 minutes at the lower one). With parallelism we can double, triple, or quadruple these throughput rates.
3) we need to change the way updates are processed. We can't simply "copy the whole table" to the target, and we can't even "copy just the keys" to the target to see what's changed: the source is simply too big, and the target is growing too large as well.
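A rough Python sketch of the insert side: split the new rows into partitions and load them in parallel, plus the back-of-the-envelope timing from step 2. The modulo partitioning, the load_partition stand-in, and the assumption that throughput scales linearly with the number of streams are illustrative only; real scaling depends on the target database.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List

def partition(keys: Iterable[int], n: int) -> List[List[int]]:
    """Split sequence keys into n partitions by key modulo n."""
    parts: List[List[int]] = [[] for _ in range(n)]
    for k in keys:
        parts[k % n].append(k)
    return parts

def load_partition(part: List[int]) -> int:
    """Stand-in for one insert stream; a real one would bulk-insert the rows."""
    return len(part)

def load_minutes(rows: int, rows_per_sec: float, streams: int = 1) -> float:
    """Estimated load time, assuming throughput scales linearly with streams."""
    return rows / (rows_per_sec * streams) / 60

with ThreadPoolExecutor(max_workers=4) as pool:
    loaded = sum(pool.map(load_partition, partition(range(1, 1001), 4)))

print(loaded)                                         # 1000
print(round(load_minutes(41_000_000, 40_000)))        # 17
print(round(load_minutes(41_000_000, 20_000)))        # 34
print(round(load_minutes(41_000_000, 20_000, 4), 1))  # 8.5
```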

We split the updates into a multiple-pass algorithm.
1) We did a single scan of all source rows, pulling just two columns: the primary key (source sequence number) and the update date, scanned in PK/sequence order.
2) We joined this stream (a sorted heterogeneous join, no caching) to the target table in the target database, pulling the "recorded" update date and the "recorded source" sequence number, with the target side sorted by sequence number.
3) We compared the two dates against each other to see which ones had really changed.
4) For the rows with detected date changes, we wrote ONE column BACK to a new table on the source system (truncated and reloaded every cycle). That one column was the source sequence primary key, on which we built an index after the load.
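Steps 1 through 3 amount to a sorted merge join followed by a filter. Here is a minimal Python sketch, assuming both inputs arrive sorted by sequence key with unique keys, the source date as a DDMMYY string, and the target date already a date value. Neither side is cached; each is read once, row by row. The function name and row shapes are hypothetical.

```python
from datetime import date, datetime
from typing import Iterable, Iterator, Optional, Tuple

SourceRow = Tuple[int, str]   # (sequence key, update date as DDMMYY text)
TargetRow = Tuple[int, date]  # (recorded sequence key, recorded update date)

def changed_keys(source: Iterable[SourceRow], target: Iterable[TargetRow]) -> Iterator[int]:
    """Inner merge join on sequence key; yield keys whose update dates differ."""
    src, tgt = iter(source), iter(target)
    s: Optional[SourceRow] = next(src, None)
    t: Optional[TargetRow] = next(tgt, None)
    while s is not None and t is not None:
        if s[0] < t[0]:
            s = next(src, None)   # source-only key: a new row, left to the insert flow
        elif s[0] > t[0]:
            t = next(tgt, None)   # target-only key: dropped by the inner join
        else:
            src_date = datetime.strptime(s[1], "%d%m%y").date()
            if src_date != t[1]:
                yield s[0]        # same key, different date: a candidate update
            s, t = next(src, None), next(tgt, None)

source = [(1, "240907"), (2, "250907"), (3, "010105"), (5, "250907")]
target = [(1, date(2007, 9, 24)), (2, date(2007, 9, 20)),
          (3, date(2005, 1, 1)), (4, date(2007, 1, 1))]
print(list(changed_keys(source, target)))  # [2]
```

The keys this yields are what step 4 writes back to the source system.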

Now for the math:
Source Width: PK Sequence = number(12), could be between 8 and 12 bytes, update date: actually 6 bytes (stored in character format: DDMMYY in a PIC statement)
Target width: PK sequence = number(12), could be between 8 and 12 bytes, update date: actually 4 bytes (date format in the target)

OK, assuming 8 bytes for the number plus 6 bytes for the unconverted date, our row was 14 bytes wide for this part of the process. The transformation engine used 256k blocks (on a 32-bit system, because that's all we had at the time; this was 10 years ago).

TCP/IP packet: 1k (1,024 bytes), fit about 73 rows per packet
ETL engine block: 256k (counted as 256,000 bytes), fit about 18,285 rows per block
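The rows-per-packet and rows-per-block figures are plain integer division, as this quick Python check shows. If 256k is taken as 262,144 bytes instead of 256,000, a block holds 18,724 rows.

```python
def rows_per(container_bytes: int, row_bytes: int) -> int:
    """How many whole rows fit in a packet or block."""
    return container_bytes // row_bytes

ROW_BYTES = 8 + 6  # 8-byte sequence key + 6-byte DDMMYY date

print(rows_per(1_024, ROW_BYTES))        # 73
print(rows_per(256_000, ROW_BYTES))      # 18285
print(rows_per(256 * 1_024, ROW_BYTES))  # 18724
```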

We scanned the 5.5-billion-row source table on the mainframe, funnelling the data back as fast as the mainframe could read rows, but pulling only those two columns (not the 4k for each row, because at this STEP we didn't need all the data).
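To put the two-column scan in perspective, here is the rough volume arithmetic in Python. Treating "4k" as 4,096 bytes is an assumption; the point is the ratio between pulling whole rows and pulling 14-byte slices.

```python
ROWS = 5_500_000_000
FULL_ROW_BYTES = 4_096    # assumption: "4k" taken as 4,096 bytes
NARROW_ROW_BYTES = 8 + 6  # sequence key + unconverted date

full_tb = ROWS * FULL_ROW_BYTES / 1e12
narrow_gb = ROWS * NARROW_ROW_BYTES / 1e9

print(f"whole rows:  {full_tb:.1f} TB")                          # 22.5 TB
print(f"two columns: {narrow_gb:.0f} GB")                        # 77 GB
print(f"reduction:   {FULL_ROW_BYTES / NARROW_ROW_BYTES:.0f}x")  # 293x
```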
At the SAME TIME, we sourced the EDW data set (it arrived in sorted order because of the clustered, partitioned index on the EDW table).
That is already a parallelism factor of 2, plus whatever parallelism the source had, plus whatever parallelism the EDW table had.

The sorted join was an INNER JOIN on sequence number, with both inputs arriving in sorted order, followed by a filter that eliminated rows whose update dates matched.

Our "target" table was back on the mainframe and again consisted only of the sequence key. So we were writing only 8 bytes per row, which meant our 256k writer block could now hold 32,000 rows before being flushed to the mainframe.
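The write-back side of the first pass is just a block-buffered writer. A minimal Python sketch, assuming a hypothetical flush callback in place of the actual transfer to the mainframe table; with 8-byte keys and a 256k block counted as 256,000 bytes, each block carries 32,000 keys.

```python
from typing import Callable, Iterable, List

KEY_BYTES = 8
BLOCK_BYTES = 256_000
BLOCK_ROWS = BLOCK_BYTES // KEY_BYTES  # 32,000 keys per block

def write_keys(keys: Iterable[int], flush: Callable[[List[int]], None],
               block_rows: int = BLOCK_ROWS) -> int:
    """Buffer keys into fixed-size blocks, hand each block to flush(), return block count."""
    block: List[int] = []
    blocks = 0
    for key in keys:
        block.append(key)
        if len(block) == block_rows:
            flush(block)
            blocks += 1
            block = []
    if block:  # final, partial block
        flush(block)
        blocks += 1
    return blocks

sent: List[List[int]] = []
print(write_keys(range(70_000), sent.append))  # 3
print([len(b) for b in sent])                  # [32000, 32000, 6000]
```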

This was just the FIRST PASS. In the second pass we joined the new sequence table to the original 5.5-billion-row table to pull JUST THE UPDATES (we dramatically reduced the data set and still found ALL the updates); the total number of updates averaged around 14 million rows per day. We pulled only the columns we needed, based on the PK sequence match, and loaded them into a staging area in the target database.
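The second pass can be sketched in SQL. Below, Python's sqlite3 module stands in for both systems (in the real job the key table and the source table lived on the mainframe and the staging area in the target database), and all table and column names are hypothetical. The key table is loaded first and indexed afterwards, then joined to the source to pull only the needed columns for the changed rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE src (seq INTEGER PRIMARY KEY, col_a TEXT, col_b TEXT, col_c TEXT);
    CREATE TABLE changed_keys (seq INTEGER);  -- truncated and reloaded every cycle
    CREATE TABLE stage (seq INTEGER PRIMARY KEY, col_a TEXT, col_b TEXT);
""")
conn.executemany("INSERT INTO src VALUES (?, ?, ?, ?)",
                 [(i, f"a{i}", f"b{i}", f"c{i}") for i in range(1, 101)])

# Load the keys found by the first pass, then build the index post-load.
conn.executemany("INSERT INTO changed_keys VALUES (?)", [(7,), (42,), (99,)])
conn.execute("CREATE INDEX ix_changed_keys ON changed_keys (seq)")

# Second pass: pull only the columns we need, only for the changed keys.
conn.execute("""
    INSERT INTO stage (seq, col_a, col_b)
    SELECT s.seq, s.col_a, s.col_b
    FROM src AS s JOIN changed_keys AS k ON k.seq = s.seq
""")
print(conn.execute("SELECT * FROM stage ORDER BY seq").fetchall())
```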

The third pass compared columns in the staging area to the actual target table to determine the "REAL" updated rows.
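The third pass compares staged columns with the target to find the real updates. Again a minimal sqlite3 sketch with hypothetical tables and columns; SQLite's IS NOT operator is a null-safe inequality, so a NULL on one side and a value on the other still counts as a change (standard SQL spells this IS DISTINCT FROM).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE target (seq INTEGER PRIMARY KEY, col_a TEXT, col_b TEXT);
    CREATE TABLE stage  (seq INTEGER PRIMARY KEY, col_a TEXT, col_b TEXT);
    INSERT INTO target VALUES (7, 'a7', 'b7'), (42, 'a42', 'b42'), (99, 'a99', 'b99');
    INSERT INTO stage  VALUES (7, 'a7', 'b7'), (42, 'a42', 'NEW'), (99, NULL, 'b99');
""")

# Third pass: keep only staged rows whose column values actually differ.
real_updates = conn.execute("""
    SELECT s.seq
    FROM stage AS s JOIN target AS t ON t.seq = s.seq
    WHERE s.col_a IS NOT t.col_a OR s.col_b IS NOT t.col_b
    ORDER BY s.seq
""").fetchall()
print(real_updates)  # [(42,), (99,)]
```

Row 7 had a changed date in the first pass but no changed columns, so it drops out here; that is exactly the kind of false positive the third pass exists to remove.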

Each pass of the data was set up to increase parallelism and partitioning, and I think our total job stream ran to completion in about 30 to 45 minutes on a daily basis. Remember, this was 10 years ago, on a 32-bit ETL machine (with a 64-bit target database on a 64-bit OS) and a mainframe. We had to do some tuning of the mainframe CPU utilization rates and of the target DB to get this to work, but it all worked.

Why did 3 passes of the data set (some would say 4) work faster than if we had "scanned" the whole table ONCE?
1) Because each pass dealt with smaller and smaller chunks of data
2) Because each pass took advantage of parallelism
3) Because each pass could focus on JUST the data it needed to operate on

So by a) reducing the data set and b) increasing parallelism, we significantly cut the run time of this process. By the way, the original single-pass process, which tried to handle updates, inserts, and "all the data" in one pass, ran well over 14 hours. So the client was very happy to go from 14 hours to 45 minutes, a huge time savings.

Feel free to contact me; I'd love to hear your stories about situations like this.

Thanks,
Dan Linstedt

Posted by Dan Linstedt on September 25, 2007 5:36 AM
