Sunday, July 8, 2018

Datastage execution process


IBM InfoSphere DataStage is a GUI based ETL tool which allows us to use familiar graphical point-and-click techniques to develop job flows for extracting, cleansing, transforming, integrating, and loading data into target files, target systems, or packaged applications. It abstracts the actual processing from the developer. This document throws light on what happens in the background of DataStage when an ETL job goes through different phases – design, compile and execution.
How DataStage Works?
DataStage falls under second generation ETL tools which have a provision of creating visual ETL task flows. A GUI-based ETL tool can leverage the productivity and quality of ETL development and maintenance. ETL job designed using GUI-based ETL tools will only give a “big picture” of the design. It is just a picture identifying the source(s) of data, the processing that the data are to undergo, the target(s) for the data and some relevant documentation.
The design paradigm of DataStage is that, ETL design should be a “big-picture”, so that even business users who have minimal or no technical knowledge can grasp the intent of the design. A simple requirement could be, reading from a text file, aggregating data based on grouping keys, and finally writing the results into another text file. When the ETL for this requirement is designed, it would show a picture showing the source component where the file is read, aggregation component where summarization is done, the target component where the final results are written to a file. But this is not all that will accomplish the task. To make it work, we need to go “under the covers” and fill in the details – the pathname of the source file, format, which columns are to be grouped and aggregated, the pathname of the target file, and so on.
Moreover, how DataStage works depends on the job type and the current phase of the job. DataStage supports different job types – Server Jobs, Parallel Jobs and Mainframe Jobs. Design-Time, Compile-Time and Run-Time are three different phases that any DataStage job goes through in its life cycle.
Let us get into details on what happens under the covers right from the beginning.
DataStage Client/Server Connectivity:
A simple DataStage infrastructure consists of DataStage server and a local area network on which one or more DataStage client machines may be connected. Connection from a DataStage client to a DataStage server is managed through a mechanism based upon the UNIX remote procedure call mechanism. DataStage uses a proprietary protocol called “DataStage RPC” which consists of an RPC daemon (dsrpcd) listening on TCP port number 31538 (default) for connection requests from DataStage clients.
But before dsrpcd gets involved, the connection request goes through an authentication process. Authentication is handled by the Information Server through its “login and security service”.

Each connection request from a DataStage client asks for connection to the dscs (DataStage Common Server) service. The dsrpcd (the DataStage RPC daemon) checks itsdsrpcservices file to determine whether there is an entry for that service and, if there is, checks whether the requesting machine’s IP address is authorized to request that service. If so then the executable associated with the dscs service (dsapi_server) is invoked.
DataStage Processes:
For each valid client connection, a “dsapi_server” process is invoked and this process manages traffic, inactivity timeout and also acts as an “agent” on the DataStage server for its own particular client connection. If the client requests access to the Repository, then the dsapi_server process will fork a child process called “dsapi_slave” to perform that work. Typically, therefore, one would expect to see one dsapi_server and one dsapi_slave process for each connected DataStage client. Processes may be viewed with the ps -ef command (UNIX) or with Windows Task Manager. To check for DataStage client connections use:
ps –ef | grep dsapi_slave or ps -ef | grep dscs
Every DataStage process attaches to a shared memory segment that contains lock tables and various other inter-process communication structures. Further each DataStage process is allocated its own private shared memory segment. Shared memory allocation may be viewed in three ways: using the ipcs command (UNIX) or the shrdump command (Windows) or use the analyze.shm command from an operating system command prompt to examine the disk and printer shared memory segments.
In UNIX, check datastage shared memory segments usingipcs – mop | grep ade and to remove them use ipcrm -m ID, where ID is the number from the second column of ipcs –mop
When there are no connected DataStage clients, dsrpcd may be the only DataStage process running on the DataStage server. However, there could be more running at times. This is because, the DataStage deadlock daemon (dsdlockd) wakes up periodically to check for deadlocks in the DataStage database and, to clean up locks held by defunct processes – usually improperly disconnected DataStage clients. JobMonApp is another process that runs at times. A Java application that captures “performance” data (row counts and times) from running DataStage jobs called Job Monitor runs as JobMonApp.
Let us now get into details regarding OSH (Orchestrate), Node Configuration to get better idea of Parallel jobs.
Orchestrate Shell Script
At Compile-Time, the Designer first validates all link requirements, mandatory stage options, transformer logic, etc. It then generates a framework representation of data flows and stages – OSH (Orchestrate Shell Script). For each stage in the job design, there is an Operator, and other relevant information for that operator (such as schema and options) within the script. For each Transformer stage in the design, transform code is generated, i.e., DataStage builds a
C++ Framework operator which is then compiled into C++ and then to corresponding native operators. So when a job contains a Transformer, it takes longer to compile (but not to run). This is why an ANSI-compliant C compiler is required for installations where parallel jobs will be designed. If Custom stages that use reusable BuildOp stages are included in the job design, the C++ code in those BuildOps must also be compiled; this can be done using the Designer GUI or command line.
When the Stages on the diagram are mapped to native Operators, it is not necessary for the mapping to be a one-on-one. Some stages are based on a range of related operators and which one is used depends on the setting of the stage’s properties. All of the stages can include common operators such as partition and sort depending on how they are used in a job. For example, when a Sequential stage is used as a source it is mapped to import operator, as data from the text file (ASCII) has to be imported into native DataStage format. When used as a target it is mapped to the export operator. The converse also holds true. That is different stages get mapped to a single operator. For example, Row Generator and Column Generator are both mapped to the generator operator.
Operators are the basic functional units of an Orchestrate application. An OSH operator is an instance of a C++ class of APT_Operator. Operators read records from input data sets, perform actions on the input records, and write results to output data sets. An operator may perform an action as simple as copying records from an input data set to an output data set without modification. Alternatively, an operator may modify a record by adding, removing, or modifying fields during execution.
Let us consider the below depicted job. The design has three stages connected. At compile time, design is primarily converted into an OSH (Orchestrate shell).

