900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > 结构化流编程指南

结构化流编程指南

时间:2020-03-16 17:29:12

相关推荐

结构化流编程指南

独角兽企业重金招聘Python工程师标准>>>

结构化流编程指南

概述简单的例子编程模型 基本概念事件时间和后期数据处理容错语义使用数据和DataFrames API 创建流DataFrames和流媒体数据集 输入源模式推理和分区流DataFrames /数据集操作流DataFrames /数据集 基本操作-选择、投影、聚合窗口事件时间的操作后期处理数据和水印连接操作流媒体重复数据删除任意的有状态操作不支持的操作开始流查询 输出模式输出下沉使用Foreach流媒体管理查询流媒体监控查询 阅读指标交互报告指标使用异步api以编程方式报告指标使用Dropwizard从故障中恢复检查点额外的信息

概述

结构化流是一个具有可扩展性和容错性流处理引擎建立在火花的SQL引擎。你可以表达你的流计算相同的方式表达对静态数据批处理计算。火花的SQL引擎会照顾运行它的增量,不断和更新最终结果随着流数据到达。您可以使用数据集/ DataFrame API在Scala中,Java、Python或R表达流聚合,事件时间窗口,stream-to-batch连接等。计算上执行相同的SQL引擎优化的火花。最后,系统确保端到端只有一次容错担保通过检查点和写日志。简而言之,结构化流提供了快速、可伸缩、容错、端到端仅一次流处理无需用户关于流的原因。

在本指南中,我们将带您通过编程模型和api。首先,让我们从一个简单的例子开始,流字数。

简单的例子

假设你想保持运行字数的文本数据从数据接收服务器监听TCP套接字。让我们看看如何表达这种使用结构化流。你可以看到完整的代码Scala/Java/Python/R。如果你下载火花,您可以直接运行示例。在任何情况下,让我们一步一步走过的例子,理解它是如何工作的。首先,我们必须导入必要的类,并创建一个本地SparkSession的起点引发相关的所有功能。

ScalaJavaPythonR

importorg.apache.spark.api.java.function.FlatMapFunction;importorg.apache.spark.sql.*;importorg.apache.spark.sql.streaming.StreamingQuery;importjava.util.Arrays;importjava.util.Iterator;SparkSessionspark=SparkSession.builder().appName("JavaStructuredNetworkWordCount").getOrCreate();

接下来,让我们创建一个流DataFrame表示文本数据接收从服务器监听localhost:9999,转换DataFrame计算单词统计。

ScalaJavaPythonR

// Create DataFrame representing the stream of input lines from connection to localhost:9999Dataset<Row>lines=spark.readStream().format("socket").option("host","localhost").option("port",9999).load();// Split the lines into wordsDataset<String>words=lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String,String>)x->Arrays.asList(x.split(" ")).iterator(),Encoders.STRING());// Generate running word countDataset<Row>wordCounts=words.groupBy("value").count();

linesDataFrame代表一个无界的表包含流媒体文本数据。这个表包含一个字符串列命名为“价值”,和流中的每一行文本数据表中的一行。需要指出的是,这并不像我们只是目前接收任何数据设置转换,尚未开始。接下来,我们把DataFrame字符串使用的数据集.as(Encoders.STRING()),这样我们就可以应用flatMap操作每一行分割成多个单词。结果words数据集包含所有的单词。最后,我们已经定义了wordCountsDataFrame分组的惟一值数据集和计数。注意,这是一个流DataFrame代表的运行单词统计流。

我们已经建立了流数据上的查询。剩下就是开始接收数据和计算数量。为此,我们设置它打印指定的成套计数(outputMode("complete"))到控制台每次更新。然后开始流计算使用start().

ScalaJavaPythonR

// Start running the query that prints the running counts to the consoleStreamingQueryquery=wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();

这段代码执行后,流计算将会开始在后台。的query对象是一个句柄,活动流查询,我们已经决定等待查询的终止使用awaitTermination()防止进程退出时查询是活跃的。

实际执行这个例子代码,你可以自己编译的代码火花的应用程序,或者只是运行示例一旦你已经下载了火花。我们是后者。您将首先需要运行Netcat(一个小实用程序在大部分类unix系统)数据服务器通过使用

