2022年2月11日
2022年3月28日
AWS Glueを環境としてPySparkを選択して使っていると、ローカルでテストしたいときがありました。
テストと言ってもユニットテストとかではなく、「こういう書き方いけるっけ?」という試行錯誤がしたかったです。
そのたびにジョブを実行していたら時間もお金も掛かってしまうので、ローカルで試すことにしました。
試したいことも単純なことでしたし、先人のおかげでかんたんにできました。
というわけで環境セットアップとSQLの実行の仕方紹介します。
PySparkをインストール
$ pip install pyspark
結果はこんな感じ
$ pip install pyspark
Collecting pyspark
Downloading pyspark-3.2.1.tar.gz (281.4 MB)
|████████████████████████████████| 281.4 MB 120 kB/s
Collecting py4j==0.10.9.3
Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
|████████████████████████████████| 198 kB 46.0 MB/s
Using legacy 'setup.py install' for pyspark, since package 'wheel' is not installed.
Installing collected packages: py4j, pyspark
Running setup.py install for pyspark ... done
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
PySpark起動
$ pyspark
すると何やらインタプリタが立ち上がります
$ pyspark
Python 3.7.1 (default, Jan 20 2021, 19:14:42)
[Clang 12.0.0 (clang-1200.0.32.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
22/02/09 16:36:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/09 16:36:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Python version 3.7.1 (default, Jan 20 2021 19:14:42)
Spark context Web UI available at http://192.168.10.112:4040
Spark context available as 'sc' (master = local[*], app id = local-1644392202304).
SparkSession available as 'spark'.
>>>
テスト
この状態でもうSQLが実行できます。単純な書き方を確かめたいだけであればこれで十分です
>>> spark.sql('select 1')
DataFrame[1: int]
>>> spark.sql('select 1').show()
+---+
| 1|
+---+
| 1|
+---+
私はdate系のコンバートを試したかったのでこんなの試しました
>>> df = spark.createDataFrame([("20220130", "0400")], ["date", "time"])
>>> df.show()
+--------+----+
| date|time|
+--------+----+
|20220130|0400|
+--------+----+
>>> df.createOrReplaceTempView('testtable')
>>> spark.sql("select to_date(date, 'yyyyMMdd') as date, to_timestamp(time, 'HHmm') as time from testtable").show()
+----------+-------------------+
| date| time|
+----------+-------------------+
|2022-01-30|1970-01-01 04:00:00|
+----------+-------------------+
エラーもわざと発生させてどんなのが出るかとかGlueで発生しているものと同じかとかにも使えます
>>> spark.sql("""select to_date(date, 'yyyyMMdd') as date, to_timestamp(time2, 'HHmm') as time from testtable""").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/tkykm/.pyenv/versions/3.7.1/lib/python3.7/site-packages/pyspark/sql/session.py", line 723, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/Users/tkykm/.pyenv/versions/3.7.1/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/Users/tkykm/.pyenv/versions/3.7.1/lib/python3.7/site-packages/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve 'time2' given input columns: [testtable.date, testtable.time]; line 1 pos 55;
'Project [to_date(date#31, Some(yyyyMMdd)) AS date#300, 'to_timestamp('time2, HHmm) AS time#301]
+- SubqueryAlias testtable
+- View (`testtable`, [date#31,time#32])
+- LogicalRDD [date#31, time#32], false
まとめ
という感じで簡単に環境作成とSQL試行錯誤できました。
なにかの参考になれば幸いです。