この記事は、Apache Sparkでやりたいことを起点にしたメモになります。
showでデータを確認する
showで簡易的に確認 (DataFrame show)
calendarDF.show()
===
+--------+-------+---------+
| ymd| type|DayOfWeek|
+--------+-------+---------+
|20221001|Weekend| Sat|
|20221002|Weekend| Sun|
|20221003|Weekday| Mon|
|20221004|Weekday| Tue|
|20221005|Weekday| Wed|
|20221006|Weekday| Thu|
|20221007|Weekday| Fri|
|20221008|Weekend| Sat|
|20221009|Weekend| Sun|
|20221010|Holiday| Mon|
|20221011|Weekday| Tue|
|20221012|Weekday| Wed|
|20221013|Weekday| Thu|
|20221014|Weekday| Fri|
|20221015|Weekend| Sat|
|20221016|Weekend| Sun|
|20221017|Weekday| Mon|
|20221018|Weekday| Tue|
|20221019|Weekday| Wed|
|20221020|Weekday| Thu|
+--------+-------+---------+
only showing top 20 rows
showで項目全てを確認
カラムに含まれる値が長い場合、デフォルトでは途中で切れてしまい最後まで確認できません。
これは、truncateというパラメータがデフォルトで有効になっており、20文字で切り落としているためです。
t = (0,"abcdefghijklmnopqrstu",)
list=[]
for i in range(100):
list.append(t)
dataframe = spark.createDataFrame(list)
dataframe.show()
===
+---+--------------------+
| _1| _2|
+---+--------------------+
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
| 0|abcdefghijklmnopq...|
+---+--------------------+
# truncateパラメータを指定
dataframe.show(truncate=False)
===
+---+---------------------+
|_1 |_2 |
+---+---------------------+
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
|0 |abcdefghijklmnopqrstu|
+---+---------------------+
DataFrameの列を操作する
withColumnで列を追加 ( pysparkのwithcolumn で whenを使う)
df.select("*").withColumn("addColumn",lit(0).cast(IntegerType())).show()
別名(alias)
orderDF.groupBy("order_date","order_parts_id").agg(sum(orderDF.order_num).alias("total_order_num"),count(orderDF.order_date).alias("clients")).show()
===
+----------+--------------+---------------+-------+
|order_date|order_parts_id|total_order_num|clients|
+----------+--------------+---------------+-------+
|2022-10-01| G-611310| 390| 7|
|2022-10-01| G-611668| 270| 4|
|2022-10-01| G-611933| 456| 9|
|2022-10-01| G-612167| 189| 4|
|2022-10-01| G-612464| 429| 10|
|2022-10-01| G-612470| 169| 3|
|2022-10-01| G-612819| 545| 9|
|2022-10-01| G-613731| 99| 2|
|2022-10-01| G-614382| 272| 7|
|2022-10-01| G-615200| 73| 2|
|2022-10-01| G-615410| 416| 7|
|2022-10-01| G-615664| 414| 10|
|2022-10-01| G-615840| 179| 4|
|2022-10-01| G-617716| 67| 2|
|2022-10-01| G-617791| 239| 5|
|2022-10-01| G-618164| 360| 7|
|2022-10-01| G-618390| 342| 7|
|2022-10-01| G-619069| 364| 6|
|2022-10-01| G-619519| 114| 3|
|2022-10-01| G-620028| 193| 4|
+----------+--------------+---------------+-------+
列で比較 (when)
calendarDF.withColumn('JapaneseType',
F.when( \
F.col("type")=="Holiday" \
,"祝日" \
)\
.when( \
F.col("type")=="Weekend" \
,"休日" \
) \
.when( \
F.col("type")=="Weekday" \
,"平日" \
) \
).show()
===
+--------+-------+---------+------------+
| ymd| type|DayOfWeek|JapaneseType|
+--------+-------+---------+------------+
|20221001|Weekend| Sat| 休日|
|20221002|Weekend| Sun| 休日|
|20221003|Weekday| Mon| 平日|
|20221004|Weekday| Tue| 平日|
|20221005|Weekday| Wed| 平日|
|20221006|Weekday| Thu| 平日|
|20221007|Weekday| Fri| 平日|
|20221008|Weekend| Sat| 休日|
|20221009|Weekend| Sun| 休日|
|20221010|Holiday| Mon| 祝日|
|20221011|Weekday| Tue| 平日|
|20221012|Weekday| Wed| 平日|
|20221013|Weekday| Thu| 平日|
|20221014|Weekday| Fri| 平日|
|20221015|Weekend| Sat| 休日|
|20221016|Weekend| Sun| 休日|
|20221017|Weekday| Mon| 平日|
|20221018|Weekday| Tue| 平日|
|20221019|Weekday| Wed| 平日|
|20221020|Weekday| Thu| 平日|
+--------+-------+---------+------------+
DataFrameで集計する
合計( sum )
df.groupBy("order_parts_id").agg({"order_num": "sum"}).show()
===
+--------------+--------------+
|order_parts_id|sum(order_num)|
+--------------+--------------+
| G-000008| 187|
| G-000012| 614|
| G-000028| 517|
| G-000043| 310|
| G-000063| 119|
| G-000094| 277|
| G-000120| 212|
| G-000157| 116|
| G-000162| 402|
| G-000223| 259|
| G-000256| 98|
| G-000277| 89|
| G-000285| 51|
| G-000305| 637|
| G-000349| 141|
| G-000398| 57|
| G-000400| 282|
| G-000407| 401|
| G-000439| 77|
| G-000470| 407|
+--------------+--------------+
only showing top 20 rows
カウント( count )
df.groupBy("order_parts_id").count().show()
===
+--------------+-----+
|order_parts_id|count|
+--------------+-----+
| G-366517| 3|
| G-366520| 7|
| G-366570| 2|
| G-366571| 3|
| G-366573| 7|
| G-366628| 10|
| G-366734| 6|
| G-366749| 3|
| G-366773| 1|
| G-366860| 2|
| G-366958| 4|
| G-367153| 8|
| G-367157| 8|
| G-367175| 1|
| G-367233| 9|
| G-367236| 2|
| G-367267| 3|
| G-367281| 3|
| G-367324| 10|
| G-367364| 7|
+--------------+-----+
only showing top 20 rows
DataFrameでソートする( DataFrame asc, desc )
昇順で並べ替え( デフォルト )
df.orderBy("id").show()
===
+-----+--------+-----+---+--------------------+
| id|category|value|num| timestamp|
+-----+--------+-----+---+--------------------+
|00001| c001|test1| 1|2022-10-01T10:00:...|
|00002| c001|test2| 2|2022-10-02T10:00:...|
|00003| c002|test3| 3|2022-10-03T10:00:...|
|00004| c003|test4| 4|2022-10-04T10:00:...|
|00005| c004|test5| 5|2022-10-05T10:00:...|
+-----+--------+-----+---+--------------------+
昇順で並べ替え( 明示的にascを指定 )
df.orderBy(asc("id")).show()
===
+-----+--------+-----+---+--------------------+
| id|category|value|num| timestamp|
+-----+--------+-----+---+--------------------+
|00001| c001|test1| 1|2022-10-01T10:00:...|
|00002| c001|test2| 2|2022-10-02T10:00:...|
|00003| c002|test3| 3|2022-10-03T10:00:...|
|00004| c003|test4| 4|2022-10-04T10:00:...|
|00005| c004|test5| 5|2022-10-05T10:00:...|
+-----+--------+-----+---+--------------------+
降順で並べ替え
df.orderBy(desc("id")).show()
===
+-----+--------+-----+---+--------------------+
| id|category|value|num| timestamp|
+-----+--------+-----+---+--------------------+
|00005| c004|test5| 5|2022-10-05T10:00:...|
|00004| c003|test4| 4|2022-10-04T10:00:...|
|00003| c002|test3| 3|2022-10-03T10:00:...|
|00002| c001|test2| 2|2022-10-02T10:00:...|
|00001| c001|test1| 1|2022-10-01T10:00:...|
+-----+--------+-----+---+--------------------+
DataFrameを結合する
データを事前に確認します。
calendarDf = S3bucket_calendar.toDF()
calendarDf.show()
===
+----------+-------+---------+
| ymd| type|DayOfWeek|
+----------+-------+---------+
|2022-10-01|Weekend| Sat|
|2022-10-02|Weekend| Sun|
|2022-10-03|Weekday| Mon|
|2022-10-04|Weekday| Tue|
|2022-10-05|Weekday| Wed|
|2022-10-06|Weekday| Thu|
|2022-10-07|Weekday| Fri|
|2022-10-08|Weekend| Sat|
|2022-10-09|Weekend| Sun|
|2022-10-10|Holiday| Mon|
|2022-10-11|Weekday| Tue|
|2022-10-12|Weekday| Wed|
|2022-10-13|Weekday| Thu|
|2022-10-14|Weekday| Fri|
|2022-10-15|Weekend| Sat|
|2022-10-16|Weekend| Sun|
|2022-10-17|Weekday| Mon|
|2022-10-18|Weekday| Tue|
|2022-10-19|Weekday| Wed|
|2022-10-20|Weekday| Thu|
+----------+-------+---------+
only showing top 20 rows
orderDf = S3bucket_order.toDF()
orderDf.show()
===
+----------+--------------+---------+-----------+
|order_date|order_parts_id|order_num|customer_id|
+----------+--------------+---------+-----------+
|2022-10-01| G-000000| 62| C-000|
|2022-10-01| G-000001| 89| C-000|
|2022-10-01| G-000002| 70| C-000|
|2022-10-01| G-000003| 80| C-000|
|2022-10-01| G-000004| 33| C-000|
|2022-10-01| G-000005| 32| C-000|
|2022-10-01| G-000006| 23| C-000|
|2022-10-01| G-000006| 71| C-001|
|2022-10-01| G-000007| 49| C-000|
|2022-10-01| G-000007| 81| C-001|
|2022-10-01| G-000008| 46| C-000|
|2022-10-01| G-000009| 81| C-000|
|2022-10-01| G-000009| 94| C-001|
|2022-10-01| G-000009| 60| C-002|
|2022-10-01| G-000010| 86| C-000|
|2022-10-01| G-000011| 58| C-000|
|2022-10-01| G-000012| 70| C-000|
|2022-10-01| G-000012| 91| C-001|
|2022-10-01| G-000012| 40| C-002|
|2022-10-01| G-000013| 3| C-000|
+----------+--------------+---------+-----------+
only showing top 20 rows
等結合 (Inner join)
orderDf.join(calendarDf,orderDf.order_date == calendarDf.ymd,'inner').show()
===
+----------+--------------+---------+-----------+----------+-------+---------+
|order_date|order_parts_id|order_num|customer_id| ymd| type|DayOfWeek|
+----------+--------------+---------+-----------+----------+-------+---------+
|2022-10-01| G-000000| 62| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000001| 89| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000002| 70| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000003| 80| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000004| 33| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000005| 32| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000006| 23| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000006| 71| C-001|2022-10-01|Weekend| Sat|
|2022-10-01| G-000007| 49| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000007| 81| C-001|2022-10-01|Weekend| Sat|
|2022-10-01| G-000008| 46| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000009| 81| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000009| 94| C-001|2022-10-01|Weekend| Sat|
|2022-10-01| G-000009| 60| C-002|2022-10-01|Weekend| Sat|
|2022-10-01| G-000010| 86| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000011| 58| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000012| 70| C-000|2022-10-01|Weekend| Sat|
|2022-10-01| G-000012| 91| C-001|2022-10-01|Weekend| Sat|
|2022-10-01| G-000012| 40| C-002|2022-10-01|Weekend| Sat|
|2022-10-01| G-000013| 3| C-000|2022-10-01|Weekend| Sat|
+----------+--------------+---------+-----------+----------+-------+---------+
only showing top 20 rows
左外部結合 ( Left outer join )
calendarDf.join(orderDf,calendarDf.ymd == orderDf.order_date,'left').filter(calendarDf.ymd == '2022-10-02').show()
===
+----------+-------+---------+----------+--------------+---------+-----------+
| ymd| type|DayOfWeek|order_date|order_parts_id|order_num|customer_id|
+----------+-------+---------+----------+--------------+---------+-----------+
|2022-10-02|Weekend| Sun| null| null| null| null|
+----------+-------+---------+----------+--------------+---------+-----------+
RDDを生成する
ノートブックなどで、ファイルを作成せずに手元でサクッと試したい場合によく使います。
リストからRDDを生成
parallelizeを使用してlistからデータを生成します。
list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(list)
for row in rdd.collect():
print(row)
タプルからRDDを生成
parallelizeを使用してlistからデータを生成します。
tuple = [
("k1",1)
,("k2",2)
,("k3",3)
,("k4",4)
,("k5",5)
,("k6",6)
,("k7",7)
,("k8",8)
,("k9",9)
,("k10",10)
]
rdd = sc.parallelize(tuple)
for row in rdd.collect():
print("key:{} value:{}".format(row[0],row[1]))
createDataFrameでデータを生成
t = (0,10,)
list=[]
for i in range(100):
list.append(t)
spark.createDataFrame(list).show()
RDDとDataFrameを変換する
RDDからDataFrameへの変換
print(type(rdd))
# toDFで変換可能
df4 = rdd.toDF()
print(type(df4))
===
<class 'pyspark.rdd.RDD'>
<class 'pyspark.sql.dataframe.DataFrame'>
RDDからDataFrameへの変換(スキーマを定義)
from pyspark.sql import types as T, functions as F
schema = T.StructType([
T.StructField('id', T.StringType()),
T.StructField('value', T.LongType())
])
df = sc.parallelize([
('AAA', 1),
('BBB', 2),
('CCC', 3),
]).toDF(schema).show()
===
+---+-----+
| id|value|
+---+-----+
|AAA| 1|
|BBB| 2|
|CCC| 3|
+---+-----+
DataFrameからRDDへの変換
df = sc.parallelize([
('AAA', 1),
('BBB', 2),
('CCC', 3),
]).toDF()
print(type(df))
rdd = df.rdd
print(type(rdd))
===
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.rdd.RDD'>
パーティション ( partition ) を操作する
現在のパーティション数や、データの状態をデバッグの途中などで確認したい場合に、よく使います。
また、データの偏りを修正したり、パーティション数を増減させたい場合など、アプリケーションでよく利用します。
パーティション数の確認 (df.rdd.getnumpartitions())
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
===
Number of partitions: 1
パーティション内のデータを確認 (rdd collect())
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
# 見やすいように整形しています。
===
Partitions structure: [
[
Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
, Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
]
]
repartitionでパーティション数を変更する(パーティション数を指定)
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
# 見やすいように整形しています。
===
Number of partitions: 1
Partitions structure: [
[
Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
, Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
]
]
# パーティション数2を指定
repartitioned = df.repartition(2)
print('Number of partitions: {}'.format(repartitioned.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(repartitioned.rdd.glom().collect()))
# 見やすいように整形しています。
===
Number of partitions: 2
Partitions structure: [
[
Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
]
,[
Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
]
]
repartitionでパーティション数を変更する(キーを指定)
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
# 見やすいように整形しています。
===
Number of partitions: 1
Partitions structure: [
[
Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
, Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
]
]
repartitioned = df.repartition("category")
print('Number of partitions: {}'.format(repartitioned.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(repartitioned.rdd.glom().collect()))
# 見やすいように整形しています。
Number of partitions: 40
Partitions structure: [
[]
, []
, []
, []
, []
, []
, []
, []
, []
, []
, []
, []
, []
, [
Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
]
, []
, []
, []
, []
, []
, []
, []
, []
, [
Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
]
, []
, []
, []
, []
, []
, []
, []
, [
Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
]
, []
, []
, []
, []
, []
, []
, []
, [
Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
]
, []
]
coalesceでパーティション数を減らす(パーティション数を指定)
print('Number of partitions: {}'.format(repartitioned.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(repartitioned.rdd.glom().collect()))
# 見やすいように整形しています。
===
Number of partitions: 2
Partitions structure: [
[
Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
]
,[
Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
]
]
coalesced = repartitioned.coalesce(1)
print('Number of partitions: {}'.format(coalesced.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(coalesced.rdd.glom().collect()))
# 見やすいように整形しています。
===
Number of partitions: 1
Partitions structure: [
[
Row(id='00004', category='c003', value='test4', num=4, timestamp='2022-10-04T10:00:00+0900')
, Row(id='00005', category='c004', value='test5', num=5, timestamp='2022-10-05T10:00:00+0900')
, Row(id='00002', category='c001', value='test2', num=2, timestamp='2022-10-02T10:00:00+0900')
, Row(id='00003', category='c002', value='test3', num=3, timestamp='2022-10-03T10:00:00+0900')
, Row(id='00001', category='c001', value='test1', num=1, timestamp='2022-10-01T10:00:00+0900')
]
]
最後に
今回、Sparkでやりたいことを元に逆引きの実装方法をまとめました。まだ一部のため、今後も随時追加・更新していく予定です。どなたかのお役に立てれば幸いです。
Sparkの基礎的な解説は、こちらの「Sparkの基本を解説」にも載せています。よろしければご確認ください。
今後も、「データエンジニアのTech blogとは」に沿った内容を中心に発信していく予定です。引き続きよろしくおねがいします!
コメント