In the below depicted OSH, Comment blocks introduce each operator, the order of which is determined by the order stages were added to the canvas. However, the actual execution order of operators is dictated by input/output designators, not by placement on the diagram. OSH uses the familiar syntax of the UNIX shell, such as Operator name, schema, operator options (“-name value” format), input (indicated by n< where n is the input#), and output (indicated by the n> where n is the output #). For every operator, input and/or output data sets are numbered sequentially starting from zero. Virtual data sets (in memory native representation of data links) are generated to connect osh operators.

Node Configuration
In DataStage, the degree of parallelism can be changed dynamically by making changes to the file called Configuration file, which contains the number of nodes on which the job runs and the resources to be utilized for job. Configuration file defines how many nodes should be used to run the job in parallel, where to store the temporary data, where to store the intermediate datasets, which database specific nodes should be used for optimum resource allocation and performance of the data load job.
The important thing is that all you need to do to scale up your job for greater throughput is to specify a configuration file with additional processing nodes. No changes to job design are required. No recompilation is required. The configuration files (and the Conductor process) look after it all. To specify a different configuration file, change the value of the APT_CONFIG_FILE environment variable.
The name of the default configuration file is default.apt. Maximum flexibility is obtained by making this into a job parameter, so that any of the available configuration files can be selected when the job is run. Unless someone changes the default location, it is to be found in a directory called Configurations, in the DataStage Engine directory ($DSHOME) on the server machine. If this location cannot be found, then DataStage also searches in the etc directory in the $APT_ORCHHOME directory.
Let us now go through the life cycle of a Parallel Job.
Parallel Job Life Cycle
As said earlier, job type does matter in understanding DataStage execution process. This document is restricted to parallel job type. To understand how DataStage handles parallel jobs, we need to consider the architectural differences when designing, when compiling and when executing.
IBM InfoSphere DataStage Designer client is used to create DataStage jobs that are compiled into parallel job flows, and reusable components that execute on the parallel Information Server engine.
Design-Time Architecture
Job type doesn’t matter when designing, as the architecture for Parallel, Server and Mainframe jobs is same at Design-Time, because, the developer uses the same Datastage Designer client. Developer has the choice to choose the job type – a server job, a parallel job or a mainframe job that is to be worked upon.
Compile-Time Architecture
Compile-Time architecture of Parallel job is different from that of a Server job or a Mainframe job. When a parallel job is compiled, a directory RT_SCnn (where nn is the job number) gets created. This folder contains the actual Orchestrate script OshScript.osh generated for the job and the script OshExecuter.sh for invoking osh. OshExecuter.sh uses the environment variable APT_ORCHHOME to determine the location of the executable.
If viewing generated OSH is enabled in Administrator, it can be viewed from the Generated OSH tab in Job Properties, Job Run Log in Director. For passive stages, See OSH option is available in View Data dialog.
Run-Time Architecture
Parallel jobs are executed under control of DataStage Server runtime environment. In IBM Infosphere DataStage when we RUN a job from Director/Designer, or when we invoke it from the command line interface using dsjob command or invoke it using DSRunJob() function, the job doesn’t run immediately, in fact a request is issued to the Engine to invoke execution of the job. When a request to run a job is made, it is exactly that: a request. Similarly, when we try to stop a job, it is a STOP request; it causes a signal to be sent to the running process.
As request made is asynchronous by default, the requesting user does not retain any control of the request made and jobs do not run as child processes of the requesting user process. So whether that process can handle the signal governs whether the Stop/Start request is successful.
Once the START request is successfully made, processing of request depends on the job type.
Conductor Process:
When a run request is processed for a parallel job, the first process to start is the Conductor process. This runs an Orchestrate shell (osh) on the server machine. It picks up values of all environment variables that are required, one of which (APT_CONFIG_FILE) locates the configuration file for this run of the job. Location of the osh executable is supplied via the APT_ORCHHOME environment variable.
Conductor composes Job “score” using the OSH and configuration file. This is similar to SQL query optimization plan generated by an RDBMS. It identifies the degree of parallelism and node assignments for each operator, inserts sorts and partitioners as and when needed to ensure correct results, defines connection topology (virtual datasets) between adjacent operators, inserts buffer operators to prevent deadlocks in fork-joins, defines number of actual OS processes, where possible, optimize resource requirements by combining multiple operators within a single OS process, where possible, thereby improving performance.
Job Score is used to fork processes with communication channels for data, message, and control. To have insights on what happens in the background, set$APT_STARTUP_STATUS environment variable to show each step of job startup and $APT_PM_SHOW_PIDS to show process IDs in DataStage log. Only after the score is composed and all the processes are created, actually processing begins. Startup time in the below screenshots tells us the time taken to compose score and to fork all processes.

Job processing ends when the final operator processes the last row of data or when any of the operator encounters a fatal error or when the Job receives a halt (SIGINT) signal either because of DataStage Job Control or because of human intervention (STOP request issued from DataStage Director).
When $APT_DUMP_SCORE is set, Job Score is output to the job log. One can see 2 score dumps written in the job log for each job. Of them, first one is for license operator and the second entry is the real job score. Score dump event can be identified from the job log, by looking for “main program: This step …”.
In the below depicted Job Score, it can be seen that, score is divided into two sections – DataSets and Operators. Datasets represent the partitioning and collecting, while operators are the mapping of stages to nodes. Job Score yields very useful info like the number of operators, sequential/parallel processing that each operator does, stage combinations, run-time insertions of sort and buffer operators and the total number of actual processes.

Conductor is responsible for forking a “section leader” process on each processing node which runs osh on that node. Section leader processes communicate with the conductor process about the progress, which then consolidates all messages from various processing nodes, and writes them into the job log.
Once all the section leader processes indicate that processing on their processing node is complete, the conductor process performs a final cleanup.
Section Leader Process:
As said earlier, each processing node starts a “section leader” process which runs osh on that node. Its job is to coordinate processing on that particular node. It extracts its instructions from the osh script generated by compiling the parallel job.
The task of each section leader process is to fork instances of required player processes to execute each of the operators in the osh script. Schema information, (re-)partitioning information and other options are passed to each player process, stdout and stderr of which are redirected to the parent section leader process. Section leader processes communicate with the conductor process, primarily to pass back any messages to be logged, and to notify of completion and status.
Player process:
A player process executes operators from osh. It runs osh to invoke the method(s) exposed by an operator. An operator is an instance of a C++ operator inheriting from APT_Operator.
Player processes communicate with each other to facilitate data transfer. If the players are on the same machine (an SMP environment), then communication is done through shared memory. If the players are on separate machines (an MPP cluster environment) then communication is effected using TCP.
Every player will be able to communicate with every other player. There are separate communication channels (pathways) for control, messages, errors, and data. There wont be any data channel going through the section leader/conductor, as this would limit scalability. Data flows directly from upstream operator to downstream operator using APT_Communicator class. Conductor and Section Leader communicate only via Control Channel. Section Leader communicates with player processes in three different channels, one for Control, the other two for stdout and stderr.
Each player process may consume disk space from the resource disk allocated to its processing node and/or from the resource scratch disk allocated to its processing node in the configuration file for the job. For example, a Sort stage will spill to disk when its memory limit property is exceeded.
Conclusion:
Thus we saw the Client-Server connectivity in DataStage, the processing that occurs at design-time, at compile-time, and at run-time. We learnt how to read job log, OSH, job score and the whole orchestrate execution process.

No comments: