更新源
yum install https://centos6.iuscommunity.org/ius-release.rpm -y
wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-6.repo
yum makecache
安装 Python3.6
yum install python36u python36u-devel -y
ln -s /usr/bin/python3.6 /bin/python3
安装 pip
yum install python36u-pip -y
ln -s /usr/bin/pip3.6 /bin/pip3
pip3 install --upgrade pip
安装 virtualenv
pip3 install virtualenv
安装 Airflow
-
创建目录
mkdir -p /opt/airflow
mkdir -p /opt/airflow/airflow
cd /opt/airflow/
-
激活环境
virtualenv -p `which python3.6` venv
source venv/bin/activate
退出虚拟环境
deactivate
-
安装依赖包
pip3 install cryptography
pip3 install flask-bcrypt
pip3 install mysql-connector
-
安装
pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
export AIRFLOW_HOME=/opt/airflow/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes
pip3 install apache-airflow==1.10.2
或者在同一行内指定环境变量 (二选一即可):
AIRFLOW_HOME=/opt/airflow/airflow SLUGIFY_USES_TEXT_UNIDECODE=yes pip3 install apache-airflow==1.10.2
配置
-
修改时区
vim $AIRFLOW_HOME/airflow.cfg
default_timezone = Asia/Shanghai
expose_config = True
验证:
python -c "from airflow.utils import timezone;print(timezone.datetime(2019,7,1))"
-
配置MySQL数据库
-
安装包
sudo apt-get install libmysqlclient-dev (Debian/Ubuntu; 在 CentOS 上对应的命令是: yum install mysql-devel -y)
pip3 install apache-airflow[mysql]
-
创建数据库
CREATE DATABASE airflow; ALTER DATABASE `airflow` CHARACTER SET utf8; CREATE USER 'airflow'@'%' IDENTIFIED BY 'af123854'; GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY 'af123854'; FLUSH PRIVILEGES;
-
修改cfg
vim $AIRFLOW_HOME/airflow.cfg
sql_alchemy_conn = mysql://airflow:af123854@127.0.0.1/airflow
(其中 127.0.0.1 请替换为实际的 MySQL 主机地址)
-
-
配置 celery + rabbitmq
-
安装包
pip3 install apache-airflow[celery]
pip3 install apache-airflow[rabbitmq]
-
创建用户相关信息
sudo rabbitmqctl add_user airflow af123854
sudo rabbitmqctl add_vhost airflow-rabbitmq
sudo rabbitmqctl set_user_tags airflow airflow-rabbitmq
sudo rabbitmqctl set_permissions -p airflow-rabbitmq airflow ".*" ".*" ".*"
-
修改cfg
vim $AIRFLOW_HOME/airflow.cfg
executor = CeleryExecutor
broker_url = amqp://airflow:af123854@127.0.0.1:5672/airflow-rabbitmq
airflow 1.9.0 以上使用的是 celery 4.x, 而 celery 4.x 使用 json 序列化, 不是用 pickle 进行序列化, 因此修改为:
broker_url = pyamqp://airflow:af123854@127.0.0.1:5672/airflow-rabbitmq
result_backend 推荐使用数据库, 在 MySQL 那边创建:
CREATE DATABASE result_backend;
ALTER DATABASE `result_backend` CHARACTER SET utf8;
GRANT all privileges on result_backend.* TO 'airflow'@'%' IDENTIFIED BY 'af123854';
FLUSH PRIVILEGES;
vim $AIRFLOW_HOME/airflow.cfg
result_backend = db+mysql://airflow:af123854@127.0.0.1/result_backend
(以上 127.0.0.1 请替换为实际的 RabbitMQ / MySQL 主机地址)
-
-
commands
-
init
airflow initdb
-
webserver
启动: nohup airflow webserver -p 8080 > /opt/airflow/airflow/logs/webserver.log 2>&1 &
停止: ps aux | grep webserver | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
访问: http://127.0.0.1:8080
-
scheduler
启动: nohup airflow scheduler > ./scheduler.out 2>&1 &
停止: ps aux | grep scheduler | grep -v failover | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
-
flower
启动: nohup airflow flower > /opt/airflow/airflow/logs/flower.out 2>&1 &
停止: ps aux | grep flower | grep airflow | awk '{print $2}'| xargs -n 1 kill -9
访问: http://127.0.0.1:5555
-
worker
启动:
export C_FORCE_ROOT=True
nohup airflow worker > ./worker.out 2>&1 &
指定队列启动:
nohup airflow worker -q testQueue > ./worker.out 2>&1 &
停止:
ps aux | grep worker | grep celeryd | awk '{print $2}'| xargs -n 1 kill -9
-
其它
airflow list_dags
airflow delete_dag tutorial
airflow list_tasks tutorial
airflow list_tasks tutorial --tree
airflow test dag_name task_name test_time
airflow run dagid [time] (run task instance)
airflow backfill [dagid] -s [startTime] -e [endTime] (run a backfill over 2 days)
-