In this post we discuss Structured Streaming in Spark 2.3.1 and demonstrate its basic functionality. Along the way we also cover SparkSession, schemas, and the DataFrame API.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You express a streaming computation the same way you would express a batch computation on static data. The Spark SQL engine runs it incrementally and continuously, and takes care of updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming computations. Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
By default, queries in Structured Streaming are processed by a micro-batch processing engine. Data streams are processed as a series of small batch jobs, which achieves end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees.
A new low-latency processing mode called Continuous Processing was introduced in Spark 2.3. This mode can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
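The processing mode is chosen per query through its trigger. The sketch below is only an illustration and is not part of the original example: it assumes the built-in `rate` test source and the console sink (both of which support continuous processing in Spark 2.3), and the object name `TriggerModes`, the intervals, and the run durations are arbitrary choices.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("TriggerModes")
      .getOrCreate()

    // Assumption: the built-in "rate" source, which generates
    // (timestamp, value) rows and is intended for testing.
    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "1")
      .load()

    // Micro-batch mode: start a new batch every 2 seconds.
    val microBatch = rates.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
    microBatch.awaitTermination(6000) // run for about 6 seconds
    microBatch.stop()

    // Continuous mode: the argument is the checkpoint interval,
    // not a batch interval.
    val continuous = rates.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
    continuous.awaitTermination(6000)
    continuous.stop()

    spark.stop()
  }
}
```

The two queries run one after the other because a continuous query keeps its tasks running for its whole lifetime, so it needs cores that are not shared with other work.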
From Spark 2.0.0 onwards, SparkSession provides a single point of entry to the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. All the functionality available through SparkContext is also available through SparkSession.
There is no need to create separate contexts to use the SQL, Hive, and Streaming APIs, since SparkSession includes all of them.
The following shows how to create a SparkSession.
import org.apache.spark.sql.SparkSession

// Create (or reuse) a session that runs locally under the name "StreamApp"
val spark = SparkSession.builder()
  .master("local")
  .appName("StreamApp")
  .getOrCreate()
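Once a session exists, the same object gives access to the SQL, core, and streaming entry points. The following is a minimal sketch of that idea; the object name `SessionEntryPoints` and the sample query are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

object SessionEntryPoints {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SessionEntryPoints")
      .getOrCreate()

    // SQL: no separate SQLContext is needed.
    val one = spark.sql("SELECT 1 AS value")
    one.show()

    // Core API: the underlying SparkContext is exposed by the session.
    val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3))
    println(s"RDD element count: ${numbers.count()}")

    // Streaming: readStream returns a DataStreamReader.
    val streamReader = spark.readStream
    println(streamReader.getClass.getSimpleName)

    spark.stop()
  }
}
```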
A schema describes the structure of the data and is used to create a Dataset in Spark SQL. A schema is defined using StructType, which is a collection of StructField objects. Each StructField holds a column name, a data type, and a nullability flag.
StructType and StructField belong to the org.apache.spark.sql.types package.
The following shows how to define a schema.
import org.apache.spark.sql.types._

// Four non-nullable string columns
val schema = StructType(
  StructField("id", StringType, nullable = false) ::
  StructField("city", StringType, nullable = false) ::
  StructField("pop", StringType, nullable = false) ::
  StructField("state", StringType, nullable = false) :: Nil)
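As an alternative to the list syntax, the same schema can be built step by step with `StructType.add` and then inspected. This is a small sketch; the object name `SchemaSketch` is an assumption, and no SparkSession is needed to run it.

```scala
import org.apache.spark.sql.types.{StringType, StructType}

object SchemaSketch {
  // The same four non-nullable string columns, added one at a time.
  val schema: StructType = new StructType()
    .add("id", StringType, nullable = false)
    .add("city", StringType, nullable = false)
    .add("pop", StringType, nullable = false)
    .add("state", StringType, nullable = false)

  def main(args: Array[String]): Unit = {
    // Print the schema as a tree.
    schema.printTreeString()

    // Walk the StructField objects and show name, type and nullability.
    schema.fields.foreach { f =>
      println(s"${f.name}: ${f.dataType.simpleString}, nullable=${f.nullable}")
    }
  }
}
```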
Streaming DataFrames can be created through the DataStreamReader interface returned by SparkSession.readStream. As with the read interface for creating a static DataFrame, you can specify the details of the source: data format, schema, options, etc.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local")
  .appName("Streamapp")
  .getOrCreate()

val schema = StructType(
  StructField("id", StringType, nullable = false) ::
  StructField("city", StringType, nullable = false) ::
  StructField("pop", StringType, nullable = false) ::
  StructField("state", StringType, nullable = false) :: Nil)

// Treat JSON files that arrive in the directory as a stream
val df = spark.readStream
  .format("json")
  .schema(schema)
  .load("/home/user/Downloads/demo")
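A quick way to confirm that the result is a streaming DataFrame, before any query is started, is to check `isStreaming` and print the schema. The sketch below assumes an empty temporary directory as a stand-in for the real input path; the object name `StreamingFrameCheck` and the directory prefix are hypothetical.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object StreamingFrameCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("StreamingFrameCheck")
      .getOrCreate()

    val schema = new StructType()
      .add("id", StringType, nullable = false)
      .add("city", StringType, nullable = false)
      .add("pop", StringType, nullable = false)
      .add("state", StringType, nullable = false)

    // Assumption: an empty temp directory stands in for the real input path.
    val inputDir = Files.createTempDirectory("stream-demo").toString

    val df = spark.readStream
      .format("json")
      .schema(schema)
      .load(inputDir)

    // No data is read yet; these calls only inspect the logical plan.
    println(s"isStreaming = ${df.isStreaming}")
    df.printSchema()

    spark.stop()
  }
}
```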
The console sink prints the output to the console every time there is a trigger. It supports the Append, Update, and Complete output modes. After every trigger the entire output is collected and stored in the driver's memory, so it should be used only for debugging on small amounts of data.
1. Append Mode (Default): Only the new rows added to the Result Table since the last trigger are written to the sink. Append mode is supported only for queries in which rows already added to the Result Table never change, so it guarantees that each row is output only once. Queries with only select, where, map, flatMap, filter and join are examples of queries that support Append mode.
2. Complete Mode: After every trigger, Complete mode pushes the whole Result Table to the sink. Complete mode is supported for aggregation queries.
3. Update Mode: In Update mode, only the rows that were updated since the last trigger are pushed to the sink.
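// Print each batch to the console (Append is the default output mode)
val query = df.writeStream
  .format("console")
  .start()

// Block until the query is stopped or fails
query.awaitTermination()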
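To select an output mode other than the default, pass it to `outputMode` on the writer. The sketch below shows Complete mode with an aggregation. It is an illustration under stated assumptions: it uses the built-in `rate` test source rather than the JSON directory above, and the `bucket` column, the object name `CompleteModeSketch`, and the 15-second run time are invented for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CompleteModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("CompleteModeSketch")
      // Fewer shuffle partitions keep the local aggregation quick.
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    // Assumption: the "rate" source emits (timestamp, value) rows.
    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Aggregation: count the rows that fall into each bucket (value modulo 3).
    val counts = rates
      .groupBy((col("value") % 3).as("bucket"))
      .count()

    // Complete mode rewrites the whole Result Table after every trigger.
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    // Run for about 15 seconds, then shut down.
    query.awaitTermination(15000)
    query.stop()
    spark.stop()
  }
}
```

Switching this query to `append` would be rejected when the query starts, because an aggregation without a watermark can still change rows that were already emitted.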
We hope this post helps you get to know Spark Structured Streaming better. We will keep you posted on topics like these, so keep coming back for more!