$ nc -lk 9999

然后,在一个不同的终端,你可以开始使用的例子

ScalaJavaPythonR

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

然后,任何线路输入终端运行netcat服务器每秒钟将统计和打印在屏幕上。它会看起来像下面的。

编程模型

结构化流的核心思想是将实时数据流作为一个不断附加的表。这将导致一个新的流处理模型非常类似于批处理模式。你会表达你流计算标准批查询在一个静态的表,并引发运行它作为增量查询无界输入表。让我们更详细地了解这个模型。

基本概念

考虑输入数据流为“输入表”。每个数据项都是到达流就像一个新行被添加到输入表。

输入一个查询将生成“结果表”。每一个触发间隔(说,每1秒),新行添加到输入表,最终更新结果表。只要结果表更新,我们会想写改变结果行到外部下沉。

“输出”的定义是什么写入外部存储。输出可以被定义在一个不同的模式:

完成模式——整个更新结果表将被写入外部存储器。由存储连接器来决定如何处理写作的整个表。

Append模式——只有结果表中的新行附加自上次触发将写入到外部存储。这是只适用于现有的查询结果表中的行是不会改变的。

更新模式,只有结果表中的行,更新自上次触发器将写入外部存储器(可用火花以来2.1.1)。注意,这是不同于完整的模式,这种模式只输出行改变了自从上次触发。如果查询不包含聚合,它将相当于Append模式。

注意,每个模式应用于某些类型的查询。这是详细讨论晚些时候.

说明该模型的使用,让我们理解的模型上下文简单的例子以上。第一个linesDataFrame是输入表,最后wordCountsDataFrame结果表。请注意,查询流媒体linesDataFrame生成wordCounts是完全一样的,因为它将是一个静态DataFrame。然而,当这个查询的方法是开始,火花将不断检查新的套接字连接的数据。如果有新数据,火花将运行一个“增量”查询前运行计数相结合的新的数据来计算数量更新,如下所示。

注意,结构化流不实现整个表。它读取最新的可用数据流数据来源,处理结果逐步更新,然后丢弃的源数据。只保持在所需的最少的中间状态的数据更新结果(如之前中间项的例子)。

这个模型是明显不同于许多其他流处理引擎。许多流媒体系统需要用户维护运行聚合,因此不得不思考容错,数据一致性(至少一次、或至多一次、或仅一次)。在这个模型中,火花负责更新结果表当有新的数据,从而缓解用户从推理。作为一个例子,让我们看看这个模型基于事件时间处理和后期到达数据。

事件时间和后期数据处理

事件时间是嵌入在数据本身。对于许多应用程序,您可能想要使用这个事件时间。举个例子,如果你想要得到事件的数量所产生的物联网设��每一分钟,那么你可能想要使用数据生成时(也就是说,事件时间数据),而不是火花的时间接收他们。这个事件时间很自然地表达了在这个模型中,每个事件从设备表中的一行,事件时间行中的一个列值。这允许窗口聚合(如每分钟的事件数量)是一种特殊类型的分组和聚合的事件时间列,每次窗口是一组,每一行可以属于多个窗口/组。因此,这样event-time-window-based聚合查询可以定义始终在一个静态数据集(例如,从收集设备事件日志)以及一个数据流,让用户更容易的生活。

此外,自然该模型处理数据根据其事件时间比预期晚到达。由于火花是更新结果表,它已完全控制更新旧的聚合后期时数据,以及清理旧的总量限制中间状态数据的大小。火花2.1以来,我们支持水印它允许用户指定阈值的数据,并允许发动机因此清理旧的状态。这些都是后来更详细地解释道窗口操作部分。

容错语义

提供端到端只有一次语义是背后的关键目标之一的设计结构化流。为达到这一目的,我们设计了结构化流源,水槽和执行引擎的具体进展跟踪处理,以便它可以处理任何类型的故障通过重启和/或再加工。每个流源认为补偿(类似于卡夫卡补偿,或运动序列数字)来跟踪流读取位置。引擎使用检查点和提前写日志来记录偏移量范围的数据被处理在每一个触发器。流水槽被设计成幂等来处理再加工。在一起,使用可复制源和幂等汇,结构化流可以确保端到端只有一次语义在任何失败。

