Handling Upstream Data Changes via Change Data Capture
Anyone who has managed a data pipeline knows how upstream data can change and how those changes can impact the entire pipeline. Imagine we have the following customer data in our datalake.
+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2|  Bob|
+---+-----+

One month later, we realized that the customer name has changed from Alice to Carol and we have been using incorrect data for the past month. Such data inaccuracies can impact our data analysis and machine learning models. So, how can we detect it and how can we automate it?
Fortunately, with enough information, it is easy to update the data without over-complicating our existing pipeline. We need two components:
Change Event Generator - to generate the create / update / delete events of our data
Change Event Resolver - to apply these changes on our existing data (a rough sketch of both interfaces follows this list)
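In code, the split of responsibilities might look roughly like the sketch below. The trait names are purely illustrative (they are not part of any library), and ChangeEvent is the case class introduced further down.

import org.apache.spark.sql.DataFrame

// Illustrative interfaces only; ChangeEvent is the case class defined later in this post
trait ChangeEventGenerator {
  def generate(): Seq[ChangeEvent]  // emit create / update / delete events from the source
}

trait ChangeEventResolver {
  def resolve(existing: DataFrame, changeEvents: DataFrame): DataFrame  // apply the events to existing data
}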
The need for the Change Event Resolver is debatable if we can afford to live on the bleeding edge by using Apache Hudi or Delta Lake. Instead of resolving the changes and keeping a single copy of the data, Hudi / Delta Lake can persist each revision of our data (the timeline in Hudi and time travel in Delta Lake). This is undoubtedly more powerful, but there are scenarios where these components will not simply fit into existing pipelines, especially because they require the data to be stored in their own hudi or delta format instead of the common parquet or avro format. This means our data cataloging, data monitoring and data visualization now need to understand hudi or delta, which may not be ready in a year's time, especially if these services are not developed in-house.
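For context, this is roughly what Delta Lake's time travel looks like from Spark, assuming a table already written in the delta format at a hypothetical path and a Delta Lake release compatible with your Spark version on the classpath:

// Read the current state and an earlier revision of the same (hypothetical) table
val currentDf = spark.read.format("delta").load("/datalake/customers")
val previousDf = spark.read
  .format("delta")
  .option("versionAsOf", 0)  // or .option("timestampAsOf", "2020-08-14")
  .load("/datalake/customers")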
Let’s try to model a class that we can use to capture the changes.
case class ChangeEvent(
  changeType: String,         // type of change - INSERT, UPDATE or DELETE
  timestamp: String,          // when this change happened
  columnNames: Seq[String],   // names of columns (Mandatory for INSERT / UPDATE)
  columnValues: Seq[String],  // values of columns (Mandatory for INSERT / UPDATE)
  oldKeyNames: Seq[String],   // names of old key columns (Mandatory for UPDATE / DELETE)
  oldKeyValues: Seq[String]   // values of old key columns (Mandatory for UPDATE / DELETE)
)

Most of the attributes are self-explanatory. oldKeyNames and oldKeyValues contain the primary keys / values of the old data which will be used in case of UPDATE / DELETE queries. We can also enrich the model with more attributes such as columnTypes and oldKeyTypes if we need to apply the changes differently based on the column data types.
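For instance, a type-aware variant could look like the sketch below; TypedChangeEvent is a hypothetical name used only for illustration.

// Hypothetical enriched variant that carries column / key data types alongside the values
case class TypedChangeEvent(
  changeType: String,
  timestamp: String,
  columnNames: Seq[String],
  columnTypes: Seq[String],   // e.g. "text", "integer", "timestamp with time zone"
  columnValues: Seq[String],
  oldKeyNames: Seq[String],
  oldKeyTypes: Seq[String],
  oldKeyValues: Seq[String]
)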
This is commonly known as change data capture (CDC) and is supported by many databases, including PostgreSQL, MySQL and MongoDB. In fact, our ChangeEvent class is a simplified version of the output of wal2json, a PostgreSQL plugin used for logical replication.
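For reference, a single UPDATE emitted by wal2json looks roughly like the abridged sample below (the exact fields depend on the plugin version and options; a commit timestamp, for example, can be added via an option such as include-timestamp). kind maps to changeType, columnnames / columnvalues to columnNames / columnValues, and oldkeys.keynames / oldkeys.keyvalues to oldKeyNames / oldKeyValues.

{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "customers",
      "columnnames": ["id", "name"],
      "columntypes": ["text", "text"],
      "columnvalues": ["id1", "Angela"],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["text"],
        "keyvalues": ["id1"]
      }
    }
  ]
}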
There are 2 main advantages of using CDC:
Although the primary purpose of this feature is to create database replicas and to migrate data, it becomes very powerful in data pipelines since a change is available in the pipeline almost immediately, making a real-time pipeline possible.

The upstream does not need to implement the notification logic in several places. To give an example, think of a movie review website. The pipeline needs to be informed when a user creates / updates / deletes his reviews, which means the pipeline needs to be informed across 3 different REST APIs. All it takes is one developer forgetting to notify the pipeline and we will be losing data without anyone realising it.

The slight con is that there is some performance degradation on the source database due to replication, but the impact will differ based on the type of database.
Now, after hearing this proposal, we might want to ask ourselves some questions.
What if the data does not have the concept of a primary key (i.e. no oldKeyNames or oldKeyValues)?
What if the primary key changes?
What if I add a column or rename a column?
What if I remove a column?
What if a column changes its data type?

Unfortunately, if our data does not have a primary key, CDC will not work at all (unless we are only interested in INSERTs and do not care about UPDATEs or DELETEs). In this case, the best option would be to add a UUID or a sequence column to be used as the primary key. Primary key changes rarely happen to properly designed data models, but in the unavoidable case it would be best to treat the post-change data as a new data model and get a snapshot of the data.
The rest of the questions can be addressed within our code when we apply the change events to our existing data, but they will not be covered in this post.
Now, let’s see how it works with an example. I am using Apache Spark 2.4 here, but the same logic can be reimplemented in other frameworks as well.
First, we are going to mock some data that already exists in our datalake. Notice that the timestamp column is present in the data previously written to the datalake, since we need to apply the change events based on this column.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._  // col, lit, when, desc, row_number used below

val schema = StructType(Seq("timestamp", "id", "name").map(StructField(_, StringType)))
val datalakeDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
  Row("0", "id1", "Alice"),
  Row("0", "id2", "Bob")
)), schema)

+---------+---+-----+
|timestamp| id| name|
+---------+---+-----+
|        0|id1|Alice|
|        0|id2|  Bob|
+---------+---+-----+

Next, we populate the change events that our Change Event Generator would produce and that we want to apply to this data. The implementation of this generator is source specific, so we won't be covering it in this post.
Note that oldKeyNames and oldKeyValues are not present for insert. Similarly, columnNames and columnValues are not required for delete.
val COLUMN_NAMES = Seq("id", "name")  // column names of the data
val OLD_KEY_NAMES = Seq("id")         // column names of the primary key

val changeEventDf = spark.createDataFrame(Seq(
  ChangeEvent("update", "1", COLUMN_NAMES, Seq("id1", "Angela"), OLD_KEY_NAMES, Seq("id1")),
  ChangeEvent("delete", "2", null, null, OLD_KEY_NAMES, Seq("id2")),
  ChangeEvent("insert", "3", COLUMN_NAMES, Seq("id2", "Carol"), null, null)
))

+----------+---------+-----------+-------------+-----------+------------+
|changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
+----------+---------+-----------+-------------+-----------+------------+
|    update|        1| [id, name]|[id1, Angela]|       [id]|       [id1]|
|    delete|        2|       null|         null|       [id]|       [id2]|
|    insert|        3| [id, name]| [id2, Carol]|       null|        null|
+----------+---------+-----------+-------------+-----------+------------+

Let's split the columnNames and columnValues columns so that we have a dataframe similar to our existing data. But we will keep oldKeyNames and oldKeyValues since we still need to use them.
val splitChangeEventDf = COLUMN_NAMES.zipWithIndex.foldLeft(changeEventDf) {
  (df, column) => df.withColumn(column._1, $"columnValues"(column._2))
}.drop("columnNames", "columnValues")

+----------+---------+-----------+------------+----+------+
|changeType|timestamp|oldKeyNames|oldKeyValues|  id|  name|
+----------+---------+-----------+------------+----+------+
|    update|        1|       [id]|       [id1]| id1|Angela|
|    delete|        2|       [id]|       [id2]|null|  null|
|    insert|        3|       null|        null| id2| Carol|
+----------+---------+-----------+------------+----+------+

According to these change events, we should apply the changes in this order:
update name of id1 from Alice to Angela
delete id2
insert id2 again with name = Carol
Now it’s time to create our second component that applies the change events generated by our Change Event Generator. First, let's create a couple of helper functions:
unionWithSchema - union dataframes while accounting for schema differences such as different column orders or mismatched columns, e.g. df1 has 2 columns: colA and colB but df2 only has colA.
def unionWithSchema(dataFrames: DataFrame*): Option[DataFrame] = {
  if (dataFrames.isEmpty) {
    return None
  }
  val spark = dataFrames.head.sparkSession
  val distinctSchemas = dataFrames.map(_.schema.toList).distinct

  val unionDf = if (distinctSchemas.size == 1) {
    dataFrames.tail.foldLeft(dataFrames.head) { (df1, df2) => df1.union(df2) }
  } else {
    val allSchemas = distinctSchemas.flatten.distinct.sortBy(schema => schema.name)
    val schemaWithAllColumns = StructType(allSchemas)
    val emptyDataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaWithAllColumns)
    val orderedColumns = emptyDataFrame.columns.map(col)

    dataFrames.foldLeft(emptyDataFrame) { (df1, df2) => {
      // Add any missing columns as nulls so that both sides share the same schema
      val missingColumns = allSchemas diff df2.schema
      val unionSafeDf = missingColumns.foldLeft(df2) {
        (df, missingField) => df.withColumn(missingField.name, lit(null).cast(missingField.dataType))
      }
      df1.union(unionSafeDf.select(orderedColumns: _*))
    }}
  }
  Some(unionDf)
}
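As a quick, hypothetical usage example of the mismatched-columns case described above (assuming a spark-shell session where spark.implicits._ is available), with the expected output shown as a comment:

import spark.implicits._

val df1 = Seq(("a1", "b1")).toDF("colA", "colB")
val df2 = Seq("a2").toDF("colA")

unionWithSchema(df1, df2).get.show()
// +----+----+
// |colA|colB|
// +----+----+
// |  a1|  b1|
// |  a2|null|
// +----+----+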
applyChangeEventsByTimestamp - apply change events chronologically.
def applyChangeEventsByTimestamp(dataFrame: DataFrame, primaryKeyArr: Array[String]): DataFrame = {
  val partitionCols = if (primaryKeyArr.isEmpty) {
    dataFrame.columns.map(col)
  } else {
    primaryKeyArr.map(colName => col(s"old_$colName"))
  }
  val window = Window.partitionBy(partitionCols: _*).orderBy(desc("timestamp"))

  var dfInitial = dataFrame
  var iteration = 0
  var done = false

  // Potentially expensive. Monitor logs and improve if required.
  while (!done) {
    val dfBefore = if (iteration == 0) {
      dfInitial
    } else {
      primaryKeyArr.foldLeft(dfInitial) {
        // Update the old keys for UPDATE ops
        (df, colName) => df.withColumn(s"old_$colName",
          when($"changeType" === "update", col(colName)).otherwise(col(s"old_$colName")))
      }
    }
    val dfAfter = dfBefore.withColumn("rank", row_number().over(window))
      .filter(col("rank") === 1)
      .drop("rank")
    done = dfAfter.count == dfBefore.count
    dfInitial = dfAfter
    iteration = iteration + 1
  }
  dfInitial
}

First, we will handle the most common scenario: new records getting added. We do not need to use timestamp at this point. Notice that there are 2 records for id2, since the delete change event has not been applied yet.
val insertDf = splitChangeEventDf.filter($"changeType" === "insert").drop("oldKeyNames", "oldKeyValues")
val datalakeAndInsertDf = unionWithSchema(insertDf, datalakeDf).get

+----------+---+-----+---------+
|changeType| id| name|timestamp|
+----------+---+-----+---------+
|    insert|id2|Carol|        3|
|      null|id1|Alice|        0|
|      null|id2|  Bob|        0|
+----------+---+-----+---------+

Now, let's handle updating and deleting records. In our example, there is only one unique primary key array, so the code inside the foldLeft will be executed only once.
val updateDeleteDf = splitChangeEventDf.filter($"changeType" === "update" || $"changeType" === "delete")

// Ideally there should only be one, but the primary key can be changed in some cases
val distinctOldKeyNames = updateDeleteDf.select("oldKeyNames").distinct.collect.map(_.getSeq[String](0).toArray)

val updateDeleteExpandedDfs = distinctOldKeyNames.foldLeft(datalakeAndInsertDf)(
  (datalakeAndInsertDf, primaryKeyArr) => {
    // Step 1: For INSERT / existing data, create new pkey columns (using existing values)
    val datalakeAndInsertCeDf = primaryKeyArr.foldLeft(datalakeAndInsertDf) {
      (df, colName) => df.withColumn(s"old_$colName", col(colName))
    }

    // Step 2: For UPDATE / DELETE, split the old keys array column
    val updateDeleteCeDf = primaryKeyArr.zipWithIndex.foldLeft(updateDeleteDf) {
      (df, pKey) => df.withColumn(s"old_${pKey._1}", $"oldKeyValues"(pKey._2))
    }.filter($"oldKeyNames" === primaryKeyArr)
      .drop("oldKeyNames", "oldKeyValues")

    // Step 3: Union all the data
    val initialDf = unionWithSchema(datalakeAndInsertCeDf, updateDeleteCeDf).get.cache()

    // Step 4: Resolve the change events chronologically
    val resolvedDf = applyChangeEventsByTimestamp(initialDf, primaryKeyArr)

    // Step 5: Drop DELETE records and unnecessary columns
    resolvedDf
      .filter($"changeType" =!= "delete" || $"changeType".isNull)
      .drop(primaryKeyArr.map(colName => s"old_$colName"): _*)
  }
)

// Step 6: Remove unwanted columns and get rid of duplicate rows
val finalDf = updateDeleteExpandedDfs.drop("changeType").distinct()

Let's go through each step and visualize the output.
Step 1: Enrich datalakeAndInsertDf to include information about the existing primary keys. In this example, it would be old_id.
scala> datalakeAndInsertCeDf.show
+----------+---+-----+---------+------+
|changeType| id| name|timestamp|old_id|
+----------+---+-----+---------+------+
|    insert|id2|Carol|        3|   id2|
|      null|id1|Alice|        0|   id1|
|      null|id2|  Bob|        0|   id2|
+----------+---+-----+---------+------+

Step 2: Enrich updateDeleteDf to include information about the existing primary keys (old_id in this example).
scala> updateDeleteCeDf.show
+----------+---------+----+------+------+
|changeType|timestamp|  id|  name|old_id|
+----------+---------+----+------+------+
|    update|        1| id1|Angela|   id1|
|    delete|        2|null|  null|   id2|
+----------+---------+----+------+------+

Step 3: Union the dataframes from #1 and #2.
scala> initialDf.show
+----------+----+------+------+---------+
|changeType|  id|  name|old_id|timestamp|
+----------+----+------+------+---------+
|    insert| id2| Carol|   id2|        3|
|      null| id1| Alice|   id1|        0|
|      null| id2|   Bob|   id2|        0|
|    update| id1|Angela|   id1|        1|
|    delete|null|  null|   id2|        2|
+----------+----+------+------+---------+

Step 4: Apply the change events chronologically based on timestamp, as defined by our applyChangeEventsByTimestamp() function. At this point, we should only have one record per primary key (except for delete records, which will be removed in the next step).
scala> resolvedDf.show
+----------+---+------+------+---------+
|changeType| id|  name|old_id|timestamp|
+----------+---+------+------+---------+
|    update|id1|Angela|   id1|        1|
|    insert|id2| Carol|   id2|        3|
+----------+---+------+------+---------+

Step 5: Remove records with the delete change type as they are not to be persisted. In our case, the delete record has already been overwritten by the insert, so there is nothing to remove. old_id is also dropped since we do not need this information anymore.
+----------+---+------+---------+
|changeType| id|  name|timestamp|
+----------+---+------+---------+
|    update|id1|Angela|        1|
|    insert|id2| Carol|        3|
+----------+---+------+---------+

Steps 1-5 are repeated if there are any more combinations of primary keys, but since we only have one (id) in this example, this is the end of the resolution. We just need to drop changeType and remove any duplicate records.
+---+------+---------+
| id|  name|timestamp|
+---+------+---------+
|id2| Carol|        3|
|id1|Angela|        1|
+---+------+---------+

And, voila! We have a dataframe with the applied changes, ready to be persisted to our datalake in any format we want.
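For example, persisting it back as parquet could be as simple as the sketch below; the path and write mode here are just placeholders and will depend on your datalake layout.

// Hypothetical example: overwrite the customer snapshot in the datalake with the resolved data
finalDf.write
  .mode("overwrite")
  .parquet("/datalake/customers")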
Originally published at https://dev.to on September 14, 2020.