Welcome to pypipegraph’s documentation!

This document describes the current pipegraph API as you’d use it. Please see http://code.google.com/p/pypipegraph for the project’s homepage.



[pypipegraph](https://github.com/TyberiusPrime/pypipegraph) is an MIT-licensed library for constructing a workflow piece by piece and executing just the parts of it that need to be (re-)done. It supports using multiple cores (SMP) and, eventually, multiple machines (cluster; alpha code right now), and is a hybrid between a dependency tracker (think ‘make’) and a cluster engine.

More specifically, you construct Jobs, which encapsulate output (i.e. stuff that needs to be done), invariants (which force re-evaluation of output jobs if they change), and stuff in between (e.g. load data from disk).

From your point of view, you create a pypipegraph, you create jobs, chain them together, then ask the pypipegraph to run. It examines all jobs for their need to run (either because they have not been finished, or because they have been invalidated), distributes them across multiple Python instances, and gets them executed in a sensible order.

It is robust against jobs dying for whatever reason (only the failed job and everything ‘downstream’ will be affected, independent jobs will continue running), allows you to resume at any point ‘in between’ jobs, and isolates jobs against each other.
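The failure handling described above can be pictured with a small stand-alone sketch. It does not use pypipegraph: run_graph, the jobs dict and the deps mapping are hypothetical names made up for this illustration, and the real scheduler runs jobs in separate processes rather than in a simple loop.

```python
# Illustration only: plain Python, not the pypipegraph implementation.
def run_graph(jobs, deps):
    """Run callables in dependency order; skip everything downstream of a failure.

    jobs: dict mapping job name -> callable (hypothetical structure)
    deps: dict mapping job name -> list of prerequisite job names
    """
    done, failed, skipped = [], [], []
    pending = dict(jobs)
    while pending:
        progressed = False
        for name in sorted(pending):
            prereqs = deps.get(name, [])
            if any(p in failed or p in skipped for p in prereqs):
                # A prerequisite broke: prune this job instead of running it.
                skipped.append(name)
            elif all(p in done for p in prereqs):
                try:
                    pending[name]()
                    done.append(name)
                except Exception:
                    failed.append(name)
            else:
                continue  # prerequisites still pending, look at the next job
            del pending[name]
            progressed = True
            break
        if not progressed:
            raise ValueError("cycle or unknown prerequisite in dependencies")
    return done, failed, skipped


def broken():
    raise IOError("disk full")


jobs = {"A": lambda: None, "B": broken, "C": lambda: None, "D": lambda: None}
deps = {"C": ["B"], "D": ["A"]}  # C is downstream of the failing job B
done, failed, skipped = run_graph(jobs, deps)
```

Here B fails, C is pruned because it depends on B, and the independent jobs A and D still complete.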

pypipegraph supports Python 3.

30 second summary

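import pypipegraph

pypipegraph.new_pipegraph()  # jobs may only be created after this call
output_filenameA = 'sampleA.txt'
def do_the_work_a():
    with open(output_filenameA, 'w') as op:
        op.write("hello world")
jobA = pypipegraph.FileGeneratingJob(output_filenameA, do_the_work_a)
output_filenameB = 'sampleB.txt'
def do_the_work_b():
    with open(output_filenameA) as source, open(output_filenameB, 'w') as op:
        op.write(source.read() + ", once again")
jobB = pypipegraph.FileGeneratingJob(output_filenameB, do_the_work_b)
jobB.depends_on(jobA)  # jobB reads jobA's output, so it must run after jobA

pypipegraph.run_pipegraph()  # hand over control until all jobs are done
print('the pipegraph is done and has returned control to you.')
print('sampleA.txt contains "hello world"')
print('sampleB.txt contains "hello world, once again"')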


All jobs have a unique name (job_id), and all but invariant preserving jobs encapsulate python callbacks to do their work.

There are four basic kinds of jobs:

  • Output generating jobs. These are generally run in a fork of the compute slave (cs), they might use multiple cores (if you call an external non-Python program), and they modify the outside world (create files, change databases, call the web).

    They do not modify the state of your program - neither on the master control program (mcp) nor on the cs level.

  • Invariant preserving jobs. These jobs are used to check that nothing changed. If it did, the downstream jobs need to be redone. Typical examples of what is checked are file modification time, file checksum, parameters, or changes to python functions.

    They are evaluated in the mcp and never ‘run’ anywhere.

  • Compute slave modifying jobs. These jobs load data into a compute slave that is then available (read-only) in all dependent output generating jobs (and it even only needs memory once, thanks to Linux forking behaviour).

    They are not run if there isn’t an output job depending on them that needs to be done. They run single core per machine (since they must be run within the compute slave). They may run multiple times (on different machines). They modify compute slaves!

  • Job generating jobs. They extend the job graph itself - either by creating new jobs depending on the data at hand (for example, create a graph for every line in a file generated by an output job), or by injecting further dependencies.

    They are executed in a compute slave (since they might well use data that was loaded by cs modifying jobs) and the jobs generated are transferred back to the mcp (so the mcp is still isolated from their execution function). Note: Job generating jobs are run each time the pipeline runs - otherwise we could not check whether the generated jobs have been done. That probably also means that their dependencies should be ‘lightweight’ in terms of runtime.

In addition, there are compound jobs that combine jobs for convenience.

Invariant preserving jobs


Encapsulate a tuple defining some parameters to another job. If the parameters change, dependent jobs are redone.


Capture the file modification time of an input file.


Capture the file modification time, file size and the md5 sum of a file. If the size changes, invalidate dependents. If the time changes, check the md5 sum; if it changed, invalidate dependents. Otherwise, update the internal file modification time.
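The check order described above (size first, then time, then md5) might be sketched like this. It is a stand-alone illustration, not pypipegraph’s code: md5_of, capture, check and the layout of the stored record are hypothetical names chosen for the example.

```python
import hashlib
import os


def md5_of(path):
    """Return the hex md5 sum of a file's contents."""
    with open(path, "rb") as handle:
        return hashlib.md5(handle.read()).hexdigest()


def capture(path):
    """Record modification time, size and md5 sum (hypothetical record layout)."""
    stat = os.stat(path)
    return {"mtime": stat.st_mtime, "size": stat.st_size, "md5": md5_of(path)}


def check(path, stored):
    """Return (invalidated, new_record) following the rules described above."""
    stat = os.stat(path)
    if stat.st_size != stored["size"]:
        return True, capture(path)       # size changed: invalidate right away
    if stat.st_mtime != stored["mtime"]:
        if md5_of(path) != stored["md5"]:
            return True, capture(path)   # content really changed
        # Same content, new timestamp: only remember the new time.
        return False, dict(stored, mtime=stat.st_mtime)
    return False, stored                 # nothing changed at all
```

The md5 sum is only computed when the cheaper size and time checks cannot settle the question, which keeps the common ‘nothing changed’ case fast.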


Compare the (slightly sanitized) byte code of an arbitrary Python function to that of its last definition. If it changed, invalidate dependents.
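As a rough picture of what comparing byte code means, the sketch below builds a comparable summary from a function’s code object. It is an assumption-laden illustration rather than pypipegraph’s method: fingerprint is a hypothetical helper, and since this document does not say how the byte code is sanitized, no sanitizing is done here.

```python
def fingerprint(func):
    """Return a comparable summary of a function's compiled body."""
    code = func.__code__
    # co_code holds the raw byte code; constants and names live in separate
    # tuples, so they are included to catch changes such as 1 -> 2.
    return (code.co_code, code.co_consts, code.co_names)


def make_v1():
    def work(x):
        return x + 1
    return work


def make_v1_again():
    # Same body as make_v1, defined at a different place in the file.
    def work(x):
        return x + 1
    return work


def make_v2():
    def work(x):
        return x + 2
    return work


unchanged = fingerprint(make_v1()) == fingerprint(make_v1_again())
changed = fingerprint(make_v1()) != fingerprint(make_v2())
```

Because line numbers are stored separately from the byte code, moving a function around in a file does not alter this summary, while editing its body does.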

Output generating jobs


This job creates an output file.

It gets rerun if the output file does not exist, if it is 0 bytes in size, or if the job has been invalidated. Not creating the output file in its callback raises an exception.

If the job raises an exception, the output file created so far will be removed. (Optionally, if you pass rename_broken = True when creating the job, the output file gets renamed to outputfilename + ‘.broken’ instead.)

The job receives an implicit FunctionInvariant on its callback. This can be suppressed by calling myjob.ignore_code_changes().

The job_id doubles up as the output filename.
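The rerun conditions and the cleanup behaviour described above could be sketched as follows. This is a simplified stand-alone model, not the library’s code: needs_run and run_file_job are hypothetical helpers, and the JobContractError class here is only a local stand-in that is raised directly instead of being stored on a job.

```python
import os


class JobContractError(Exception):
    """Local stand-in for the exception of the same name in this document."""


def needs_run(filename, invalidated=False):
    # A missing file, an empty file, or an invalidation all trigger a (re-)run.
    return (invalidated
            or not os.path.exists(filename)
            or os.path.getsize(filename) == 0)


def run_file_job(filename, callback, rename_broken=False):
    """Run callback and enforce that it leaves filename behind."""
    try:
        callback()
    except Exception:
        # Do not leave a half-written output file lying around.
        if os.path.exists(filename):
            if rename_broken:
                os.replace(filename, filename + ".broken")
            else:
                os.remove(filename)
        raise
    if not os.path.exists(filename):
        raise JobContractError("callback did not create %r" % filename)
```

Removing (or renaming) the partial file matters because a leftover non-empty file would otherwise make the job look finished on the next run.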


Same as a FileGeneratingJob, except that multiple (predefined) files are checked and produced.


This job creates a file that directly dependent jobs might access. After those are through, though, the file gets removed. It does not get removed if any directly dependent job dies (instead of finishing ok), so that it does not need to be redone later. It does get removed if the TempFileGeneratingJob breaks (except if you pass rename_broken = True when constructing it).

Compute slave modifying jobs


Load data (from wherever), and store it as an attribute on an object. Only occurs if there is another Job depending on it that needs to be done. Might occur multiple times on different machines.

The attribute might disappear after all directly dependent jobs have been run (Todo: I’m not sure how smart this idea is. On the one hand, it allows us to clear some memory. On the other hand, it makes some things mighty cumbersome.)


Similar to an AttributeLoadingJob, but does not do anything about the data storage - that’s your job.

DataLoadingJobs don’t directly require their prerequisites to be done - so chaining a bunch of them and having a FileGeneratingJob at the end either makes all of them run (given that the FileGeneratingJob is not done) or none of them.

On the plus side, the data loaded via this job does not get automatically eaten once all dependent jobs have been done. (Todo)

Job generating jobs


This job injects further dependencies into jobs that already depend on it.

An example: Say you want to extract the titles from a bunch of downloaded websites. But the websites are not defined when first creating jobs - you first have to download an overview page. So you write a job that takes all .html files in a folder and combines their titles (Z). You also create a job that downloads the overview (A). Then you write a DependencyInjectionJob C (on which Z depends) that depends on A, creates a bunch of jobs (B1…Bn) that download the single websites, and returns them. Z now also depends on B1…Bn, and since Z also depended on C, it could not have run before it had the additional dependencies injected.

It is an error to add dependencies to a job that does not depend on this DependencyInjectionJob.

A DependencyInjectionJob has an implicit FunctionInvariant.


This job generates new jobs that depend on the output of an earlier one.

Example: You want to draw a graph for each member of your sales team. Which members there are you of course only know after querying the database for them. You write a JobGeneratingJob that queries the database for a list of sales team members (or depends on some other job if the action is more expensive). For each sales team member, it creates a FileGeneratingJob that queries the database for this member’s figures and draws the graph. The FileGeneratingJobs can now run in parallel, even on different machines…

A JobGeneratingJob has an implicit FunctionInvariant.

Compound jobs


This job wraps plotting with pyggplot. It takes two functions: one calculates the dataframe for the plot, and that result is cached. The other one loads that dataframe and returns a pyggplot.Plot.

Both have their own FunctionInvariant, so you can fiddle with the plot function without having the calculation part rerun.

The calc function must return a pydataframe.DataFrame, the plot function a pyggplot.Plot.


A CachedJob is a combination of an AttributeLoadingJob and a FileGeneratingJob. It has a single callback that returns some hard-to-compute value, which is pickled to a file (the job_id doubles as the cache file name). The AttributeLoadingJob loads the data in question if necessary.

The calc function does not get run if there are no dependencies.

It also has an implicit FunctionInvariant on its calc function (suppress it just like for a FileGeneratingJob, with ignore_code_changes()).
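The compute-once, pickle, then load-on-demand idea behind a CachedJob can be illustrated without the library. The helper below is a hypothetical sketch (load_or_compute is not a pypipegraph function), and it leaves out invalidation and the attribute storage entirely.

```python
import os
import pickle


def load_or_compute(cache_filename, calc):
    """Return calc()'s result, pickling it to cache_filename on first use."""
    if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 0:
        # Cache hit: the expensive calculation is skipped.
        with open(cache_filename, "rb") as handle:
            return pickle.load(handle)
    value = calc()
    with open(cache_filename, "wb") as handle:
        pickle.dump(value, handle)
    return value
```

A second call with the same cache file returns the stored value without calling calc again, which is the behaviour the combination of the two jobs provides.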


pypipegraph has a small set of exceptions (all descending from PyPipelineGraphError):

  • RuntimeError gets thrown by pypipegraph.run if a job raised an exception, communication lines were broken, etc.
  • JobContractError is stored in a job’s .exception if the job’s callback did not comply with its requirements (e.g. a FileGeneratingJob did not actually create the file).
  • CycleError: you have fabricated a cycle in your dependencies. Unfortunately, it is currently not reported where the cycle is (though some simple cycles are reported early on).


While the pypipegraph is running, you can terminate it with CTRL-C, and query it about which jobs are running by pressing Enter (that might take up to 5 seconds though, which incidentally is the ‘check on slaves’ timeout).

Executing structure

You write a ‘master control program’ (mcp) that creates Jobs and, at one point, hands over control to the pypipegraph. The mcp then talks to a resource-coordinator (either a local instance that says ‘take all of this machine’ or a network service that coordinates between multiple running pypipegraphs) and spawns one compute slave (cs) for each machine.

Now each compute slave receives a copy of all jobs (which are just definitions, and therefore pretty small). One by one, the mcp asks the cs to execute jobs (while talking to the resource-coordinator to share resources with others), collects their feedback, prunes the graph on errors, and returns control to you once all of them have been done (or failed ;) ).

The mcp knows (thanks to the resource coordinator) about the resources available (number of cpu cores, memory) and doesn’t overload the nodes (by spawning more processes than there are cores or by spawning too many memory hungry jobs at once).

Generated Files

Besides your output files, a pipegraph creates some auxiliary files:

  • ./.pypipegraph_status_robust - stores the invariant data of all jobs.
  • ./logs/ppg_run.txt - the chatty debug output of every decision the pipegraph makes (only if logs exists). All logging is also sent to localhost port 5005, and you can listen with util/log_listener.py.
  • ./logs/ppg_errors.txt - a log of all failed jobs and their exception/stdout/stderr (only if logs exists and the file is writable).
  • ./logs/ppg_graph.txt - a dump of the connected graph structure (which job depends on which) (only if logs exists).


  • A pipegraph and its jobs can only be run once (but you can create multiple pipegraphs serially).
  • It is an error to create jobs before new_pipegraph() has been called.
  • Jobs are magically associated with the currently existing pipegraph.
  • Invariant status is kept in a magic .pypipegraph_status file.
  • Jobs are singletonized on their id (within the existence of one pipegraph). Little harm is done in defining a job multiple times.
  • Adding jobs gives you an iterable of jobs (which depends_on also takes). Adding a job and an iterable also gives you an iterable. So does adding an iterable and a job or an iterable and an iterable…
  • Executing jobs (all output generating jobs) have resource attributes: cores_needed (default 1; -1 means ‘all you can get’) and memory_needed (default -1, which means ‘don’t worry about it’: just start one job per core and assume each gets memory / cores. If you specify something above memory / cores, the job is treated as if it needed (your_memory_specification / (memory / cores)) cores). memory_needed is in bytes!
  • Beware of passing instance functions to FunctionInvariants - if the job creation code is done again for a different instance, it will raise an exception, because the bound function from before is not the same function you pass in now. Pass in class.function instead of self.function.
  • pypipegraph is developed and tested on Ubuntu. It will not work reasonably on Windows - its job model makes heavy use of fork(), and Windows process creation does not implicitly copy-on-write the current process’ memory contents.
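The resource rule from the notes above (memory_needed translated into an equivalent number of cores) can be worked through with a small calculation. This is only a sketch under stated assumptions: effective_cores is a hypothetical helper, and rounding up, never going below cores_needed, and capping at the machine’s core count are assumptions not stated in this document.

```python
import math


def effective_cores(cores_needed, memory_needed, total_cores, total_memory):
    """Estimate how many cores a job occupies (all memory values in bytes)."""
    if cores_needed == -1:
        return total_cores  # -1 means 'all you can get'
    memory_per_core = total_memory / total_cores
    if memory_needed == -1 or memory_needed <= memory_per_core:
        return cores_needed  # memory is not the limiting factor
    # Assumption: round up, and never report fewer cores than requested.
    by_memory = math.ceil(memory_needed / memory_per_core)
    return min(total_cores, max(cores_needed, by_memory))


GIB = 2 ** 30
# 8 cores and 16 GiB give 2 GiB per core, so a 5 GiB job counts as 3 cores.
example = effective_cores(1, 5 * GIB, total_cores=8, total_memory=16 * GIB)
```

With these numbers, at most two such jobs fit on the machine at once, even though each asks for a single core.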

Python function gotchas

Please keep in mind that in Python, functions by default bind to the names of variables in their scope, not to their values. This means that

for filename in ('A', 'B', 'C'):
   def shu():
       write_to_file(filename=filename, text='hello world')
   job = pypipegraph.FileGeneratingJob(filename, shu)

will not do what you want - you’ll end up with three jobs, all writing to the same file (and the appropriate JobContractErrors because two of them did not create their output files). What you need to do is rebind the variable:

for filename in ('A', 'B', 'C'):
   def shu(filename=filename):  # that's the magic line. Also works for lambdas
       write_to_file(filename=filename, text='hello world')
   job = pypipegraph.FileGeneratingJob(filename, shu)
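The difference between the two loops can be checked without pypipegraph. In this stand-alone sketch (build_callbacks is a name made up for the illustration), the callbacks simply return the filename they see when they are finally called.

```python
def build_callbacks():
    late, early = [], []
    for filename in ('A', 'B', 'C'):
        def shu():
            return filename  # looked up when shu() is called
        def shu_bound(filename=filename):
            return filename  # default value captured when the def runs
        late.append(shu)
        early.append(shu_bound)
    # All callbacks are only called after the loop has finished,
    # just like jobs are only run after they have all been created.
    return [f() for f in late], [f() for f in early]


late_results, early_results = build_callbacks()
# late_results  == ['C', 'C', 'C']
# early_results == ['A', 'B', 'C']
```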

Development notes

  • We use nose for testing (nosetests test_pypipegraph.py), and create a subdirectory for each test to isolate test cases.
  • There usually are some test cases not yet implemented. These are expected to raise NotImplementedError()s.
