We are building a use case that combines Spark Streaming (with Flume as the source) and Spark SQL windowed operations to run Complex Event Processing (CEP) on incoming data streams. The problem is that the SQL query we run for each batch of incoming events slows down noticeably over time. Our setup: a window length of 600 seconds, a batch interval of 20 seconds, and one input event arriving every two seconds. After about ten minutes of steady input, the execution time of the SQL query begins to climb: a query like SELECT COUNT(*) over roughly 300 records initially takes about one second, but after 15 minutes it stretches to two or three seconds or more. I would appreciate any suggestions for improving the efficiency of this implementation. Below is an excerpt of the code we are currently using:
// Setting up Spark and streaming context
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(20)); // batch interval is a Duration, not a raw int
JavaReceiverInputDStream<SparkFlumeEvent> flumeEventStream = FlumeUtils.createStream(streamingContext, "localhost", 55555);
// Apply a window to the incoming events
// (WINDOW_LENGTH = Durations.seconds(600), SLIDE_INTERVAL = Durations.seconds(20))
JavaDStream<SparkFlumeEvent> eventWindow = flumeEventStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);
// SQLContext built on the existing Spark context
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sparkContext);
eventWindow.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventData) throws Exception {
        long tempTime = System.currentTimeMillis(); // unique suffix for this batch's temp table
        JavaRDD<CustomEvent> processedRDD = eventData.map(new Function<SparkFlumeEvent, CustomEvent>() {
            @Override
            public CustomEvent call(SparkFlumeEvent event) throws Exception {
                // Process and return a new CustomEvent (mapping elided here)
                return customEvent;
            }
        });
        DataFrame eventDataFrame = sqlContext.createDataFrame(processedRDD, CustomEvent.class);
        eventDataFrame.registerTempTable("tempEvents" + tempTime);
        sqlContext.cacheTable("tempEvents" + tempTime);
        // The execution time of this query keeps increasing over time
        long queryStart = System.currentTimeMillis();
        Long countResult = sqlContext.sql("SELECT COUNT(*) FROM tempEvents" + tempTime).first().getLong(0);
        System.out.println("Execution time for count query: " + (System.currentTimeMillis() - queryStart) + " ms");
        sqlContext.dropTempTable("tempEvents" + tempTime);
        sqlContext.clearCache();
        return null;
    }
});
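For comparison, we could count the window contents directly on the RDD, without registering and dropping a temp table per batch. Below is a minimal local-mode sketch of that direct count; the class name, the local[*] master, and the sample data are illustrative assumptions, not part of our streaming job. Whether this avoids the slowdown is exactly what we would like to understand:

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DirectCount {
    public static void main(String[] args) {
        // Local-mode stand-in for one window's worth of data
        SparkConf conf = new SparkConf().setAppName("DirectCount").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> window = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        long start = System.currentTimeMillis();
        // Direct count: no SQL parsing/planning and no per-batch catalog entries
        long count = window.count();
        System.out.println("count=" + count + " in " + (System.currentTimeMillis() - start) + " ms");

        sc.stop();
    }
}
```

In the streaming job this would correspond to calling eventData.count() inside foreachRDD instead of going through SQLContext.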