Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests

Chandu Kavar
5 min read · Aug 18, 2018

Testing is an integral part of any software system: it builds confidence and increases the reliability of the system. I recently joined Grab, and here at Grab we use Airflow to create and manage pipelines. But we were facing issues with Airflow, so I had a conversation with my engineering manager about how we could make Airflow reliable and testable.

Before Grab, I worked at ThoughtWorks, where TDD (Test-Driven Development) is used in almost all projects. As a matter of fact, everyone there is very cognizant of testing. This motivated me to explore testing in Airflow.

I spent a weekend Googling, but I didn’t find any good articles on Airflow testing that covered all the aspects; only a handful of articles are available. I felt it was worth writing and sharing a blog on Airflow testing.

Before we start, I want you to understand two core Airflow concepts: DAGs (Directed Acyclic Graphs) and Operators.

Sample DAG with a few operators

DAGs

In Airflow, a DAG, or Directed Acyclic Graph, is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but shouldn’t start until a certain date.

In this way, a DAG describes how you want to carry out your workflow; but notice that we haven’t said anything about what we actually want to do! A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends an email. Or perhaps A monitors your location so B can open your garage door while C turns on your house lights. The important thing is that the DAG isn’t concerned with what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues.

(https://airflow.apache.org/concepts.html)

Operators

While DAGs describe how to run a workflow, Operators determine what actually gets done.

An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.

This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.

Airflow provides operators for many common tasks, including:

  • BashOperator - executes a bash command
  • PythonOperator - calls an arbitrary Python function
  • EmailOperator - sends an email
  • HTTPOperator - sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
  • Sensor - waits for a certain time, file, database row, S3 key, etc…

(https://airflow.apache.org/concepts.html)

Now, think about testing in Airflow. What can we test? In my view, there are five categories of tests you can write in Airflow:

  1. DAG Validation Tests: to test the validity of the DAG, checking for typos and cycles.
  2. DAG/Pipeline Definition Tests: to test the total number of tasks in the DAG, the upstream and downstream dependencies of each task, etc.
  3. Unit Tests: to test the logic of custom operators, custom sensors, etc.
  4. Integration Tests: to test the communication between tasks. For example, task 1 passes some information to task 2 using XComs.
  5. End-to-End Pipeline Tests: to test and verify the integration between tasks. You can also assert the data on successful completion of the end-to-end pipeline.

Categories 4 and 5 are out of scope for this blog; I will cover them in part 2.

Let’s assume that we have this “hello world” DAG and we want to test it.

Helloworld DAG
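The original post embeds the DAG as a gist; here is a minimal sketch of such a “hello world” DAG, assuming Airflow 1.x-era imports (the task ids, schedule, and printed message are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 8, 1),
}

dag = DAG(dag_id='hello_world',
          default_args=default_args,
          schedule_interval='@daily')


def print_hello():
    return 'Hello world!'


with dag:
    dummy_task = DummyOperator(task_id='dummy_task')
    hello_task = PythonOperator(task_id='hello_task',
                                python_callable=print_hello)
    # dummy_task must succeed before hello_task runs.
    dummy_task >> hello_task
```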

In the rest of the blog, I will explain each category of tests with examples for this DAG.

DAG Validation Tests

DAG validation tests are common to all the DAGs in Airflow, so we don’t need to write a separate test for each DAG. These tests check the correctness of every DAG: they verify that the graph contains no cycles, and they fail if there is even a typo in any of the DAG files. Moreover, if we want to require developers to add certain default arguments to each DAG, we can write a test for that as well. Here are a few validation tests:

DAG validation tests
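A minimal sketch of such tests, using unittest and Airflow’s DagBag (the alert-email policy check is an illustrative assumption, not a built-in rule):

```python
import unittest

from airflow.models import DagBag


class TestDagValidation(unittest.TestCase):

    def setUp(self):
        # DagBag() parses every DAG file in the configured DAGs folder.
        self.dagbag = DagBag()

    def test_no_import_errors(self):
        # A typo, a syntax error, or a cycle in any DAG shows up here,
        # because DagBag records every failure it hits while importing.
        self.assertEqual(
            len(self.dagbag.import_errors), 0,
            'DAG import failures: {}'.format(self.dagbag.import_errors))

    def test_alert_email_is_set(self):
        # Illustrative policy check: every DAG must declare an alert email
        # in its default arguments.
        for dag_id, dag in self.dagbag.dags.items():
            emails = dag.default_args.get('email', [])
            self.assertTrue(
                emails, 'Alert email not set for DAG {}'.format(dag_id))
```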

DAG/Pipeline Definition Tests

Pipeline definition tests are similar to snapshot testing. They don’t test any processing logic; they only help us verify the pipeline definition: the total number of tasks in the pipeline, the nature of those tasks, and the upstream and downstream dependencies of each task. We need to write DAG definition tests for each DAG. Here are a few pipeline definition tests:

DAG/Pipeline definition tests
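A minimal sketch of definition tests for the hello_world DAG sketched above (the expected task count and task ids match that sketch, not a canonical version):

```python
import unittest

from airflow.models import DagBag


class TestHelloWorldDefinition(unittest.TestCase):

    def setUp(self):
        self.dagbag = DagBag()
        self.dag = self.dagbag.get_dag(dag_id='hello_world')

    def test_task_count(self):
        # Acts like a snapshot: fails if a task is added or removed.
        self.assertEqual(len(self.dag.tasks), 2)

    def test_contains_tasks(self):
        task_ids = [task.task_id for task in self.dag.tasks]
        self.assertListEqual(sorted(task_ids), ['dummy_task', 'hello_task'])

    def test_dependencies_of_dummy_task(self):
        # Verify the downstream side of dummy_task >> hello_task.
        dummy_task = self.dag.get_task('dummy_task')
        downstream_ids = [task.task_id for task in dummy_task.downstream_list]
        self.assertListEqual(downstream_ids, ['hello_task'])
```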

Unit Tests

Airflow ships with many built-in operators and sensors, and we can also add our own custom ones. As part of these tests, we check the logic of our custom operators and sensors.

Operator Test

Let’s assume we have a multiplyby5 custom operator that multiplies a given value by five. Here is the code for this operator:

Multiply by five custom operator
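A minimal sketch of such an operator, assuming the Airflow 1.x BaseOperator and apply_defaults API (the class and parameter names are illustrative):

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MultiplyBy5Operator(BaseOperator):

    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MultiplyBy5Operator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # The return value is pushed to XCom automatically by Airflow.
        return self.operator_param * 5
```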

Now, we want to verify the logic of this operator. Here is the unit test for this operator:

Unit test for custom operator
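A minimal sketch of a unit test for the operator above; it calls execute() directly with a template context, so no scheduler run is needed (the import path of the operator is a hypothetical placeholder):

```python
import unittest
from datetime import datetime

from airflow import DAG
from airflow.models import TaskInstance

# Hypothetical import path; adjust to wherever the operator lives.
from plugins.operators.multiplyby5_operator import MultiplyBy5Operator


class TestMultiplyBy5Operator(unittest.TestCase):

    def test_execute(self):
        dag = DAG(dag_id='anydag', start_date=datetime.now())
        task = MultiplyBy5Operator(my_operator_param=10,
                                   task_id='multiply_task',
                                   dag=dag)
        ti = TaskInstance(task=task, execution_date=datetime.now())
        # Invoke the operator's logic directly and assert on the result.
        result = task.execute(ti.get_template_context())
        self.assertEqual(result, 50)
```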

Sensor Test

Suppose we have a custom sensor called “hello world”. Here is the code for the “hello world” sensor:

Hello World Sensor
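A minimal sketch of such a sensor, assuming the Airflow 1.10 BaseSensorOperator import path; the poke condition (a file appearing on disk) is an illustrative stand-in:

```python
import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class HelloWorldSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        self.filepath = filepath
        super(HelloWorldSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        # The scheduler calls poke() repeatedly (every poke_interval
        # seconds) until it returns True or the sensor times out.
        return os.path.isfile(self.filepath)
```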

Now, let’s verify the logic of this sensor. Here is the unit test for this sensor:

Hello World Sensor Tests
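A minimal sketch of unit tests for the sensor above; they drive poke() directly instead of running the task through a scheduler (again, the import path is a hypothetical placeholder):

```python
import tempfile
import unittest
from datetime import datetime

from airflow import DAG

# Hypothetical import path; adjust to wherever the sensor lives.
from plugins.sensors.hello_world_sensor import HelloWorldSensor


class TestHelloWorldSensor(unittest.TestCase):

    def setUp(self):
        self.dag = DAG(dag_id='anydag', start_date=datetime.now())

    def test_poke_returns_false_when_file_is_missing(self):
        sensor = HelloWorldSensor(filepath='/tmp/file_that_does_not_exist',
                                  task_id='hello_world_sensor',
                                  dag=self.dag)
        self.assertFalse(sensor.poke(context={}))

    def test_poke_returns_true_when_file_exists(self):
        # NamedTemporaryFile guarantees the file exists inside the block.
        with tempfile.NamedTemporaryFile() as f:
            sensor = HelloWorldSensor(filepath=f.name,
                                      task_id='hello_world_sensor_2',
                                      dag=self.dag)
            self.assertTrue(sensor.poke(context={}))
```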

To run integration tests and end-to-end pipeline tests, we need a test environment. The test environment should be similar to the production environment, but on a smaller scale. In this environment, we run all Airflow pipelines on sample data and assert the output of each pipeline. This also helps us make sure that everything works on an actual cluster.

In part 2 of this blog, Sarang Shinde and I explain Integration Tests and End-to-End Pipeline Tests with some examples.

The GitHub repo with all the code from this blog is available here.

Thanks for reading. If you found this blog helpful, please recommend and share it. Follow me for more articles on Big Data.

You can find my other blogs here. Thanks!

Thanks to Shakti garg, Siddharth Kulkarni and Sunil Mundra.


