使用 Docker 和 docker-compose 创建 Spark 独立集群(2021 年更新)
要求
项目结构
图像的创建
组合文件
演示应用程序
结论
还有什么要做的?
早在 2018 年,我就写了一篇关于如何使用 Docker 和 Docker Compose 创建 Spark 集群的文章。从那时起,我的这个小小的仓库就获得了 270 多个 star,以及来自社区的大量 fork 和活跃度。然而,我后来放弃了这个项目一段时间(2019 年我忙于一份新工作和其他一些事情),我偶尔会合并一些 pull 请求,但从未过多关注版本升级。
但今天我们将对这个老家伙进行一些更新,并希望能够用 Scala 和 Python 运行一些示例(是的,2018 版本不支持 Python,感谢社区将 PySpark 引入到这个项目中)。
要求
- Docker(我使用的是 20.10.7 版本)
- docker-compose(我使用的是 1.21.2 版本)
- 这个仓库 ;)
项目结构
将采用以下项目结构。
|
|--|apps # Apps directory for volume mounts(any app you want to deploy just paste it here)
|--|data # Data directory for volume mounts(any file you want to process just paste it here)
|--|Dockerfile #Dockerfile used to build spark image
|--|start-spark.sh # startup script used to run different spark workloads
|--|docker-compose.yml # the compose file
图像的创建
在 2018 版本中,我们使用了一个基础镜像,并为每个 Spark 工作负载分别创建了一个镜像(主节点一个镜像,工作节点一个镜像,spark-submit 一个镜像)。而在新方案中,我们将使用 Docker 多阶段构建来创建一个可以作为任何所需工作负载启动的唯一镜像。
以下是用于定义 apache-spark 镜像的 Dockerfile:
# builder step used to download and configure spark environment
FROM openjdk:11.0.11-jre-slim-buster as builder
# Add Dependencies for PySpark
RUN apt-get update && apt-get install -y curl vim wget software-properties-common ssh net-tools ca-certificates python3 python3-pip python3-numpy python3-matplotlib python3-scipy python3-pandas python3-simpy
RUN update-alternatives --install "/usr/bin/python" "python" "$(which python3)" 1
# Fix the value of PYTHONHASHSEED
# Note: this is needed when you use Python 3.3 or greater
ENV SPARK_VERSION=3.0.2 \
HADOOP_VERSION=3.2 \
SPARK_HOME=/opt/spark \
PYTHONHASHSEED=1
# Download and uncompress spark from the apache archive
RUN wget --no-verbose -O apache-spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" \
&& mkdir -p /opt/spark \
&& tar -xf apache-spark.tgz -C /opt/spark --strip-components=1 \
&& rm apache-spark.tgz
# Apache spark environment
FROM builder as apache-spark
WORKDIR /opt/spark
ENV SPARK_MASTER_PORT=7077 \
SPARK_MASTER_WEBUI_PORT=8080 \
SPARK_LOG_DIR=/opt/spark/logs \
SPARK_MASTER_LOG=/opt/spark/logs/spark-master.out \
SPARK_WORKER_LOG=/opt/spark/logs/spark-worker.out \
SPARK_WORKER_WEBUI_PORT=8080 \
SPARK_WORKER_PORT=7000 \
SPARK_MASTER="spark://spark-master:7077" \
SPARK_WORKLOAD="master"
EXPOSE 8080 7077 6066
RUN mkdir -p $SPARK_LOG_DIR && \
touch $SPARK_MASTER_LOG && \
touch $SPARK_WORKER_LOG && \
ln -sf /dev/stdout $SPARK_MASTER_LOG && \
ln -sf /dev/stdout $SPARK_WORKER_LOG
COPY start-spark.sh /
CMD ["/bin/bash", "/start-spark.sh"]
请注意,在 dockerfile 中,我们引用了一个名为start-spark.sh的脚本,它的主要目标是使用给定的角色(master 或 worker)运行 spark-class 脚本。
#start-spark.sh
#!/bin/bash
. "/opt/spark/bin/load-spark-env.sh"
# When the spark work_load is master run class org.apache.spark.deploy.master.Master
if [ "$SPARK_WORKLOAD" == "master" ];
then
export SPARK_MASTER_HOST=`hostname`
cd /opt/spark/bin && ./spark-class org.apache.spark.deploy.master.Master --ip $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT >> $SPARK_MASTER_LOG
elif [ "$SPARK_WORKLOAD" == "worker" ];
then
# When the spark work_load is worker run class org.apache.spark.deploy.master.Worker
cd /opt/spark/bin && ./spark-class org.apache.spark.deploy.worker.Worker --webui-port $SPARK_WORKER_WEBUI_PORT $SPARK_MASTER >> $SPARK_WORKER_LOG
elif [ "$SPARK_WORKLOAD" == "submit" ];
then
echo "SPARK SUBMIT"
else
echo "Undefined Workload Type $SPARK_WORKLOAD, must specify: master, worker, submit"
fi
要构建镜像,只需运行:
docker build -t cluster-apache-spark:3.0.2 .
过一段时间后,镜像将成功创建,具体时间取决于依赖项和 Spark tarball 的下载速度(幸运的是,由于多阶段设置,这些步骤会被缓存为一个层)。
组合文件
现在我们已经有了 apache-spark 镜像,接下来需要在 docker-compose 中创建集群。
version: "3.3"
services:
spark-master:
image: cluster-apache-spark:3.0.2
ports:
- "9090:8080"
- "7077:7077"
volumes:
- ./apps:/opt/spark-apps
- ./data:/opt/spark-data
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_WORKLOAD=master
spark-worker-a:
image: cluster-apache-spark:3.0.2
ports:
- "9091:8080"
- "7000:7000"
depends_on:
- spark-master
environment:
- SPARK_MASTER=spark://spark-master:7077
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=1G
- SPARK_DRIVER_MEMORY=1G
- SPARK_EXECUTOR_MEMORY=1G
- SPARK_WORKLOAD=worker
- SPARK_LOCAL_IP=spark-worker-a
volumes:
- ./apps:/opt/spark-apps
- ./data:/opt/spark-data
spark-worker-b:
image: cluster-apache-spark:3.0.2
ports:
- "9092:8080"
- "7001:7000"
depends_on:
- spark-master
environment:
- SPARK_MASTER=spark://spark-master:7077
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=1G
- SPARK_DRIVER_MEMORY=1G
- SPARK_EXECUTOR_MEMORY=1G
- SPARK_WORKLOAD=worker
- SPARK_LOCAL_IP=spark-worker-b
volumes:
- ./apps:/opt/spark-apps
- ./data:/opt/spark-data
demo-database:
image: postgres:11.7-alpine
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=casa1234
我们为 Spark master 和 worker 都配置了以下环境变量:
| 环境 | 描述 |
|---|---|
| SPARK_MASTER | Spark 主服务器 URL |
| SPARK_WORKER_CORES | 分配给工作线程的 CPU 核心数 |
| SPARK_WORKER_MEMORY | 分配给工作进程的内存量 |
| SPARK_DRIVER_MEMORY | 分配给驱动程序的内存量 |
| SPARK_EXECUTOR_MEMORY | 分配给执行程序的内存量 |
| SPARK_WORKLOAD | 要运行的 Spark 工作负载(可以是master、worker或submit 中的任何一个) |
与 2018 版本相比,进行了以下更改:
-
已移除自定义网络和 IP 地址
-
只开放 2 个工作进程而不是 3 个,并将每个工作进程的端口开放在 (9090...9091 等) 范围内。
-
感谢社区的贡献,PySpark 得到了支持
-
包含一个 PostgreSQL 实例来运行演示程序(两个演示程序都使用 JDBC 存储数据)。
创建测试集群的最后一步是运行 compose 文件:
docker-compose up -d
要验证您的集群,只需访问每个工作节点和主节点的 Spark UI URL 即可。
Spark Master:http://localhost:9090
Spark Worker 1: http://localhost:9091
Spark Worker 2:http://localhost:9092
数据库服务器
要检查数据库服务器,只需使用 psql 命令(或您选择的任何数据库客户端):
psql -U postgres -h 0.0.0.0 -p 5432
#It will ask for your password defined in the compose file
演示应用程序
以下应用程序可在应用程序目录中找到,这些应用程序用作我们集群行为的概念验证。
纽约公交车站数据 [PySpark]
该程序从MTA 公交时刻表加载存档数据,并使用 Spark SQL 应用基本筛选器,结果将持久化到 PostgreSQL 表中。
加载后的表格将包含以下结构:
| 纬度 | 经度 | 接收时间 | 车辆ID | 沿途距离 | 推断方向 ID | 推断阶段 | 推断路由 ID | 推断行程 ID | 下一个预定停靠点距离 | 下一个预定停靠站 ID | 报告小时 | 报告日期 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 40.668602 | -73.986697 | 2014-08-01 04:00:01 | 469 | 4135.34710710144 | 1 | 进行中 | MTA NYCT_B63 | MTA NYCT_JG_C4-工作日-141500_B63_123 | 2.63183804205619 | MTA_305423 | 2014-08-01 04:00:00 | 2014年8月1日 |
要提交应用程序,请连接到其中一个工作节点或主节点并执行以下操作:
/opt/spark/bin/spark-submit --master spark://spark-master:7077 \
--jars /opt/spark-apps/postgresql-42.2.22.jar \
--driver-memory 1G \
--executor-memory 1G \
/opt/spark-apps/main.py
MTA 公交分析[Scala]
该程序从MTA Bus Time获取存档数据并对其进行一些聚合,计算结果将持久化到 postgresql 表中。
每个持久化表都对应一个特定的聚合:
| 桌子 | 聚合 |
|---|---|
| 每日概要 | 车辆报告汇总,包括停靠站点、平均速度和行驶距离(所有车辆) |
| 超速 | 超速处罚以5分钟为时间窗口计算。 |
| 平均速度 | 车辆平均速度 |
| 行驶距离 | 车辆行驶总里程 |
要提交应用程序,请连接到其中一个工作节点或主节点并执行以下操作:
/opt/spark/bin/spark-submit --deploy-mode cluster \
--master spark://spark-master:7077 \
--total-executor-cores 1 \
--class mta.processing.MTAStatisticsApp \
--driver-memory 1G \
--executor-memory 1G \
--jars /opt/spark-apps/postgresql-42.2.22.jar \
--conf spark.driver.extraJavaOptions='-Dconfig-path=/opt/spark-apps/mta.conf' \
--conf spark.executor.extraJavaOptions='-Dconfig-path=/opt/spark-apps/mta.conf' \
/opt/spark-apps/mta-processing.jar
您会在 spark-ui 中看到驱动程序和执行程序正在运行(在 Scala 中,我们可以使用部署模式集群)。
结论
-
我们在 docker-compose 中创建了一个更简单的 spark 集群版本,该集群的主要目标是为您提供一个本地环境来测试 spark 应用程序的分布式特性,而无需将其部署到生产集群。
-
生成的图像并非设计成占用空间小(图像大小约为 1GB)。
-
只有当您想要在机器上的分布式环境中运行 Spark 应用程序时才需要此集群(不建议在生产环境中使用,请改用 Databricks 或 Kubernetes 设置)。
还有什么要做的?
-
目前要在部署模式集群中运行应用程序,需要通过spark.driver.port配置指定任意驱动程序端口(我必须修复一些网络和端口问题)。
-
start-spark.sh 中的 spark submit 条目尚未实现,演示中使用的 submit 可以从任何 worker 触发。




