Find min and max range with a combination of column values in PySpark


Question

I have a pyspark dataframe like this,

+----------+--------+----------+----------+
|id_       | p      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-19|
|  2       | C      |2018-07-13|2018-10-07|
|  2       | B      |2018-12-31|2019-02-27|
|  2       | A      |2019-01-28|2019-06-25|
+----------+--------+----------+----------+

From this dataframe I have to make a dataframe like this,

+----------+--------+----------+----------+
|id_       | q      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-12|
|  2       | B C    |2018-07-13|2018-07-19|
|  2       | C      |2018-07-20|2018-10-07|
|  2       | B      |2018-12-31|2019-01-27|
|  2       | B A    |2019-01-28|2019-02-27|
|  2       | A      |2019-02-28|2019-06-25|
+----------+--------+----------+----------+

It is something like finding which values of p are present in the data for a particular id_, from when to when. If there are multiple p on the same day, all of them should be present in the data, separated by a space.

I tried to do this by creating every date in the range min(d1) to max(d2) and filling them in accordingly. From that dataframe, after some melting and grouping, I can get the desired result.
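
A minimal sketch of that per-day expansion (assuming d1 and d2 are DateType columns; sequence and array_join need Spark 2.4+) looks roughly like this:

from pyspark.sql import functions as F

# explode every (d1, d2) range into one row per day and collect the p values
# active on each day; this per-day frame then still has to be re-collapsed
# into contiguous ranges (the melting/grouping step), which is what gets slow
per_day = df.withColumn('day', F.explode(F.sequence('d1', 'd2'))) \
            .groupBy('id_', 'day') \
            .agg(F.array_join(F.sort_array(F.collect_set('p')), ' ').alias('q'))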

But the process takes a very long time and is very inefficient.

I am looking for an efficient method for performing this task.

I can also have more complex cases of overlap, i.e. overlap among more than two values of p.

See a sample data below,

+----------+--------+----------+----------+
|id_       | p      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-19|
|  2       | C      |2018-06-27|2018-07-07|
|  2       | A      |2018-07-02|2019-02-27|
|  2       | A      |2019-03-28|2019-06-25|
+----------+--------+----------+----------+

which has to be converted to,

+----------+--------+----------+----------+
|id_       | q      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-06-26|
|  2       | B C    |2018-06-27|2018-07-01|
|  2       | B C A  |2018-07-02|2018-07-07|
|  2       | A B    |2018-07-08|2018-07-19|
|  2       | A      |2018-07-20|2019-02-27|
|  2       | A      |2019-03-28|2019-06-25|
+----------+--------+----------+----------+

The order of individual items in q doesn't matter, i.e. if A, B and C overlap, q can be shown as A B C, B C A, A C B and so on.

I am also adding an edge case that is hard to come by, namely d2 == lead(d1).over(window). In this case it can safely be assumed that the p values are different, i.e. p != lead(p).over(window).

+---+---+----------+----------+
|id_| p |    d1    | d2       |
+---+---+----------+----------+
|100| 12|2013-10-16|2014-01-17|
|100| 12|2014-01-20|2014-04-15|
|100| 12|2014-04-22|2014-05-19|
|100| 12|2014-05-22|2014-06-19|
|100| 12|2014-07-23|2014-09-18|
|100| 12|2014-09-23|2014-12-18|
|100| 12|2014-12-20|2015-01-16|
|100| 12|2015-01-23|2015-02-19|
|100| 12|2015-02-21|2015-04-20|
|100| 7 |2015-04-20|2015-05-17|
|100| 7 |2015-05-19|2015-06-15|
|100| 7 |2015-06-18|2015-09-01|
|100| 7 |2015-09-09|2015-11-26|
+---+---+----------+----------+

In the above data, the 4th and 5th rows from the bottom show this case. The expected result here is,

+---+-----+----------+----------+
|id_| q   | d1       | d2       |
+---+-----+----------+----------+
|100| 12  |2013-10-16|2014-01-17|
|100| 12  |2014-01-20|2014-04-15|
|100| 12  |2014-04-22|2014-05-19|
|100| 12  |2014-05-22|2014-06-19|
|100| 12  |2014-07-23|2014-09-18|
|100| 12  |2014-09-23|2014-12-18|
|100| 12  |2014-12-20|2015-01-16|
|100| 12  |2015-01-23|2015-02-19|
|100| 12  |2015-02-21|2015-04-19|
|100| 12 7|2015-04-20|2015-04-20|
|100| 7   |2015-04-21|2015-05-17|
|100| 7   |2015-05-19|2015-06-15|
|100| 7   |2015-06-18|2015-09-01|
|100| 7   |2015-09-09|2015-11-26|
+---+-----+----------+----------+
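
Such single-day boundaries can be spotted in the input with a window lead, along these lines (a rough sketch; the window w and column next_d1 are just names introduced here, and d1/d2 are assumed to be dates or ISO-formatted strings):

from pyspark.sql import Window
from pyspark.sql import functions as F

# rows whose d2 equals the next row's d1 within the same id_
w = Window.partitionBy('id_').orderBy('d1')
df.withColumn('next_d1', F.lead('d1').over(w)) \
  .where(F.col('d2') == F.col('next_d1')) \
  .show()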

Another example for the same case is given below,

+---+---+----------+----------+
|id_| p | d1       | d2       |
+---+---+----------+----------+
|101| 12|2015-02-24|2015-03-23|
|101| 12|2015-04-01|2015-05-19|
|101| 12|2015-05-29|2015-06-25|
|101| 12|2015-07-03|2015-07-30|
|101| 12|2015-09-02|2015-09-29|
|101| 12|2015-10-02|2015-10-29|
|101| 9 |2015-10-29|2015-11-11|
|101| 9 |2015-11-25|2015-12-22|
+---+---+----------+----------+

The expected result for this is,

+---+-----+----------+----------+
|id_| q   | d1       | d2       |
+---+-----+----------+----------+
|101| 12  |2015-02-24|2015-03-23|
|101| 12  |2015-04-01|2015-05-19|
|101| 12  |2015-05-29|2015-06-25|
|101| 12  |2015-07-03|2015-07-30|
|101| 12  |2015-09-02|2015-09-29|
|101| 12  |2015-10-02|2015-10-28|
|101| 12 9|2015-10-29|2015-10-29|
|101| 9   |2015-10-30|2015-11-11|
|101| 9   |2015-11-25|2015-12-22|
+---+-----+----------+----------+


Answer

Update: Based on OP's comments and update, since any number of overlaps might happen, I think a dataframe JOIN is probably the most straightforward way. Below is a completely new solution I tested on Spark 2.4.0 (array_join, transform, sequence etc. require Spark 2.4+):

Update-2: Per the discussion in the comments/chat, I've added the code logic to set up the boundaries for each drange(d1, d2), i.e. how/when to adjust d1/d2; a new flag field is required in df_drange to complete this logic. For details, see the Set up boundaries section below.

Update-3: adjusted the code to handle the case when (d1 == d2) in df_drange; such cases were originally removed.

Note: I added df2 with d1 and d2 converted to DateType(), while the original df keeps these two fields as StringType(), since we need some concatenation operations on them.

from pyspark.sql import Window
from pyspark.sql.functions import lit, concat, coalesce, lead, expr, to_date, collect_set, array_sort, array_join, broadcast

df = spark.createDataFrame([
      (1, 'A', '2018-09-26', '2018-10-26')
    , (2, 'B', '2018-06-21', '2018-07-19')
    , (2, 'C', '2018-06-27', '2018-07-07')
    , (2, 'A', '2018-07-02', '2019-02-27')
    , (2, 'A', '2019-03-28', '2019-06-25')
  ], ['id_', 'p', 'd1', 'd2'])

# convert d1, d2 to DateType() if they are StringType()
df2 = df.withColumn('d1', to_date('d1')).withColumn('d2', to_date('d2'))

df2.printSchema()
root
 |-- id_: long (nullable = true)
 |-- p: string (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)



Create a referencing dataframe: df_drange

df_drange contains all distinct dates from d1 and d2, plus a flag which is set to 1 when df_drange.d1 comes from df.d2 (in the original df) and 0 otherwise. Sort the dates and segment them into interval date ranges. Retrieve the fields d1, d2 and flag (d1 only) and convert them to the proper DataType():

df_drange = df.select('id_', 'd1', lit(0).alias('flag')).union(df.select('id_', 'd2', lit(1))) \
    .groupby('id_') \
    .agg(array_sort(collect_set(concat('d1', lit('-'), 'flag'))).alias('dates')) \
    .withColumn('dates', expr("""
         explode(transform(sequence(0, size(dates)-2), i -> named_struct('d1', dates[i], 'd2', dates[i+1])))
       """)) \
    .selectExpr(
         'id_'
       , "to_date(substring_index(dates.d1, '-', 3)) as d1"
       , "to_date(substring_index(dates.d2, '-', 3)) as d2"
       , "boolean(substring_index(dates.d1, '-', -1)) as flag"
     )

df_drange.orderBy('id_','d1').show()
+---+----------+----------+-----+
|id_|        d1|        d2| flag|
+---+----------+----------+-----+
|  1|2018-09-26|2018-10-26|false|
|  2|2018-06-21|2018-06-27|false|
|  2|2018-06-27|2018-07-02|false|
|  2|2018-07-02|2018-07-07|false|
|  2|2018-07-07|2018-07-19| true|
|  2|2018-07-19|2019-02-27| true|
|  2|2019-02-27|2019-03-28| true|
|  2|2019-03-28|2019-06-25|false|
+---+----------+----------+-----+

df_drange.printSchema()
root
 |-- id_: long (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)
 |-- flag: boolean (nullable = true)



Set up df1 with Join

Left-join with the original df, matching each id_ on any overlap between (d1, d2) of df_drange and (d1, d2) of the original df. After groupby(id_, d1, d2, flag) on the fields from df_drange, take array_join(collect_set(p), ' '):

df1 = broadcast(df_drange).join(
      df2
    , (df2.id_ == df_drange.id_) & (
            ((df2.d1 < df_drange.d2) & (df2.d2 > df_drange.d1)) 
          | ((df_drange.d1 == df_drange.d2) & df_drange.d1.between(df2.d1, df2.d2)) 
      )
    , how = 'left'
).groupby(df_drange.id_, df_drange.d1, df_drange.d2, df_drange.flag) \
 .agg(array_join(collect_set('p'), ' ').alias('q'))

df1.show()
+---+----------+----------+-----+-----+
|id_|        d1|        d2| flag|    q|
+---+----------+----------+-----+-----+
|  1|2018-09-26|2018-10-26|false|    A|
|  2|2018-06-21|2018-06-27|false|    B|
|  2|2018-06-27|2018-07-02|false|  C B|
|  2|2018-07-02|2018-07-07|false|C B A|
|  2|2018-07-07|2018-07-19| true|  B A|
|  2|2018-07-19|2019-02-27| true|    A|
|  2|2019-02-27|2019-03-28| true|     |
|  2|2019-03-28|2019-06-25|false|    A|
+---+----------+----------+-----+-----+
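
Since the question notes that the order of items in q doesn't matter, collect_set is joined as-is above. If an alphabetical q were preferred, a hypothetical variant (not part of the original answer) is to sort the set before joining, e.g.:

# same join as above, but with the collected p values sorted before joining,
# so q always comes out in alphabetical order (e.g. "A B C" instead of "C B A")
df1_sorted = broadcast(df_drange).join(
      df2
    , (df2.id_ == df_drange.id_) & (
            ((df2.d1 < df_drange.d2) & (df2.d2 > df_drange.d1))
          | ((df_drange.d1 == df_drange.d2) & df_drange.d1.between(df2.d1, df2.d2))
      )
    , how = 'left'
).groupby(df_drange.id_, df_drange.d1, df_drange.d2, df_drange.flag) \
 .agg(array_join(array_sort(collect_set('p')), ' ').alias('q'))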



Set up boundaries

For df1, if q == '', there is a gap and such rows should be removed. The boundaries of each drange are defined based on flag, next_flag and next_d1, as discussed in the comments/chat. Below is the pseudo-code showing the current logic for how/when to adjust d1/d2:

flag = (if d1 is from original_d2) ? true : false
both next_d1 and next_flag defined on WindowSpec-w1

# for df1.d1: if flag is true, add 1 day, otherwise keep as-is
d1 = IF(flag, date_add(d1,1), d1)

# for df1.d2: keep as-is when there is a gap before the next row or
# the next_flag is true, else subtract 1 day
d2 = IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1))

The actual code:

# WindowSpec to calculate next_d1
w1 = Window.partitionBy('id_').orderBy('d1')

# filter out gaps and calculate next_d1 and the adjusted d1 and d2
df_new = df1.where('q != ""') \
            .withColumn('next_d1', lead('d1').over(w1)) \
            .withColumn('next_flag', coalesce(lead('flag').over(w1), lit(True))) \
            .selectExpr(
                    'id_'
                  , 'q'
                  , 'IF(flag, date_add(d1,1), d1) AS d1'
                  , 'IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1)) AS d2'
             )

df_new.show()
+---+-----+----------+----------+
|id_|    q|        d1|        d2|
+---+-----+----------+----------+
|  1|    A|2018-09-26|2018-10-26|
|  2|    B|2018-06-21|2018-06-26|
|  2|  C B|2018-06-27|2018-07-01|
|  2|C B A|2018-07-02|2018-07-07|
|  2|  B A|2018-07-08|2018-07-19|
|  2|    A|2018-07-20|2019-02-27|
|  2|    A|2019-03-28|2019-06-25|
+---+-----+----------+----------+