使用数据和DataFrames API

自2.0火花,DataFrames和数据集可以表示静态的,有限的数据,以及流,无限数据。类似于静态数据集/ DataFrames,您可以使用常见的入口点SparkSession(Scala/Java/Python/R文档)来创建流DataFrames /流媒体来源的数据集,并应用相同的操作作为静态DataFrames /数据集。如果您不熟悉数据集/ DataFrames,强烈建议您熟悉他们使用DataFrame /数据集编程指南.

创建流DataFrames和流媒体数据集

流DataFrames可以通过创建DataStreamReader接口(Scala/Java/Python返回的文档)SparkSession.readStream()。在R,read.stream()方法。类似于阅读界面创建静态DataFrame,您可以指定源——数据格式的细节,模式的选择等。

输入源

有一些内置的来源。

文件来源——读取文件写在一个目录作为流的数据。支持的文件格式是文本、csv、json、拼花。看到DataStreamReader接口的文档更最新的列表,并支持选项为每个文件格式。注意,文件必须自动放置在给定目录,这在大多数文件系统,可以通过文件移动操作。

卡夫卡从卡夫卡来源——读取数据。这是兼容卡夫卡代理0.10.0或更高版本。看到卡夫卡集成指南为更多的细节。

套接字来源(测试)-从套接字读取use UTF8文本数据连接。听力服务器套接字是司机。注意,这应该只用于测试不提供端到端容错担保。

率源(测试)-生成数据在指定的行数/秒,每个输出行包含一个timestampvalue。在哪里timestamp是一个Timestamp包含消息调度的时间类型,valueLong包含消息的类型,从0开始的第一行。这种来源是用于测试和基准测试。

一些来源不容错因为他们不保证数据可以使用检查点重播补偿后失败。看到前面的部分容错语义。这里有火花的所有来源的详细信息。

这里有一些例子。

ScalaJavaPythonR

SparkSessionspark=...// Read text from socketDataset<Row>socketDF=spark.readStream().format("socket").option("host","localhost").option("port",9999).load();socketDF.isStreaming();// Returns True for DataFrames that have streaming sourcessocketDF.printSchema();// Read all the csv files written atomically in a directoryStructTypeuserSchema=newStructType().add("name","string").add("age","integer");Dataset<Row>csvDF=spark.readStream().option("sep",";").schema(userSchema)// Specify schema of the csv files.csv("/path/to/directory");// Equivalent to format("csv").load("/path/to/directory")

这些例子生成流DataFrames无类型,这意味着DataFrame的模式不是在编译时检查,只有在运行时检查提交查询时。一些操作,比如map,flatMap等需要的类型在编译时是已知的。要做到这些,你可以把这些无类型的流DataFrames输入流数据集作为静态DataFrame使用相同的方法。看到SQL编程指南为更多的细节。此外,支持流媒体来源的更多细节稍后讨论的文档。

模式推理和分区流DataFrames /数据集

默认情况下,结构化流基于文件的来源需要你指定的模式,而不是依赖火花自动推断出它。这一限制确保一致的模式将被用于流媒体查询,即使在失败的情况下。对于特定的用例,您可以使再能模式推理通过设置spark.sql.streaming.schemaInferencetrue.

分区时发现发生命名的子目录/key=value/和清单会自动存在递归到这些目录。如果这些列出现在用户提供的模式,他们将在基于文件的路径的火花被阅读。构成了分区方案的目录必须存在查询开始时,必须保持静态。例如,可以添加/data/year=//data/year=/在场,但它是无效的更改分区列(即通过创建的目录吗/data/date=-04-17/).

操作流DataFrames /数据集

您可以应用各种操作流DataFrames /数据集-从无类型,(如sql操作。select,where,groupBy(如),输入RDD-like操作。map,filter,flatMap)。看到SQL编程指南为更多的细节。让我们看看几个例子操作,您可以使用。

基本操作-选择、投影、聚合

最常见的操作DataFrame /数据集支持流媒体。很少有不支持的操作稍后讨论在这一节中。

