PyFlink
基于Flink官方提供的Python API,开发人员可以在Flink上开发Python任务,更好地进行数据分析。
开发环境准备
在平台开发、运行PyFlink任务前提是要准备Python环境。可以在【项目管理-PyFlink环境管理】模块上传准备好的环境文件。下文将以Python3.8为例,为您介绍如何准备虚拟环境。
生成环境包
- 在本地准备/build/setup-pyflink-virtual-env-linux.sh脚本,其内容如下:
set -e
# 下载Python 3.8 Miniconda3-py38_4.11.0-Linux-x86_64.sh 脚本。
wget "https://repo.anaconda.com/miniconda/Miniconda3-py38_4.11.0-Linux-x86_64.sh" -O "Miniconda3-py38_4.11.0-Linux-x86_64.sh"
# 为Python 3.8 miniconda.sh脚本添加执行权限。
chmod +x Miniconda3-py38_4.11.0-Linux-x86_64.sh
# 创建Python的虚拟环境。
./Miniconda3-py38_4.11.0-Linux-x86_64.sh -b -p linux_venv
# 激活Conda Python虚拟环境。
source linux_venv/bin/activate ""
# 安装PyFlink依赖。
# update the PyFlink version if needed
pip install "apache-flink==1.12.5"
# 关闭Conda Python虚拟环境。
conda deactivate
# 删除缓存的包。
rm -rf linux_venv/pkgs
# 将准备好的Conda Python虚拟环境打包。
zip -r linux_venv.zip linux_venv
在本地准备 linux_build.sh 脚本,其内容如下:
#!/bin/bash
set -e -x
yum install -y zip wget
cd /root/
bash /build/setup-pyflink-virtual-env-linux.sh
mv linux_venv.zip /build/
在CMD命令行,执行如下命令:
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./linux_build.sh
- 执行完该命令后,会生成一个名字为venv.zip的文件,即为Python 3.8的虚拟环境。
- 上传环境包
- 登录实时平台后,进入项目管理 - PyFlink环境管理 菜单。
- 点击上传文件,上传刚刚生成的环境包venv.zip文件。
- 平台上传文件大小的限制为200 MB,而Python虚拟环境的大小通常会超过该限制。因此,您需要修改配置文件中的相关参数大小
- 打开平台配置文件: vim .../DTFront/tengine/conf/nginx.conf
- 修改client_max_body_size参数的大小
- 在创建PyFlink任务时,选择venv.zip文件。
- 通过任务数量,可查看当前环境被使用的任务列表。 没有任务引用该环境时,可删除虚拟环境。
开发任务
目前平台支持两种方式开发PyFlink任务:
- 脚本上传:您需要在线下完成Python API作业开发,上传至资源管理后,再提交作业到集群上运行。
- Web编辑:支持您直接在Web端编辑和维护Python代码,方便协作开发。
参数 | 说明 |
---|---|
操作模式 | 资源上传:自己线下完成的Python API作业,必须为.py文件。 Web编辑:在实时开发平台的IDE中完成代码编辑。 |
Python环境 | 选择在上一步中上传的Python环境 |
Python入参 | 作业参数,在 python 里面可以通过 sys.argv[1] , sys.argv[2] 去使用。 |
附加依赖包 | 如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,可以通过如下方式来指定Connector或者Java自定义函数的JAR包。 目前只支持传一个 jar 包, 如果要引用多个。需要自己打包多个 jar 为一个 jar,然后上传。这个 jar 会被添加到 classpath 里面去。 |
第三方Python包 | 在python环境中未打包或者只是该任务需要使用的第三方Python包,可通过此处选择。 |
任务管理
同Flinkjar管理内容。
note
PyFlink任务必须在环境参数中维护两个python环境的路径,如下,具体路径请根据实际情况修改:
taskmanager 端的 python 环境路径
python.executable=venv.zip/linux_venv/bin/python3
client 提交端的 python 环境路径
python.client.executable=venv.zip/bin/python3