Introduction
Spark 2 introduced Structured Streaming, giving users the ability to process streams of unbounded data using higher-level abstractions.
This is an extremely powerful capability: data engineers can run streaming transformations and analytics over data as it is ingested, and join and integrate it with batch data at rest. Because this all happens within Spark, and within Databricks in particular, ETL can take place in the same environment as data analysis and data science activities.
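For instance, a stream can be enriched with reference data at rest via a stream-static join. The sketch below is purely illustrative: the raw orders path, the customer_dimensions table and the CustomerID key are assumed names and are not part of the worked example that follows.
// Illustrative sketch only: assumes a streaming Delta source of raw orders and a
// static customer_dimensions table keyed by CustomerID (neither exists in the
// example developed later in this post)
val incomingOrders = spark.readStream
  .format("delta")
  .load("/raw_orders")                                       // unbounded stream of orders
val customerDims = spark.read.table("customer_dimensions")   // batch data at rest
// Stream-static join: each incoming order is enriched with its customer record
val enrichedOrders = incomingOrders
  .join(customerDims, Seq("CustomerID"), "left_outer")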
As the name implies, Structured Streaming relies on a typed model, whereby we define the structure of our messages up front as a schema. In the example below, we have defined a simple order with an ID, a category, a value and a shipping type.
import org.apache.spark.sql.types._
// Schema for the inbound order messages
val jsonSchema = new StructType()
  .add("OrderID", StringType)
  .add("Category", StringType)
  .add("Value", IntegerType)
  .add("Shipping", StringType)
Often, we would then look to ingest messages as they stream in from an external source such as Apache Kafka:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._
// Read the raw order messages from Kafka and parse the JSON payload
// into typed columns using the schema defined above
val inputStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "123.123.123.123:9094,123.123.123.123:9094,123.123.123.123:9094")
  .option("subscribe", "1_1_Orders")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("cast(value as STRING) jsonData")
  .select(from_json($"jsonData", jsonSchema).alias("rec"))
  .select("rec.*")
display(inputStream)
The next step is typically to write the stream to disk, ideally as a Delta table for performance and transactional guarantees. This is often described as a bronze table, as it is a simple audit log of the inbound data.
// Persist the parsed stream as a Delta (bronze) table, using a checkpoint
// location so the stream can recover from where it left off
inputStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/ordercheckpoint")
  .option("mergeSchema", "true")
  .option("path", "/dbfs/orders")
  .table("inbound_orders")
Finally, we tend to stream directly from the bronze table as it is written, to create downstream aggregations such as a group by category. This gives us both an audit of the inbound data and an aggregation table that is updated in real time.
// Stream from the bronze Delta table and maintain a running total per category
val ordersStream = spark
  .readStream
  .format("delta")
  .table("inbound_orders")
  .groupBy("Category")
  .agg(sum($"Value") as "total_value")
display(ordersStream)
// Write the aggregation as a Delta table; "complete" output mode rewrites the
// full aggregate on each update
ordersStream.writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/dbfs/ordersbycategorycheckpoint")
  .option("mergeSchema", "true")
  .option("path", "/dbfs/ordersbycategory")
  .table("orderbycategory")
Structured Streaming within Databricks is a very neat solution for ETL, pre-aggregating data and real-time analytics. If you would like to deploy this pattern, please get in touch.