An Azure Databricks cluster is a set of computation resources and configurations on which you run data engineering, data science, and data analytics workloads, such as production ETL pipelines, streaming analytics, ad-hoc analytics, and machine learning.
You run these workloads as a set of commands in a notebook or as an automated job. Azure Databricks makes a distinction between all-purpose clusters and job clusters. You use all-purpose clusters to analyze data collaboratively using interactive notebooks. You use job clusters to run fast and robust automated jobs.
Work submitted to the cluster is split into as many independent jobs as needed. This is how work is distributed across the cluster’s nodes. Jobs are further subdivided into tasks. The input to a job is divided into one or more partitions. These partitions are the unit of work for each slot. In between tasks, partitions may need to be reorganized and shared over the network.
The secret to Spark’s high performance is parallelism. Scaling vertically (by adding resources to a single computer) is limited to a finite amount of RAM, threads, and CPU speed, whereas clusters scale horizontally by adding new nodes as needed.
Spark parallelizes jobs at two levels:
The first level of parallelization is the executor: a Java virtual machine (JVM) running on a worker node, typically one instance per node.
The second level of parallelization is the slot. The number of slots is determined by the number of cores and CPUs on each node, and each executor has multiple slots to which parallelized tasks can be assigned.
The JVM is naturally multi-threaded, but a single JVM, such as the one coordinating the work on the driver, has a finite upper limit. By splitting the work into tasks, the driver can assign units of work to slots in the executors on worker nodes for parallel execution. Additionally, the driver determines how to partition the data so that it can be distributed for parallel processing. The driver assigns a partition of data to each task, so each task knows which piece of data it is to process. Once started, each task fetches the partition of data assigned to it.
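The driver, partition, task, and slot relationship described above can be sketched in plain Python. This is only an illustration, not Spark code: the thread pool stands in for the slots, and the cluster shape (2 executors with 4 slots each), the `partition` helper, and the `run_task` function are assumptions made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split data into num_partitions contiguous chunks of near-equal size."""
    size, remainder = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def run_task(chunk):
    """One task: process exactly one partition (here, sum its values)."""
    return sum(chunk)


# Hypothetical cluster shape: 2 executors with 4 slots (cores) each.
executors, slots_per_executor = 2, 4
total_slots = executors * slots_per_executor

data = list(range(1, 101))
partitions = partition(data, num_partitions=total_slots)

# The thread pool stands in for the slots; each task receives one partition.
with ThreadPoolExecutor(max_workers=total_slots) as slots:
    partial_sums = list(slots.map(run_task, partitions))

# The "driver" combines the per-partition results.
total = sum(partial_sums)
print(total)
```

With one partition per slot, every slot has work and none sits idle, which is the reason partition counts are usually chosen with the slot count in mind.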
Delta Lake is an open-source storage layer that adds relational database semantics to Spark-based data lake processing.
Relational tables that support querying and data modification: with Delta Lake, you can store data in tables that support CRUD (create, read, update, and delete) operations. In other words, you can select, insert, update, and delete rows of data in the same way you would in a relational database system.
Support for ACID transactions: relational databases are designed to support transactional data modifications that provide atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can’t interfere with one another), and durability (when a transaction completes, the changes it made are persisted). Delta Lake brings this same transactional support to Spark by implementing a transaction log and enforcing serializable isolation for concurrent operations.
Data versioning and time travel: because all transactions are logged in the transaction log, you can track multiple versions of each table row, and even use the time travel feature to retrieve a previous version of a row in a query.
Support for batch and streaming data: while most relational databases include tables that store static data, Spark includes native support for streaming data through the Spark Structured Streaming API. Delta Lake tables can be used as both sinks (destinations) and sources for streaming data.
Standard formats and interoperability: the underlying data for Delta Lake tables is stored in Parquet format, which is commonly used in data lake ingestion pipelines.
You can also define Delta Lake tables as catalog tables in the Hive metastore for your Spark cluster, and work with them using SQL.
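To make the link between the transaction log and time travel more concrete, here is a toy model in plain Python. It is a deliberately simplified sketch, not Delta Lake's implementation: the `ToyTransactionLog` class and the file names are hypothetical, and the only idea it borrows is that each commit records which data files were added and removed, so any earlier version can be rebuilt by replaying the log.

```python
class ToyTransactionLog:
    """Append-only log; each commit records files added and removed."""

    def __init__(self):
        self.commits = []  # index in this list == table version

    def commit(self, add=(), remove=()):
        """Record one transaction and return its version number."""
        self.commits.append({"add": set(add), "remove": set(remove)})
        return len(self.commits) - 1

    def snapshot(self, version=None):
        """Replay commits 0..version to get the data files live at that version."""
        if version is None:
            version = len(self.commits) - 1
        files = set()
        for entry in self.commits[: version + 1]:
            files -= entry["remove"]
            files |= entry["add"]
        return files


log = ToyTransactionLog()
v0 = log.commit(add=["part-000.parquet"])  # initial insert
v1 = log.commit(add=["part-001.parquet"])  # append more rows
# An update rewrites a file: the old one is removed, a new one is added.
v2 = log.commit(add=["part-002.parquet"], remove=["part-000.parquet"])

latest = log.snapshot()
as_of_v0 = log.snapshot(version=0)
```

In Spark itself, the corresponding read is expressed with the `versionAsOf` option, for example `spark.read.format("delta").option("versionAsOf", 0).load(path)`, where `path` is wherever the table is stored.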
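### External vs managed tables
Tables in a Spark catalog, including Delta Lake tables, can be managed or external. For a managed table, the catalog controls both the metadata and the data files, so dropping the table also deletes its data. For an external table, the catalog stores only the metadata and the data stays at the path you specify, so dropping the table leaves the files in place.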
# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")
# Specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
SQL Warehouses (formerly known as SQL Endpoints) provide a relational database interface for data in Azure Databricks. The data is stored in files that are abstracted by Delta tables in a Hive metastore, but from the perspective of the user or client application, the SQL Warehouse behaves like a relational database.
Spark exposes three data APIs: RDDs, DataFrames, and Datasets. The architectural foundation is the resilient distributed dataset (RDD). The DataFrame API was released as an abstraction on top of the RDD, followed later by the Dataset API.
DataFrames are distributed collections of data, organized into rows and columns. Each column in a DataFrame has a name and an associated type.
MLlib is the older API and works directly on RDDs; newer work uses Spark ML, which is built on the DataFrame API. The confusing part about MLlib versus Spark ML is that both ship in the same library. The difference is the namespace: the “classic” MLlib namespace is org.apache.spark.mllib, whereas the Spark ML namespace is org.apache.spark.ml. Whenever possible, use the Spark ML namespace for new data science work.
MLflow has four components: Tracking, Projects, Models, and the Model Registry.
MLflow Tracking allows data scientists to work with experiments. For each run in an experiment, a data scientist may log parameters, versions of libraries used, evaluation metrics, and generated output files when training machine learning models.
An MLflow Project is a way of packaging code so that it can be deployed consistently and its results can be reproduced. MLflow supports several environments for projects, including Conda, Docker, and running directly on a system.
MLflow offers a standardized format for packaging models for distribution. This standardized model format allows MLflow to work with models generated from several popular libraries, including scikit-learn, Keras, MLlib, ONNX, and more.
Each MLflow Model is a directory containing arbitrary files, together with an MLmodel file in the root of the directory that can define multiple flavors that the model can be viewed in.
Flavors are the key concept that makes MLflow Models powerful: they are a convention that deployment tools can use to understand the model, which makes it possible to write tools that work with models from any ML library without having to integrate each tool with each library. MLflow defines several “standard” flavors that all of its built-in deployment tools support, such as a “Python function” flavor that describes how to run the model as a Python function. However, libraries can also define and use other flavors. For example, MLflow’s mlflow.sklearn library allows loading models back as a scikit-learn Pipeline object for use in code that is aware of scikit-learn, or as a generic Python function for use in tools that just need to apply the model (for example, the mlflow deployments tool with the option -t sagemaker for deploying models to Amazon SageMaker).
All of the flavors that a particular model supports are defined in its MLmodel file in YAML format. For example, mlflow.sklearn outputs models as follows:
# Directory written by mlflow.sklearn.save_model(model, "my_model")
my_model/
├── MLmodel
├── model.pkl
├── conda.yaml
├── python_env.yaml
└── requirements.txt
And its MLmodel file describes two flavors:
time_created: 2018-05-25T17:28:53.35
flavors:
  sklearn:
    sklearn_version: 0.19.1
    pickled_model: model.pkl
  python_function:
    loader_module: mlflow.sklearn
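As a rough illustration of how a tool might use the flavor listing, the sketch below writes the MLmodel content above as a plain Python dict and picks a flavor from it. The `pick_flavor` helper is hypothetical and not part of MLflow, and a real tool would parse the YAML file rather than hard-code its content.

```python
# The MLmodel content shown above, written out as a plain dict.
# A real tool would parse the YAML file instead of hard-coding it.
mlmodel = {
    "time_created": "2018-05-25T17:28:53.35",
    "flavors": {
        "sklearn": {"sklearn_version": "0.19.1", "pickled_model": "model.pkl"},
        "python_function": {"loader_module": "mlflow.sklearn"},
    },
}


def pick_flavor(model_meta, preferred):
    """Return (name, config) for the first preferred flavor the model supports."""
    flavors = model_meta.get("flavors", {})
    for name in preferred:
        if name in flavors:
            return name, flavors[name]
    raise ValueError(f"model supports none of {preferred}; has {sorted(flavors)}")


# A scikit-learn-aware tool asks for the native flavor first.
native_name, native_cfg = pick_flavor(mlmodel, ["sklearn", "python_function"])

# A generic deployment tool only understands the python_function flavor.
generic_name, generic_cfg = pick_flavor(mlmodel, ["python_function"])
```

Because the generic tool only needs the `python_function` entry, it can serve this model without knowing anything about scikit-learn, which is the point of the flavor convention.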
You can save and load MLflow Models in multiple ways. First, MLflow includes integrations with several common libraries. For example, mlflow.sklearn contains save_model, log_model, and load_model functions for scikit-learn models. Second, you can use the mlflow.models.Model class to create and write models. This class has four key functions:
add_flavor to add a flavor to the model. Each flavor has a string name and a dictionary of key-value attributes, where the values can be any object that can be serialized to YAML.
save to save the model to a local directory.
log to log the model as an artifact in the current run using MLflow Tracking.
load to load a model from a local directory or from an artifact in a previous run.
The MLflow Model Registry is a central store in which data scientists register models and then track each model’s versions.
Horovod is an open-source distributed training framework and an alternative to training a model on a single-node cluster. Horovod allows data scientists to distribute the training process and make use of Spark’s parallel processing.
Since deep learning models contain layers that need to be processed sequentially, and use intermediary results to improve the model at the end of an epoch, the parallel processing of deep learning models can quickly become complicated. Horovod is designed to take care of the infrastructure management so that data scientists can focus on training models.
When Horovod is used on top of one of the deep learning frameworks (TensorFlow, PyTorch or Keras), it trains multiple models on different batches of the input dataset on separate workers. In other words, multiple models are trained in parallel on separate workers using different subsets of the data.
At the end of an epoch, the weights are communicated between workers and the average weight across all workers is calculated. A new epoch then starts from the averaged weights, and multiple models are again trained in parallel.
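The averaging step can be shown with a few lines of plain Python. This is a single-process sketch under made-up numbers, not Horovod code: `average_weights` and the weight values are hypothetical, and in a real run Horovod performs the exchange between worker processes for you.

```python
def average_weights(worker_weights):
    """Element-wise mean of the weight vectors reported by each worker."""
    num_workers = len(worker_weights)
    return [sum(values) / num_workers for values in zip(*worker_weights)]


# Hypothetical weights from three workers after training on different subsets.
worker_weights = [
    [0.25, 1.0, -0.5],  # worker 0
    [0.5, 2.0, -1.0],   # worker 1
    [0.75, 3.0, -1.5],  # worker 2
]

averaged = average_weights(worker_weights)

# Every worker starts the next round from the same averaged weights.
next_round = [list(averaged) for _ in worker_weights]
```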
To distribute the training of a deep learning model using HorovodRunner, you should do the following:
Migrate to Horovod: once you have tested your single-node code to train a deep learning model, you have to migrate it to Horovod before you can trigger the job with HorovodRunner. The migration steps are:
Import the Horovod framework as hvd.
Initialize the Horovod library with hvd.init().
Pin one GPU per process. Pinning is necessary to disable random mapping of workers and avoid clashes. Pinning is skipped when using CPUs.
Specify how you want to partition or sample the data so that each worker uses a unique subset of the data to train a model. As a best practice, make sure the subsets are all the same size. Depending on the input dataset, there are several techniques to do the sampling. For example, you could use Petastorm, an open-source library, to work with datasets in Apache Parquet format.
Scale the learning rate by the number of workers to make sure the weights are adjusted correctly after each epoch.
Use the Horovod distributed optimizer to handle the communication between workers.
Broadcast the initial parameters so all workers start with the same parameters.
Save checkpoints only on worker 0 to prevent conflicts between workers.
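Three of these steps (equal-size data subsets, learning-rate scaling, and checkpointing only on worker 0) come down to simple arithmetic on the worker count and worker index. The sketch below shows that arithmetic in plain Python. It does not import Horovod: `shard` and `scaled_learning_rate` are hypothetical helpers, and the hard-coded `size` and rank values stand in for what `hvd.size()` and `hvd.rank()` would return in real Horovod code.

```python
def shard(dataset, rank, size):
    """Return the slice of dataset for worker `rank` out of `size` workers.

    Trailing items that do not divide evenly are dropped so that every
    worker gets a subset of the same length.
    """
    per_worker = len(dataset) // size
    start = rank * per_worker
    return dataset[start : start + per_worker]


def scaled_learning_rate(base_lr, size):
    """Scale the single-node learning rate by the number of workers."""
    return base_lr * size


# Hypothetical values; in real Horovod code these come from hvd.size() and hvd.rank().
size = 4
dataset = list(range(10))
base_lr = 0.01

# Each worker gets a unique, equal-size subset of the data.
shards = [shard(dataset, rank, size) for rank in range(size)]
lr = scaled_learning_rate(base_lr, size)

# Only worker 0 writes checkpoints, so workers never overwrite each other.
checkpoint_writers = [rank for rank in range(size) if rank == 0]
```

Dropping the remainder (items 8 and 9 here) is one simple way to keep the subsets the same size; other sampling strategies are possible and depend on the dataset.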