As a data analyst, you may be required to send reports by email on a regular basis. In R, you can use cronR for this; it's simple and easy to use. But today I will introduce Apache Airflow (written in Python) as an alternative way to schedule R scripts.
We will use Apache Airflow to schedule a task that starts a container to run your R script. Here are the steps:
- Run Apache Airflow in Docker
- Prepare your R Docker image
- Write the DAG file (Python) and the R script
Run Apache Airflow in Docker
Apache Airflow is written in Python, so it helps to know a little Python. We will run Apache Airflow in Docker on Linux, so you will need Docker installed.
I will use my custom Apache Airflow image, a fork of puckel's docker-airflow repository. You should build your own custom image too, because you will need to change the Dockerfile; you can check mine for reference.
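For example, assuming you have cloned your fork and are in its root directory (the image tag is up to you):

docker build -t shizidushu/docker-airflow:1.10.2 .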
The key line that allows Apache Airflow (which itself runs as a container) to start other Docker containers is:
groupadd --gid 119 docker
I borrowed it from someone else's Dockerfile; sorry, I don't remember the details. You should change the group id in the Dockerfile to match the docker group on your host (it is 119 on mine). Run the id command and you will find it in the output (you may see something like 119(docker)).
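For example, on my machine (the user name and ids below are illustrative; yours will differ):

$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow),119(docker)
$ getent group docker
docker:x:119:airflow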
After building your image, push it to a registry so the host running Apache Airflow can pull it:
docker pull shizidushu/docker-airflow:1.10.2
Then deploy Apache Airflow. I use Docker Swarm. Here is the YAML file; I omit some details, so edit it according to your own needs. You may also need to read Apache Airflow's documentation.
version: "3.7"
services:
redis:
image: redis:5-alpine
ports:
- "6379"
networks:
- apache-airflow-overlay
command: redis-server
deploy:
replicas: 1
update_config:
parallelism: 2
delay: 10s
restart_policy:
condition: on-failure
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- ../persistent-data/airflow/pgdata:/var/lib/postgresql/data
networks:
- apache-airflow-overlay
deploy:
placement:
constraints: [node.role == manager]
webserver:
image: shizidushu/docker-airflow:1.10.2
depends_on:
- postgres
- redis
networks:
- apache-airflow-overlay
environment:
- DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
- LOAD_EX=n
- AIRFLOW_HOME=/usr/local/airflow
- FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
- EXECUTOR=Celery
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /usr/bin/docker:/usr/bin/docker
- ./dags:/usr/local/airflow/dags
ports:
- target: 8080
published: 8080
mode: host
deploy:
placement:
constraints: [node.role == manager]
command: webserver
flower:
image: shizidushu/docker-airflow:1.10.2
depends_on:
- postgres
- redis
networks:
- apache-airflow-overlay
environment:
- DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
- LOAD_EX=n
- AIRFLOW_HOME=/usr/local/airflow
- FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
- EXECUTOR=Celery
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /usr/bin/docker:/usr/bin/docker
- ./dags:/usr/local/airflow/dags
ports:
- target: 5555
published: 5555
mode: host
deploy:
replicas: 1
restart_policy:
condition: on-failure
placement:
constraints: [node.role == manager]
command: flower
scheduler:
image: shizidushu/docker-airflow:1.10.2
depends_on:
- postgres
- redis
- webserver
networks:
- apache-airflow-overlay
environment:
- DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
- LOAD_EX=n
- AIRFLOW_HOME=/usr/local/airflow
- FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
- EXECUTOR=Celery
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /usr/bin/docker:/usr/bin/docker
- ./dags:/usr/local/airflow/dags
deploy:
replicas: 1
restart_policy:
condition: on-failure
placement:
constraints: [node.role == manager]
command: scheduler
worker:
image: shizidushu/docker-airflow:1.10.2
depends_on:
- postgres
- redis
- webserver
- scheduler
networks:
- apache-airflow-overlay
environment:
- DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
- LOAD_EX=n
- AIRFLOW_HOME=/usr/local/airflow
- FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
- EXECUTOR=Celery
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /usr/bin/docker:/usr/bin/docker
- ./dags:/usr/local/airflow/dags
deploy:
replicas: 2
restart_policy:
condition: on-failure
command: worker
networks:
apache-airflow-overlay:
driver: overlay
external: true
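Since the overlay network is declared as external, create it before deploying. Assuming the file is saved as docker-compose.yml and the stack is named airflow (both names are up to you):

docker network create --driver overlay apache-airflow-overlay
docker stack deploy -c docker-compose.yml airflow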
Notice that I set an environment variable DOCKER_VOLUME_BASE, which I use to refer to the location of my R script. You can omit it if you prefer.
After deploying Apache Airflow, it is time to write the DAG file, which defines the scheduled job. You also need to prepare an R Docker image to run your R script; I use an image based on rocker/r-ver.
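A minimal sketch of such a Dockerfile, assuming the script only needs optparse (pin the tag and add whatever packages your report requires):

FROM rocker/r-ver:3.5.2
# install the R packages the script depends on
RUN Rscript -e 'install.packages("optparse")'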
Here is an example DAG file.
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 22, 1),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG(
    dag_id='r-script',
    default_args=default_args,
    schedule_interval='5 1 * * *')  # daily at 01:05

test = DockerOperator(
    api_version='auto',
    image='rocker/r-ver',
    network_mode='bridge',
    # mount the host directory that holds the R script into the container
    volumes=[os.path.join(os.environ['DOCKER_VOLUME_BASE'], 'input', 'rscript') + ':/root/rscript'],
    command='Rscript script.R -d "{{ next_execution_date }}"',
    task_id='run-r-script',
    working_dir='/root/rscript',
    dag=dag)
The next_execution_date macro is passed to the R script as a parameter. Please check Apache Airflow's documentation on templating for more.
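For example, for the daily run covering 2019-01-22, the rendered command inside the container would look something like this (the exact timestamp format depends on the Airflow version):

Rscript script.R -d "2019-01-23T01:05:00+00:00"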
Here is an example R script:
library(optparse)
# parse arguments
## refer to https://www.r-bloggers.com/passing-arguments-to-an-r-script-from-command-lines/
option_list <- list(
  make_option(opt_str = c("-d", "--date"), type = "character", help = "Execution date")
)
opt_parser <- OptionParser(option_list = option_list)
opt <- parse_args(opt_parser)

# check that the required argument was supplied
if (is.null(opt$date)) {
  print_help(opt_parser)
  stop("The next execution date must be provided!", call. = FALSE)
}

print(paste("The next execution date is", opt$date))
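You can test the script outside Airflow by invoking it the same way the DockerOperator does:

$ Rscript script.R -d "2019-01-23T01:05:00+00:00"
[1] "The next execution date is 2019-01-23T01:05:00+00:00"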