ScalaJavaPythonR

importorg.apache.spark.api.java.function.*;importorg.apache.spark.sql.*;importorg.apache.spark.sql.expressions.javalang.typed;importorg.apache.spark.sql.catalyst.encoders.ExpressionEncoder;publicclassDeviceData{privateStringdevice;privateStringdeviceType;privateDoublesignal;privatejava.sql.Datetime;...// Getter and setter methods for each field}Dataset<Row>df=...;// streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }Dataset<DeviceData>ds=df.as(ExpressionEncoder.javaBean(DeviceData.class));// streaming Dataset with IOT device data// Select the devices which have signal more than 10df.select("device").where("signal > 10");// using untyped APIsds.filter((FilterFunction<DeviceData>)value->value.getSignal()>10).map((MapFunction<DeviceData,String>)value->value.getDevice(),Encoders.STRING());// Running count of the number of updates for each device typedf.groupBy("deviceType").count();// using untyped API// Running average signal for each device typeds.groupByKey((MapFunction<DeviceData,String>)value->value.getDeviceType(),Encoders.STRING()).agg(typed.avg((MapFunction<DeviceData,Double>)value->value.getSignal()));

你也可以注册一个流DataFrame /数据集作为临时视图,然后应用SQL命令。

ScalaJavaPythonR

df.createOrReplaceTempView("updates");spark.sql("select count(*) from updates");// returns another streaming DF

注意,您可以确定是否DataFrame /数据流数据或不通过df.isStreaming.

ScalaJavaPythonR

df.isStreaming()

窗口事件时间的操作

聚合在一个滑动窗口事件时间简单的结构化流和非常类似于分组聚合。在分组聚合,聚合值(例如计数)维护每个用户指定的分组列中的独特价值。基于窗口的聚合,聚合值为每个窗口的事件时间保持行落入。我们理解这个例子。

想象我们简单的例子修改和流现在包含线时线生成。而不是单词统计,我们想要在10分钟数字窗口,每5分钟更新。即词数量的话收到了10分钟的windows 12:00 - 12:10,12:05 - 12:15,12:10 - 12:20,等。注意,12:00 - 12:10意味着数据,12点之后但在12:10到达。现在,考虑这个词在12:07收到。这个词应该增加数量对应两个窗口12:00 - 12:10和12:05 - 12:15。因此计数将被,分组密钥(即这个词)和窗口(可以从事件时间计算)。

类似于下面的结果表。

因为这个窗口类似于分组,在代码中,您可以使用groupBy()window()表达有窗的聚合操作。你可以看到下面的完整代码的例子Scala/Java/Python.

ScalaJavaPython

Dataset<Row>words=...// streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each groupDataset<Row>windowedCounts=words.groupBy(functions.window(words.col("timestamp"),"10 minutes","5 minutes"),words.col("word")).count();

后期处理数据和水印

现在考虑如果迟到到应用程序的事件之一。例如,说,一个字生成的上午(即事件时间)可以由应用程序接收十二11。应用程序应该利用上午的时间而不是十二11更新旧的窗口12:00 - 12:10。自然而然地发生在我们的窗口分组——结构化流能保持中间状态的部分骨料等很长一段时间,后期数据可以更新总量的旧windows正确,如下图所示。

然而,为天,运行这个查询是必要的系统绑定的中间量内存状态积累。这意味着系统需要知道当一个老总可以从内存中的状态,因为应用程序不会接收后期数据的聚合。启用,在火花2.1中,我们已经介绍了水印,让引擎自动跟踪当前事件时间的相应数据,试图清理旧状态。您可以定义一个查询的水印指定事件时间列和多晚的阈值数据预计将事件时间。开始时间为一个特定的窗口T,发动机将维护状态,允许数据更新状态,直到后期(max event time seen by the engine - late threshold > T)。换句话说,在阈值将聚合后期数据,但数据迟于阈值将下降。让我们理解这一个例子。我们可以很容易地定义在前面的示例中使用水印withWatermark()如下所示。

ScalaJavaPython

Dataset<Row>words=...// streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each groupDataset<Row>windowedCounts=words.withWatermark("timestamp","10 minutes").groupBy(functions.window(words.col("timestamp"),"10 minutes","5 minutes"),words.col("word")).count();

在这个例子中,我们定义查询的水印的值列上“时间戳”,并定义“十分钟”多晚的阈值是允许的数据。如果这个查询的方法是运行在更新输出模式(稍后讨论输出模式部分),引擎将继续更新计数结果表,直到一个窗口的窗口比水印,落后于当前的事件时间列“时间戳”10分钟。这是一个例子。

如插图所示,最大事件时间跟踪引擎是蓝色虚线,水印设置为(max event time - '10 mins')开始时每个触发红线为例,当引擎观察数据(12:14, dog)为下一个触发器,它集水印12:04。这个水印让引擎维护额外的10分钟的中间状态允许后期数据统计。例如,数据(12:09, cat)坏了,晚了,它落在windows12:00 - 12:1012:05 - 12:15。以后,它仍然是领先的水印12:04在触发器,发动机仍然保持中间状态和正确地更新相关的重要窗口。然而,当更新水印12:11,窗口的中间状态(12:00 - 12:10)清除,所有后续的数据(如。(12:04, donkey))被认为是“太迟了”,因此忽略了。注意,每次触发后,更新后的数量(即紫色行)作为触发器输出写入到水槽,口述的更新模式。

一些水槽(例如文件)可能不支持细粒度更新需要更新模式。与他们一起工作,我们也支持附加模式,只有最后的数量将被写入到水槽。这是所示。

注意,使用withWatermark在非数据集是空操作。随着水印不应以任何方式影响任何批量查询,我们将直接忽略它。

类似于更新模式,发动机维护中间计数为每个窗口。然而,部分数量没有更新结果表,而不是写入下沉。引擎等待“十分钟”晚日期计算,然后滴中间状态的窗口<水印,并附加最后统计结果表/下沉。例如,窗口的最后计数12:00 - 12:10是附加到更新后的结果表只有水印12:11.

水印清洁条件聚合状态一定要注意,必须满足下列条件水印清洁状态的聚合查询(2.1.1火花,将来可能发生变化)。

输出模式必须追加或更新。完整的模式需要保存所有聚合数据,因此不能使用水印中间状态。看到输出模式部分详细解释每个输出的语义模式。

聚合一定事件时间列,或window事件时间列。

withWatermark必须在同一列上被称为聚合中使用时间戳列。例如,df.withWatermark("time", "1 min").groupBy("time2").count()是无效的附加输出模式,水印是定义在一个不同的列从聚合列。

withWatermark之前必须被称为水印信息的聚合。例如,df.groupBy("time").count().withWatermark("time", "1 min")是无效的附加输出模式。

连接操作

可以与静态流DataFrames DataFrames创建新的流DataFrames。这里有一些例子。

ScalaJavaPython

Dataset<Row>staticDf=spark.read()....;Dataset<Row>streamingDf=spark.readStream()....;streamingDf.join(staticDf,"type");// inner equi-join with a static DFstreamingDf.join(staticDf,"type","right_join");// right outer join with a static DF

流媒体重复数据删除

你可以删除处理记录事件数据流使用一个惟一的标识符。这正是一样的重复数据删除静态使用一个惟一的标识符列。必要的查询将存储的数据量从以前的记录,这样它可以过滤重复的记录。类似于聚合,可以使用重复数据删除有或没有水印。

水印——如果有一个上限多晚复制记录可以到达,你可以定义一个水印事件时间列,删除处理使用guid和事件时间列。查询将使用水印从过去的记录,删除旧的状态数据预计不会得到任何副本。这个界限的状态查询必须维护。

没有水印——因为没有界限复制记录可能到达的时候,查询存储数据从所有过去记录的状态。

ScalaJavaPython

Dataset<Row>streamingDf=spark.readStream()....;// columns: guid, eventTime, ...// Without watermark using guid columnstreamingDf.dropDuplicates("guid");// With watermark using guid and eventTime columnsstreamingDf.withWatermark("eventTime","10 seconds").dropDuplicates("guid","eventTime");

任意的有状态操作

许多可变性比聚合需要更高级的有状态操作。例如,在许多可变性,你必须从数据流的事件跟踪会话。sessionization等做的,你必须保存任意类型的数据状态,和对国家执行任意操作使用数据流在每一个触发事件。因为火花2.2,这可以通过使用操作mapGroupsWithState和更强大的操作flatMapGroupsWithState。两个操作允许您使用���户定义的代码分组数据集更新用户定义的状态。更具体的细节,看看API文档(Scala/Java和例子Scala/Java).

不支持的操作

有几个DataFrame /数据集操作,不支持流媒体DataFrames /数据集。其中一些如下。

多个流聚合(即链的聚合流DF)还不支持流媒体数据集。

限制,采取第N行不支持流媒体数据集。

不同的操作不支持流媒体数据集。

排序操作支持流媒体数据集的聚合和后完成输出模式。

外部连接流和静态数据集有条件的支持。

全外连接不支持流媒体数据集

左外连接的流数据集不支持在右边

右外连接与一个不支持流媒体数据集在左边

任何类型的连接两个流数据集之间是不支持的。

此外,还有一些数据集不会从事流媒体数据集的方法。他们的行动将立即运行查询并返回结果,流媒体数据集也没有意义。相反,这些功能可以通过显式地开始流查询(参见下一节有关)。

count()——从流媒体数据集不能返回一个计数。相反,使用ds.groupBy().count()它返回一个流媒体数据集,其中包含运行计数。

foreach()——而不是用ds.writeStream.foreach(...)(参见下一节)。

show(),而不是使用控制台水槽(参见下一节)。

如果你尝试任何这些操作,您将看到一个AnalysisException像“XYZ行动不支持流媒体DataFrames /数据集”。尽管他们中的一些人可能在将来的版本中支持的火花,还有其他的从根本上难以有效实现流媒体数据。例如,排序不支持输入流,因为它需要跟踪所有接收到的数据流。这是因此从根本上难以有效执行。

开始流查询

一旦您已经定义了最终结果DataFrame /数据集,剩下的是你开始流计算。要做到这一点,你必须使用DataStreamWriter(Scala/Java/Python返回的文档)通过Dataset.writeStream()。您必须指定一个或多个以下的界面。

输出水槽的细节:数据格式、位置等。

输出模式:指定写入输出什么下沉。

查询名称:可选地,为识别指定查询的一个唯一的名称。

触发间隔:可选地,指定触发间隔。如果没有指定,系统将检查尽快获得新数据前处理完成。如果一个触发时间错过了,因为前面的处理还没有完成,则系统将触发立即处理。

检查点位置:对于一些输出端到端容错可以保证水槽,指定系统将编写所有检查点的位置信息。这应该是一个目录的HDFS-compatible容错文件系统。检查点的语义将在下一节中详细讨论。

输出模式

有一些类型的输出模式。

Append模式(默认)——这是默认模式中,只有新行添加到结果表自上次触发器将被输出到水槽。这是只支持那些查询,行添加到结果表永远不会改变。因此,这种模式保证每一行将只输出一次(假设容错水槽)。例如,只有查询select,where,map,flatMap,filter,join等将支持Append模式。

完成模式——整个结果表将被输出到水槽后每一个触发器。这是支持聚合查询。

更新模式-火花以来(2.1.1)只有结果表中的行,自上次更新触发器将被输出到水槽。在将来的版本中添加的更多信息。

不同类型的流媒体查询支持不同的输出模式。这是兼容性矩阵。

输出下沉

有一些类型的内置输出下沉。

文件沉——存储输出目录。

writeStream.format("parquet")// can be "orc", "json", "csv", etc..option("path","path/to/destination/dir").start()

卡夫卡水槽——存储输出到一个或多个主题卡夫卡。

writeStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("topic","updates").start()

Foreach水槽——运行任意计算输出的记录。更多细节请参见后面的部分。

writeStream.foreach(...).start()

控制台水槽(调试)-将输出打印到控制台/ stdout每次有一个触发器。附加和完整的输出模式,两者都支持。这应该是用于调试目的低数据量作为整个输出被收集并存储在每次触发后司机的记忆。

writeStream.format("console").start()

内存水槽(调试)的输出作为一个内存中的表存储在内存中。附加和完整的输出模式,两者都支持。这应该是用于调试目的低数据量作为整个输出被收集并存储在司机的记忆。因此,慎用。

writeStream.format("memory").queryName("tableName").start()

有些下沉不是容错因为他们不保证输出的持久性和仅用于调试目的。看到前面的部分容错语义。这里是所有的细节引发的下沉。

请注意,您必须调用start()开始查询的执行。这返回一个StreamingQuery对象是连续运行的处理执行。你可以用这个对象来管理查询,我们将在下一小节进行讨论。现在,让我们了解这一切与几个例子。

ScalaJavaPythonR

// ========== DF with no aggregations ==========Dataset<Row>noAggDF=deviceDataDf.select("device").where("signal > 10");// Print new data to consolenoAggDF.writeStream().format("console").start();// Write new data to Parquet filesnoAggDF.writeStream().format("parquet").option("checkpointLocation","path/to/checkpoint/dir").option("path","path/to/destination/dir").start();// ========== DF with aggregation ==========Dataset<Row>aggDF=df.groupBy("device").count();// Print updated aggregations to consoleaggDF.writeStream().outputMode("complete").format("console").start();// Have all the aggregates in an in-memory tableaggDF.writeStream().queryName("aggregates")// this query name will be the table name.outputMode("complete").format("memory").start();spark.sql("select * from aggregates").show();// interactively query in-memory table

使用Foreach

foreach操作允许任意操作计算输出数据。2.1的火花,这是只有Scala和Java。使用这个,你必须实现的接口ForeachWriter(Scala/Java文档)方法��调用时,有一个序列触发后生成的行作为输出。请注意以下要点。

作者必须是可序列化的,因为它将序列化和发送到执行人执行。

所有三种方法,open,processclose将呼吁执行人。

作家必须做所有的初始化(如打开连接,开始一个事务,等等)只有���open方法被调用。要知道,如果有任何初始化的类创建对象,那么会发生初始化驱动程序(因为这是实例被创建),这可能不是你的意愿。

versionpartition有两个参数open唯一的行表示一组需要推动。version是一个单调递增id,随每一个触发器。partition输出的是一个id,它代表了一个分区,因为输出是分布式和将在多个执行器进行处理。

open可以使用versionpartition选择是否需要写的行序列。因此,它可以返回true(继续写作),或false(不需要编写)。如果false返回,然后process不会要求任何行。例如,输出的部分失败之后,一些分区失败的触发可能已经提交到数据库。基于元数据存储在数据库中,作者可以识别分区,因此已经提交并返回false跳过提交他们了。

每当open被调用时,close也被称为(除非JVM退出由于一些错误)。即使是如此open返回false。如果有任何错误处理和写数据,close将调用的错误。是你的责任来清理状态(例如连接、事务等)中创建的open这样没有资源泄漏。

流媒体管理查询

StreamingQuery对象查询开始时创建可用于监视和管理查询。

ScalaJavaPythonR

StreamingQueryquery=df.writeStream().format("console").start();// get the query objectquery.id();// get the unique identifier of the running query that persists across restarts from checkpoint dataquery.runId();// get the unique id of this run of the query, which will be generated at every start/restartquery.name();// get the name of the auto-generated or user-specified namequery.explain();// print detailed explanations of the queryquery.stop();// stop the queryquery.awaitTermination();// block until query is terminated, with stop() or with errorquery.exception();// the exception if the query has been terminated with errorquery.recentProgress();// an array of the most recent progress updates for this queryquery.lastProgress();// the most recent progress update of this streaming query

你可以开始在一个SparkSession任意数量的查询。他们都将共享集群资源并行运行。您可以使用sparkSession.streams()得到StreamingQueryManager(Scala/Java/Python文档),可以用来管理当前活跃的查询。

ScalaJavaPythonR

SparkSessionspark=...spark.streams().active();// get the list of currently active streaming queriesspark.streams().get(id);// get a query object by its unique idspark.streams().awaitAnyTermination();// block until any one of them terminates

流媒体监控查询

有多种方法来监控活动流查询。你可以推动度量外部系统使用火花的Dropwizard指标的支持,或者通过编程方式访问它们。

阅读指标交互

你可以直接得到当前状态和指标的一个活跃的查询使用streamingQuery.lastProgress()streamingQuery.status().lastProgress()返回一个StreamingQueryProgress对象Scala和Java在Python和字典字段。它拥有所有的信息在过去所取得的进展引发——数据流的处理,是什么处理速率,延迟,等也有streamingQuery.recentProgress返回一个数组的进展。

此外,streamingQuery.status()返回一个StreamingQueryStatus对象Scala和Java在Python和字典字段。它给信息查询立即做什么——是一个触发活跃,是数据处理等。

这里有一些例子。

ScalaJavaPythonR

StreamingQueryquery=...System.out.println(query.lastProgress());/* Will print something like the following.{ "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109", "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a", "name" : "MyQuery", "timestamp" : "-12-14T18:45:24.873Z", "numInputRows" : 10, "inputRowsPerSecond" : 120.0, "processedRowsPerSecond" : 200.0, "durationMs" : { "triggerExecution" : 3, "getOffset" : 2 }, "eventTime" : { "watermark" : "-12-14T18:45:24.873Z" }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[topic-0]]", "startOffset" : {"topic-0" : { "2" : 0, "4" : 1, "1" : 1, "3" : 1, "0" : 1} }, "endOffset" : {"topic-0" : { "2" : 0, "4" : 115, "1" : 134, "3" : 21, "0" : 534} }, "numInputRows" : 10, "inputRowsPerSecond" : 120.0, "processedRowsPerSecond" : 200.0 } ], "sink" : { "description" : "MemorySink" }}*/System.out.println(query.status());/* Will print something like the following.{ "message" : "Waiting for data to arrive", "isDataAvailable" : false, "isTriggerActive" : false}*/

报告指标使用异步api以编程方式

您还可以监视所有异步查询有关SparkSession通过附加一个StreamingQueryListener(Scala/Java文档)。一旦你附上您的自定义StreamingQueryListener对象与sparkSession.streams.attachListener()查询时,你会得到回调启动和停止时,取得的进展有一个活跃的查询。这是一个例子,

ScalaJavaPythonR

SparkSessionspark=...spark.streams().addListener(newStreamingQueryListener(){@OverridepublicvoidonQueryStarted(QueryStartedEventqueryStarted){System.out.println("Query started: "+queryStarted.id());}@OverridepublicvoidonQueryTerminated(QueryTerminatedEventqueryTerminated){System.out.println("Query terminated: "+queryTerminated.id());}@OverridepublicvoidonQueryProgress(QueryProgressEventqueryProgress){System.out.println("Query made progress: "+queryProgress.progress());}});

报告指标使用Dropwizard

火花支持报告指标使用Dropwizard图书馆。使指标的结构化流查询报告,您必须显式地启用配置spark.sql.streaming.metricsEnabledSparkSession。

ScalaJavaPythonR

spark.conf().set("spark.sql.streaming.metricsEnabled","true");// orspark.sql("SET spark.sql.streaming.metricsEnabled=true");

所有查询开始SparkSession这个配置启用后将报告指标通过Dropwizard等等汇已经配置(如神经节、石墨、JMX,等等)。

从故障中恢复检查点

在失败或故意关机的情况下,您可以恢复之前的进步和前一个状态查询,并继续离开。这是通过使用检查点和写日志。与检查点的位置,您可以配置一个查询,查询将保存所有进展信息(即每个触发器的抵消处理)和运行骨料(如单词统计简单的例子检查点的位置。这个检查点位置必须是一个路径在一个HDFS兼容的文件系统,并可以设置DataStreamWriter当作为一个选项开始查询.

ScalaJavaPythonR

aggDF.writeStream().outputMode("complete").option("checkpointLocation","path/to/HDFS/dir").format("memory").start();

额外的信息

进一步的阅读

看到并运行Scala/Java/Python/R的例子。 指令如何运行引发的例子读到整合与卡夫卡结构化流卡夫卡集成指南了解有关使用DataFrames /数据集的更多细节火花SQL编程指南

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。