Skip to main content

PyFlink

基于Flink官方提供的Python API,开发人员可以在Flink上开发Python任务,更好地进行数据分析。

开发环境准备

在平台开发、运行PyFlink任务前提是要准备Python环境。可以在【项目管理-PyFlink环境管理】模块上传准备好的环境文件。下文将以Python3.8为例,为您介绍如何准备虚拟环境。

  • 生成环境包

    1. 在本地准备/build/setup-pyflink-virtual-env-linux.sh脚本,其内容如下:
    set -e
    # 下载Python 3.8 Miniconda3-py38_4.11.0-Linux-x86_64.sh 脚本。
    wget "https://repo.anaconda.com/miniconda/Miniconda3-py38_4.11.0-Linux-x86_64.sh" -O "Miniconda3-py38_4.11.0-Linux-x86_64.sh"

    # 为Python 3.8 miniconda.sh脚本添加执行权限。
    chmod +x Miniconda3-py38_4.11.0-Linux-x86_64.sh

    # 创建Python的虚拟环境。
    ./Miniconda3-py38_4.11.0-Linux-x86_64.sh -b -p linux_venv

    # 激活Conda Python虚拟环境。
    source linux_venv/bin/activate ""

    # 安装PyFlink依赖。
    # update the PyFlink version if needed
    pip install "apache-flink==1.12.5"

    # 关闭Conda Python虚拟环境。
    conda deactivate

    # 删除缓存的包。
    rm -rf linux_venv/pkgs

    # 将准备好的Conda Python虚拟环境打包。
    zip -r linux_venv.zip linux_venv
  1. 在本地准备 linux_build.sh 脚本,其内容如下:

    #!/bin/bash
    set -e -x
    yum install -y zip wget

    cd /root/
    bash /build/setup-pyflink-virtual-env-linux.sh
    mv linux_venv.zip /build/
  1. 在CMD命令行,执行如下命令:

    docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./linux_build.sh
  1. 执行完该命令后,会生成一个名字为venv.zip的文件,即为Python 3.8的虚拟环境。
  • 上传环境包
    1. 登录实时平台后,进入项目管理 - PyFlink环境管理 菜单。
    2. 点击上传文件,上传刚刚生成的环境包venv.zip文件。
      • 平台上传文件大小的限制为200 MB,而Python虚拟环境的大小通常会超过该限制。因此,您需要修改配置文件中的相关参数大小
      • 打开平台配置文件: vim .../DTFront/tengine/conf/nginx.conf
      • 修改client_max_body_size参数的大小
    3. 创建PyFlink任务时,选择venv.zip文件。
    4. 通过任务数量,可查看当前环境被使用的任务列表。 没有任务引用该环境时,可删除虚拟环境。

开发任务

目前平台支持两种方式开发PyFlink任务:

  • 脚本上传:您需要在线下完成Python API作业开发,上传至资源管理后,再提交作业到集群上运行。
  • Web编辑:支持您直接在Web端编辑和维护Python代码,方便协作开发。

image-20220704141823474

参数说明
操作模式资源上传:自己线下完成的Python API作业,必须为.py文件。
Web编辑:在实时开发平台的IDE中完成代码编辑。
Python环境选择在上一步中上传的Python环境
Python入参作业参数,在 python 里面可以通过 sys.argv[1] , sys.argv[2] 去使用。
附加依赖包如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,可以通过如下方式来指定Connector或者Java自定义函数的JAR包。
目前只支持传一个 jar 包, 如果要引用多个。需要自己打包多个 jar 为一个 jar,然后上传。这个 jar 会被添加到 classpath 里面去。
第三方Python包在python环境中未打包或者只是该任务需要使用的第三方Python包,可通过此处选择。

任务管理

同Flinkjar管理内容。

note

PyFlink任务必须在环境参数中维护两个python环境的路径,如下,具体路径请根据实际情况修改:

  • taskmanager 端的 python 环境路径

    python.executable=venv.zip/linux_venv/bin/python3
  • client 提交端的 python 环境路径

    python.client.executable=venv.zip/bin/python3