Friday, November 25, 2011
Synchronizing the File Staging Area with Talend Open Studio
Every filesystem can be seen as a non-relational DBMS, in which data are sequentially stored in disk "blocks" and logically organized into a hierarchy (or, in the presence of hard links, into a graph) referenced by API pointers. Using the filesystem is the most "stupid", but also the most efficient, way to handle and store data with few interdependencies and weak mutual relationships: i.e., non-relational data.
Most of the existing Data Warehousing solutions include a Data Staging Area, often implemented as a collection of flat files stored at the filesystem level. Data are first "brutally" downloaded from the source systems and stored into flat files, and only later processed by the ETL jobs.
The Data Staging area can be seen, therefore, as an intermediate storage area between the sources of information and the Data Warehouse itself, providing the following benefits:
- Reduced disk and network I/O: many ETL transformations involve several accesses to, and passes over, the same source table or resource. Often these accesses are simple sequential reads without any transformation, but they are performed several times on the same resource and they deal with huge amounts of data. If we store this "raw" data on the filesystem once, we can load it into the ETL server memory once and access it as many times as we want, without overloading the network;
- Improved reliability: as previously said, in this initial phase of the ETL we deal with really huge amounts of "raw" data. All the following, complex transformations can then take place without interfering with the initial "brutal" load operation. In case of an I/O failure while storing the data staging files, it is much easier to restart the single download process than to restart each full job;
- Easier historization: we can store a sequential snapshot of data (for example, the list of customers from SAP) and perform a fast, simple comparison between the staging data file and our data mart relational table, in order to find changes against the current DWH values. This makes sense for both fact and dimension tables;
- More flexibility: many production ETL or Data Integration tools lack components for specific, often legacy, source systems. We can therefore use our beloved Talend to build small "patch" jobs between the legacy system and the filesystem staging area, which can then be easily accessed by our production ETL solution. In this case, I suggest storing the "patch" File Staging data directly in the standard CSV format, which is always and surely supported. By the way, Talend itself internally uses CSV as the data flow format between components.
In our scenario, taken from the environmental data monitoring project of my university, we have a collection of different source environmental measuring stations, each of which generates a series of data files in different formats, containing all the environmental measures for each sensor together with the corresponding timestamp.
One of the source systems is the meteorological FTP site of Hortus Ltd. The job "job_fact_measure_hortus" is responsible for loading the data coming from two different "hortus" source stations, whose measures are periodically (every 15 minutes) "published" on the remote FTP site in the form of flat files.
[Figure: Complete Data Flow of the entire environmental system.]
The "transformation & load" of the job simply works on every new, incoming file found in the "/var/staging/hortus/2load/" directory; once parsed, transformed and loaded, the file is moved into the "/var/staging/hortus/loaded/" directory. In this way, if we wanna a set of data files to be re-loaded, we simply move them back into the incoming staging directory "/var/staging/hortus/2load/" directory : a fast, easy and performant solution.
In this way, however, we have a data file staging area split into two different directories: how can we keep it synchronized with the "hortus" source system? Unfortunately we cannot simply synchronize the remote FTP directory with the "/var/staging/hortus/2load/" directory: doing so, we would also download (and re-process) the already-processed data files, since they have been moved into the "../loaded/" directory...
Once again, the easiest and simplest way is to work at the filesystem level. We therefore implement a subjob, part of the "job_fact_measure_hortus" job, which runs before the transformation and load phase. Here is the complete picture of the subjob:
[Figure: The complete file staging area synchronizing subjob.]
The logic is simple: we scan the remote source FTP directory using the tFTPFileList Talend component, which provides us with an iterate link and a global map variable indicating the file name of the current iteration.
For each file in the remote FTP source, we check (by comparing the file names) the "2load/" directory: if the data file is already present, it means we have already downloaded it (but not yet parsed, transformed and loaded it into the DWH); in that case we abort the current iteration and pass to the next one. If the file is not present, there are two possibilities (a plain-Java sketch of this decision logic follows the list):
- The data file has already been downloaded, parsed, transformed and loaded into the DWH: in this case, we will find it in the "loaded/" directory;
- The data file has not been downloaded yet.
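Outside of Talend, the per-file decision logic of this subjob can be sketched in plain Java roughly as follows. The staging paths are the ones used above; the class, method names and sample file name are made up for illustration, and the download step is only a placeholder for what tFTPGet does in the real job:

import java.io.File;

// Plain-Java sketch of the synchronization decision for a single remote file name.
// In the actual job this logic is implemented by the tFileExist components and their "if" links.
public class HortusStagingSyncSketch {

    static final String TO_LOAD_DIR = "/var/staging/hortus/2load/";
    static final String LOADED_DIR  = "/var/staging/hortus/loaded/";

    static void synchronize(String fileName) {
        if (new File(TO_LOAD_DIR + fileName).exists()) {
            return; // already downloaded, still waiting to be parsed, transformed and loaded
        }
        if (new File(LOADED_DIR + fileName).exists()) {
            return; // already downloaded and loaded into the DWH
        }
        download(fileName); // found in neither staging directory: fetch it from the FTP site
    }

    static void download(String fileName) {
        // placeholder for the actual FTP transfer, performed by tFTPGet in the Talend job
        System.out.println("Downloading " + fileName + " into " + TO_LOAD_DIR);
    }

    public static void main(String[] args) {
        synchronize("hortus_station1_20111125_0915.dat"); // hypothetical file name
    }
}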
We begin, therefore, by scanning our remote FTP site. For debugging purposes, it is useful to print the file name in a tJava component:
// tJava: print the file name of the current iteration
System.out.println(globalMap.get("tFTPFileList_1_CURRENT_FILE"));
If we want to print the entire file path, we can simply use the tFTPFileList_1_CURRENT_FILEPATH global variable instead.
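For instance, the corresponding tJava line would look along these lines (a sketch, assuming the same component name as above):

// tJava: print the complete remote path of the current iteration file
System.out.println(globalMap.get("tFTPFileList_1_CURRENT_FILEPATH"));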
If we want or need to, we are also free to specify a filemask in the tFTPFileList component, restricting the list of files to work with by specifying a matching file name pattern:
[Figure: Filemask in the tFTPFileList component.]
We can now compare the current (iteration) file name with the content of the "2load/" directory, using the tFileExist component:
[Figure: The tFileExist Talend component: first step.]
[Figure: In the tFileExist Talend component, we use the current iteration file name, available as a global variable.]
The tFileExist component provides us with a global variable of boolean type, "tFileExist_1_EXISTS". We use it inside an "if" link:
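The condition of a "Run if" link is a plain Java boolean expression evaluated against globalMap; for the branch that continues only when the file is not yet in "2load/", it would typically look something like this (a sketch, assuming the default component name):

// "Run if" condition: follow this branch only if the file was NOT found in "2load/"
!((Boolean) globalMap.get("tFileExist_1_EXISTS"))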
[Figure: The output "if" link of the first tFileExist component.]
[Figure: Using the global boolean variable provided by the first tFileExist component inside an "if" link: "no file exists" case.]
In this way, if the file exists, the current subjob iteration simply aborts: we do not need to download it. In case the file does not exist:
[Figure: Using the global boolean variable provided by the first tFileExist component inside an "if" link: "file exists" case.]
The next step follows the same logic, but applied to the "loaded/" directory:
[Figure: Step 2: check in the "loaded" file staging directory.]
If we reach the last component, tFTPGet, it means the file was found in neither of the two data staging directories, and we therefore need to download it and make the current iteration file available inside the "2load/" directory:
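For reference, what tFTPGet does at this step corresponds roughly to the following sketch based on Apache Commons Net; the host, credentials and file name are placeholders, and this is not the job's actual generated code:

import java.io.FileOutputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;

// Download a single remote file into the "2load/" staging directory.
public class FtpGetSketch {
    public static void main(String[] args) throws Exception {
        String fileName = "hortus_station1_20111125_0915.dat"; // hypothetical file name
        FTPClient ftp = new FTPClient();
        ftp.connect("ftp.example.org");          // placeholder host
        ftp.login("user", "password");           // placeholder credentials
        ftp.enterLocalPassiveMode();
        ftp.setFileType(FTP.BINARY_FILE_TYPE);
        try (FileOutputStream out = new FileOutputStream("/var/staging/hortus/2load/" + fileName)) {
            ftp.retrieveFile(fileName, out);     // fetch the remote file into the staging area
        }
        ftp.logout();
        ftp.disconnect();
    }
}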
This "two directory" staging filesystem logic can be applied in many different contexts, as it will be soon shown in a new post.
Labels:
data integration,
data warehousing,
etl,
patterns,
talend,
tricks
Thursday, November 17, 2011
Fact Table Surrogate Keys in Talend Open Studio
Surrogate keys are pretty common among Data Warehouse designers, and they are normally used to link facts to dimension tables.
Surrogate keys define the dimension tables' primary keys, and are always implemented as meaningless sequential integers.
The usage of surrogate keys provides the following benefits:
- Support for type 2 slowly changing dimension attributes: this allows unlimited history preservation of records;
- Improved partitioning, indexing and query performance (most RDBMS engine tuning options are much more efficient in the presence of pure-integer primary keys);
- Independence from the natural keys of the source systems, and therefore "protection", for instance, in case of unpredictable changes in the data sources themselves.
Time dimension tables also make use of surrogate keys. Those surrogate keys are usually not completely "meaningless" (for example, the date_id often results from the concatenation of the year, month and day-of-month fields).
The logical structure of a data mart describes every fact table with a primary key formed by the unique combination of the dimension tables' surrogate foreign keys.
[Figure: A traditional star schema. The primary key is composed by the combination of a subset of the foreign surrogate keys, pointing to the dimensions.]
However, sometimes a fact table surrogate key may also be helpful at the physical level.
The advantages of implementing surrogate keys in the fact tables are the following:
- You cannot predict how your system will evolve in the future. Choosing a surrogate, meaningless integer as primary key lets you, as a designer, change the structure of the fact table by simply adding or removing columns, with only a few updates at the ETL system logic level;
- Improved load performance, since composite primary keys are heavy for the RDBMS engine to manage;
- Improved recovery capability: in case of a failure of the ETL loading process, it is easier to determine exactly where a load job was suspended, and to resume loading or back out the job entirely;
- Improved audit capabilities: since the audit foreign key is not part of any composite primary key, updates on the audit key can easily be performed without any consequence.
However, using a surrogate key in fact tables as well prevents the ETL process from using any UPSERT logic ("Insert or Update" or "Update or Insert") at the SQL level, since upsert operations always require a key defined on the natural columns that univocally identify each table row.
Since ETL processes usually deal with huge sets of data, UPSERT-like operations are rarely used. However, in many circumstances they can be required (e.g., in the presence of late arriving facts).
Every ETL tool knows this issue and offers a solution. In this post we explore the capabilities of the Talend tPostgresqlOutput component (the same logic can be used in the corresponding components available for all the other RDBMSs supported by Talend as well).
[Figure: The Talend tPostgresqlOutput component.]
Choosing the "Advanced Options" of the "Component" panel:
In this example, the fact table represents a series of measures caught by an environmental monitoring station. Each measure is univocally defined by the time of the measure itself (timestamp "ts") and an "id" which refers to the measuring station (an SCD type 2 dimension table, since the station could be moved, in which case its GPS coordinates would change as well).
We check the "ts" and "station_id" columns, the natural keys of our fact table, as "Update Key" and "Deletion Key", as we would manually writing the SQL statement.
All the other columns, including the dimension keys whose combination does not univocally identify the fact row (and not only the measure columns), have to be checked as "Updatable" and "Insertable".
Talend Open Studio will now implement our UPSERT logic at the application level. Different, more performant logics which avoid updates or sequential operations can (and should) be used alongside this technique.
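Conceptually, the "update or insert" behaviour configured above corresponds to something like the following JDBC sketch on the natural key (ts, station_id). The table name "fact_measure" and the "value" measure column are hypothetical, and this is not the code Talend actually generates:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

// Application-level "update or insert" on the natural key (ts, station_id).
public class UpsertSketch {
    public static void upsert(Connection con, Timestamp ts, int stationId, double value) throws Exception {
        String update = "UPDATE fact_measure SET value = ? WHERE ts = ? AND station_id = ?";
        try (PreparedStatement up = con.prepareStatement(update)) {
            up.setDouble(1, value);
            up.setTimestamp(2, ts);
            up.setInt(3, stationId);
            if (up.executeUpdate() > 0) {
                return; // the row already existed: it has been updated on the natural key
            }
        }
        String insert = "INSERT INTO fact_measure (ts, station_id, value) VALUES (?, ?, ?)";
        try (PreparedStatement ins = con.prepareStatement(insert)) {
            ins.setTimestamp(1, ts);
            ins.setInt(2, stationId);
            ins.setDouble(3, value);
            ins.executeUpdate(); // no existing row: insert a new one
        }
    }
}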
Monday, November 14, 2011
Capturing the most recent file in Talend Open Studio
All modern RDBMSs offer the possibility to limit the number of rows returned by a query, based on a chosen sort criterion. For instance, in PostgreSQL we can use the following syntax in order to obtain only the most recent row (according to the timestamp "ts"):
SELECT * FROM sample_table ORDER BY ts DESC LIMIT 1
Similarly, in Oracle we would write something like:
SELECT * FROM (SELECT * FROM sample_table ORDER BY ts DESC) WHERE ROWNUM = 1
Is it possible to implement the same "sampling" in a Talend Open Studio data flow, using the standard components provided by the IDE? Obviously, yes. Let's see how.
In the following example job we begin with the tFileList component, scanning the content of a given directory ordered by last modified date, in descending order (i.e. we begin from the most recent file and end with the oldest one), and providing an iterate link as output. Other combinations are of course possible (by file name in ascending order, by file size, etc.).
[Figure: In the tFileList component it is also possible to specify a filemask (in Java regular expression notation), in order to restrict the list of files.]
Note: tFileList works directly on the filesystem, and therefore offers a really restricted set of ordering options. In case we need a custom sort, we should complement our tFileList with a tSortRow component or a tJavaFlex containing custom code.
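As an illustration, a custom "newest first" ordering could be coded along these lines in plain Java (for example inside a routine called from a tJavaFlex); the directory path is a placeholder:

import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

// Sketch: list the files of a directory and sort them by last modified timestamp, newest first.
public class NewestFileSketch {
    public static void main(String[] args) {
        File[] files = new File("/var/data/in").listFiles(); // placeholder directory
        if (files == null || files.length == 0) {
            return; // nothing to do
        }
        Arrays.sort(files, Comparator.comparingLong(File::lastModified).reversed());
        System.out.println("Most recent file: " + files[0].getName());
    }
}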
[Figure: tFlowToIterate.]
We must restrict the list of files to only the most recent one. To do that, we first need a tIterateToFlow component in order to convert the iterate link into a data flow. We use "FileName" and "FilePath" as column names for the file name and the complete filesystem path, respectively.
[Figure: tIterateToFlow.]
We can finally "cut" our file list, using the tSampleRow component, specifying "1" as Range number. Since we ordered the file list (currently existing as data flow link) in descending file timestamp, assure that the only remaining row will correspond to the most recent file.
[Figure: tSampleRow.]
Converting the data flow link back into an iterate link, we can use the file name and file path variables in other components. For instance, we could specify this selected "most recent file name" in the "exclude filemask" option of a subsequent tFileList component, in case we want to obtain the list of all files in the same directory except the most recent one.
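After the tFlowToIterate, the column values are exposed in globalMap under keys built from the flow name and the column name. Assuming the incoming flow is called "row2" (an assumption; check the actual link name in your job), a tJava snippet could read them like this:

// Hypothetical flow name "row2"; FileName and FilePath are the columns defined above.
String lastFileName = (String) globalMap.get("row2.FileName");
String lastFilePath = (String) globalMap.get("row2.FilePath");
System.out.println("Most recent file: " + lastFilePath);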
[Figure: tFlowToIterate.]
[Figure: Using a simple tJava, we print the selected file name to the standard output. Similarly, we could have used the file name global variable in another component, for instance tFileDelete.]