Tutorial on Luigi pipeline, part 3: input() and output()

In the last article we saw a small example of a Luigi pipeline. In this article I want to explore how to make the different Tasks communicate, passing information, such as a LocalTarget, between them.

We already saw that we can use parameters to pass information from one Task to the next. Another nice way is to use the methods input() and output().

The use of self.input()

Let’s see an example:

 
import os
import luigi
import matplotlib.pyplot as plt


class PassPlotNameTask(luigi.Task):
    name      = luigi.Parameter(default="simple_plot.png")
    directory = luigi.Parameter(default=os.path.join(os.getcwd(), "folder"))

    def requires(self):
        # Both parameters are passed on to the upstream Task
        return CreatePlotTask(name=self.name,
                              directory=self.directory)

    def output(self):
        return luigi.LocalTarget(self.name)


class CreatePlotTask(luigi.Task):
    name      = luigi.Parameter()
    directory = luigi.Parameter()

    def run(self):
        x = range(1, 10, 1)
        y = [i ** 2 for i in x]

        fig = plt.figure()
        ax = plt.subplot(111)

        ax.plot(x, y)
        # Here we replace os.getcwd() with self.input().path:
        # self.input() is the LocalTarget returned by MakeDirectory.output()
        fig.savefig(os.path.join(self.input().path, self.name))
        plt.close(fig)

    def output(self):
        # Point at the file that run() actually writes, inside the directory
        return luigi.LocalTarget(os.path.join(self.directory, self.name))

    def requires(self):
        return MakeDirectory(directory=self.directory)


class MakeDirectory(luigi.Task):
    directory = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.directory)

    def run(self):
        # makedirs also creates any missing parent folders
        os.makedirs(self.directory)

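The value of self.input() is whatever the output() method returns in the Task given by requires(). For CreatePlotTask that Task is MakeDirectory, so self.input() is the LocalTarget for the directory and self.input().path is the directory path. In the same way, inside PassPlotNameTask, self.input() is the LocalTarget returned by CreatePlotTask.output().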


Tutorial on Luigi pipeline, part 2: Examples

After the introduction in the previous post, let’s now look at an example I wrote to teach myself how to use Luigi pipelines.

A Task in Luigi

Here follows a simple Luigi Task:

# Let's import what we need:
import os
import luigi
import matplotlib.pyplot as plt

# The Task:
class CreatePlotTask(luigi.Task):
    # A Parameter works like an argument to the Task's constructor,
    # e.g. CreatePlotTask(name="other_plot.png").
    # We can think of it as declaring a 'variable' for our script.
    # I believe it is good practice to list the parameters before using them,
    # even though in this case it is not strictly necessary.
    name = luigi.Parameter(default="simple_plot.png")

    def run(self):
        x = range(1, 10, 1)
        y = [i ** 2 for i in x]

        fig = plt.figure()
        ax = plt.subplot(111)
        ax.plot(x, y)

        # Save the plot in the current working directory;
        # output() below points at the same (relative) path
        fig.savefig("{}/{}".format(os.getcwd(), self.name))
        plt.close(fig)

    def output(self):
        return luigi.LocalTarget(self.name)

Tutorial on Luigi pipeline, part 1: Introduction

From the documentation page of Luigi (https://luigi.readthedocs.io/en/stable/index.html) I can summarise:

Luigi is a pipeline library, written entirely in Python by Spotify, designed to handle the plumbing typically associated with long-running batch processes.

Structure

The structure of a pipeline in Luigi resembles that of a graph, with nodes and edges connecting the nodes.

The “nodes” are called Tasks, and the method requires() provides the edges connecting them.

Consider, for example, a pipeline that executes the tasks one after the other until the end:

Start -> Task A -> Task B -> Task C -> End.
