RayOnSpark
Ray is an open source distributed framework for emerging AI applications. With the RayOnSpark support packaged in BigDL Orca, users can seamlessly integrate Ray applications into the big data processing pipeline on the underlying big data cluster (such as Hadoop/YARN or K8s).
Note: BigDL has been tested on Ray 1.9.2, and we highly recommend using this tested version.
1. Install
We recommend using conda to prepare the Python environment.
When installing bigdl-orca with pip, you can specify the extras key [ray] to install the additional dependencies for running Ray (i.e. ray==1.9.2, psutil, aiohttp==3.7.0, aioredis==1.1.0, setproctitle, hiredis==1.1.0, async-timeout==3.0.1):
conda create -n py37 python=3.7 # "py37" is conda environment name, you can use any name you like.
conda activate py37
pip install bigdl-orca[ray]
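After installation, it can be useful to confirm that the pinned versions listed above are the ones actually present in the environment. The following is a minimal sketch, not part of BigDL: `EXPECTED` and `check_pins` are hypothetical names introduced here, and the pins are simply copied from the dependency list above.

```python
try:
    from importlib.metadata import version, PackageNotFoundError  # Python 3.8+
except ImportError:  # Python 3.7, as in the conda environment above
    from pkg_resources import get_distribution
    from pkg_resources import DistributionNotFound as PackageNotFoundError

    def version(name):
        return get_distribution(name).version

# Pins copied from the dependency list above; unpinned packages map to None.
EXPECTED = {
    "ray": "1.9.2",
    "psutil": None,
    "aiohttp": "3.7.0",
    "aioredis": "1.1.0",
    "setproctitle": None,
    "hiredis": "1.1.0",
    "async-timeout": "3.0.1",
}


def check_pins(expected, get_version=version):
    """Return {package: problem} for packages that are missing or mis-versioned."""
    problems = {}
    for name, pinned in expected.items():
        try:
            installed = get_version(name)
        except PackageNotFoundError:
            problems[name] = "not installed"
            continue
        if pinned is not None and installed != pinned:
            problems[name] = "found {}, expected {}".format(installed, pinned)
    return problems


if __name__ == "__main__":
    # Prints nothing when every package matches its pin.
    for name, problem in sorted(check_pins(EXPECTED).items()):
        print("{}: {}".format(name, problem))
```

An empty result means every listed package is installed and matches its pin. The `get_version` parameter exists only so the check can be exercised without touching the real environment.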
View Python User Guide and Orca User Guide for more installation instructions.
2. Initialize
We recommend using init_orca_context to initiate and run RayOnSpark on the underlying cluster. The Ray cluster is launched when you specify init_ray_on_spark=True. For example, to launch Spark and Ray on standard Hadoop/YARN clusters in YARN client mode:
from bigdl.orca import init_orca_context
sc = init_orca_context(cluster_mode="yarn-client", cores=4, memory="10g", num_nodes=2, init_ray_on_spark=True)
By default, the Ray cluster is launched using Spark barrier execution mode. You can turn it off via the configuration of OrcaContext:
from bigdl.orca import OrcaContext
OrcaContext.barrier_mode = False
View Orca Context for more details.
3. Run
After initialization, you can directly run Ray applications on the underlying cluster. Ray tasks and actors are launched across the cluster. The following code shows a simple example:
import ray

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

counters = [Counter.remote() for i in range(5)]
print(ray.get([c.increment.remote() for c in counters]))
You can retrieve the information of the Ray cluster via OrcaContext:
from bigdl.orca import OrcaContext

ray_ctx = OrcaContext.get_ray_context()
# The dictionary information of the ray cluster, including node_ip_address, object_store_address, webui_url, etc.
address_info = ray_ctx.address_info
# The redis address of the ray cluster.
redis_address = ray_ctx.redis_address
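As a small illustration of working with these values, the sketch below picks a few fields out of the dictionary for logging. `summarize_ray_cluster` is a hypothetical helper, not a BigDL API, and the sample addresses are made-up placeholders; only the key names node_ip_address and webui_url come from the description above.

```python
def summarize_ray_cluster(address_info, redis_address):
    """Pick a few fields out of the Ray cluster info for logging.

    Keys that are absent from address_info come back as None.
    """
    return {
        "redis_address": redis_address,
        "node_ip_address": address_info.get("node_ip_address"),
        "webui_url": address_info.get("webui_url"),
    }


# Made-up placeholder values, for illustration only. In a real program these
# would be ray_ctx.address_info and ray_ctx.redis_address.
sample_info = {"node_ip_address": "10.0.0.1", "webui_url": "10.0.0.1:8265"}
summary = summarize_ray_cluster(sample_info, "10.0.0.1:6379")
for key, value in summary.items():
    print("{}: {}".format(key, value))
```

Using `dict.get` keeps the helper from failing if a given key is not present in the dictionary.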
You should call stop_orca_context() when your program finishes:
from bigdl.orca import stop_orca_context

stop_orca_context()
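One way to make sure the cleanup call runs even when the program raises an exception is to wrap initialization and shutdown in a context manager. This is a minimal sketch under that assumption: `orca_session` is a hypothetical helper, not part of BigDL, and it takes the init and stop functions as arguments so it can be tried without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def orca_session(init, stop, **init_kwargs):
    """Call init(**init_kwargs), yield its result, and always call stop()."""
    ctx = init(**init_kwargs)
    try:
        yield ctx
    finally:
        # Runs on normal exit and when the body raises.
        stop()


# Intended usage with the functions from this guide (commented out because it
# needs a working BigDL installation and cluster):
#
# from bigdl.orca import init_orca_context, stop_orca_context
#
# with orca_session(init_orca_context, stop_orca_context,
#                   cluster_mode="yarn-client", cores=4, memory="10g",
#                   num_nodes=2, init_ray_on_spark=True) as sc:
#     ...  # run Ray tasks and actors here
```

If `init` itself fails, `stop` is not called, since nothing was started in that case.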
4. Known Issue
If you encounter the following error when launching Ray on the underlying cluster, especially when you are using a Spark standalone cluster:
This system supports the C.UTF-8 locale which is recommended. You might be able to resolve your issue by exporting the following environment variables:
export LC_ALL=C.UTF-8
export LANG=C.UTF-8
Adding the environment variables when calling init_orca_context resolves the issue:
sc = init_orca_context(cluster_mode, init_ray_on_spark=True, env={"LANG": "C.UTF-8", "LC_ALL": "C.UTF-8"})
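If the program already passes other environment variables through `env`, the locale settings can be merged into that dictionary rather than replacing it. The sketch below assumes this pattern; `locale_env` is a hypothetical helper, not a BigDL function, and `MY_SETTING` is a made-up variable name.

```python
def locale_env(extra=None):
    """Return a copy of `extra` with LANG and LC_ALL forced to C.UTF-8."""
    env = dict(extra or {})
    env.update({"LANG": "C.UTF-8", "LC_ALL": "C.UTF-8"})
    return env


# MY_SETTING is a made-up variable, for illustration only.
env = locale_env({"MY_SETTING": "1"})
print(env)

# The result would then be passed as in the call above (commented out because
# it needs a working BigDL installation and cluster):
# sc = init_orca_context(cluster_mode, init_ray_on_spark=True, env=env)
```

The helper copies its input, so the caller's dictionary is left unchanged.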