Introduction to Spark Structured Streaming
In this post we discuss Structured Streaming in Spark 2.3.1 and demonstrate its basic functionality. We also describe SparkSession, schemas, and the DataFrame API, covering the basics of Structured Streaming.
Structured Streaming
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You express a streaming computation the same way you would express a batch computation on static data. The Spark SQL engine runs it incrementally and continuously, updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming computations. Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
By default, Structured Streaming queries are processed using a micro-batch processing engine. Data streams are processed as a series of small batch jobs, achieving end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees.
A new low-latency processing mode called Continuous Processing was introduced in Spark 2.3. It can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
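To get a feel for how these two modes are selected, here is a minimal sketch (not from the original post; the session name, rate-source settings, and trigger intervals are illustrative). It uses the built-in rate source and the console sink, both of which work with either engine:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local").appName("TriggerDemo").getOrCreate()

// The built-in "rate" source generates rows continuously, which is handy for experiments.
val rateDf = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

// Micro-batch engine (default): start a new micro-batch every 10 seconds.
val microBatchQuery = rateDf.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous engine (Spark 2.3+): checkpoint progress every 1 second.
// Only a subset of sources and sinks (e.g. rate, Kafka, console) support this mode.
val continuousQuery = rateDf.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()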
Spark Session
From Spark 2.0.0 onwards, SparkSession provides a single point of entry to interact with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. All the functionality available through sparkContext is also available through SparkSession.
To use the SQL, Hive, and Streaming APIs, there is no need to create separate contexts, since SparkSession includes all of them.
Creating Spark Session:
Here is how to create a SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("StreamApp")
  .getOrCreate()
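Once the session exists, the SQL and DataFrame APIs can be used on it directly, without creating a separate SQLContext or HiveContext. A small illustrative sketch (the temporary view name is arbitrary):

val numbers = spark.range(5)                 // DataFrame/Dataset API via SparkSession
numbers.createOrReplaceTempView("numbers")   // register a temporary view
spark.sql("SELECT id FROM numbers WHERE id > 2").show()   // SQL via the same session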
Schema (Structure of Data)
A schema describes the structure of the data and is defined when creating a Dataset or DataFrame in Spark SQL. A schema is described using a StructType, which is a collection of StructField objects defining the name, data type, and nullability of each field.
StructType and StructField belong to the org.apache.spark.sql.types package.
Defining Schema
Described below is how to define a schema.
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("id", StringType, nullable = false) ::
  StructField("city", StringType, nullable = false) ::
  StructField("pop", StringType, nullable = false) ::
  StructField("state", StringType, nullable = false) :: Nil)
Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the DataStreamReader interface (Scala/Java/Python docs) returned by SparkSession.readStream(). Similar to the read interface for creating static DataFrames, you can specify the details of the source: data format, schema, options, and so on.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local")
  .appName("StreamApp")
  .getOrCreate()

val schema = StructType(
  StructField("id", StringType, nullable = false) ::
  StructField("city", StringType, nullable = false) ::
  StructField("pop", StringType, nullable = false) ::
  StructField("state", StringType, nullable = false) :: Nil)

// Read a directory of JSON files as a streaming DataFrame
val df = spark.readStream
  .format("json")
  .schema(schema)
  .load("/home/user/Downloads/demo")
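The resulting df behaves like an ordinary DataFrame except that it is unbounded. A quick way to confirm this, using the df defined above:

df.isStreaming    // returns true for a streaming DataFrame
df.printSchema()  // shows the schema we supplied: id, city, pop, state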
Console Sink
Whenever the console sink is triggered, it prints the output to the console. Both Append and Complete output modes are supported. After every trigger the entire output is collected and stored in the driver's memory, so the console sink should only be used for debugging on small amounts of data.
Output Modes:
- Append Mode (Default)
- Complete Mode
- Update Mode
1. Append Mode (Default): Append mode is the default mode; in it, only the new rows added to the Result Table since the last trigger are written to the sink. Append mode is supported only for queries in which rows already added to the Result Table never change, so it guarantees that each row is output only once. Queries with only select, where, map, flatMap, filter and join are examples of queries supported in Append mode.
2. Complete Mode: After every trigger, Complete mode writes the whole Result Table to the sink. Complete mode is supported for aggregation queries, as sketched after this list.
3. Update Mode: In Update mode, only the rows of the Result Table that were updated since the last trigger are written to the sink.
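As a small illustration of Complete mode, the sketch below (not part of the original post's code) counts cities per state over the streaming DataFrame created earlier and writes the full, continually updated counts to the console after every trigger:

// Aggregation queries need Complete (or Update) mode; Append mode would fail here
val countsByState = df.groupBy("state").count()

val aggQuery = countsByState.writeStream
  .outputMode("complete")
  .format("console")
  .start()

aggQuery.awaitTermination()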
Writing data to Console:
val query = df.writeStream
  .format("console")
  .start()

query.awaitTermination()
Output on Console: each trigger prints the batch number and a table of the rows written in that batch.
We hope this blog helps you get to know Spark Structured Streaming better, and we will keep you posted on topics like these. Keep coming back for more!