[Spark] Pyspark Dataframe 주요 메서드 샘플 정리(concat_ws, regexp_replace, explode, withColumnRenamed)
1. concatenate to columns with null values
널 값을 포함하는 컬럼 간에 concat하는 방법
from pyspark.sql.functions import concat_ws, col, concat
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([["1", "2"], ["2", None], ["3", "4"], ["4", "5"], [None, "6"]]).toDF("a", "b")
df = df.withColumn("concat", concat(df.a, df.b)) #잘못된 접근 사례1
df = df.withColumn("concat + cast", concat(df.a.cast('string'), df.b.cast('string'))) #잘못된 접근 사례2
df = df.withColumn("concat_ws", concat_ws("", df.a, df.b)) #옳은 방법
df.show()
functions 모듈의 concat()은 value에 null 값이 존재할 경우 null로 return되어 null 값을 처리하지 못합니다.
마찬가지로 cast로 string 타입으로 변환하더라도 null 값은 그대로 살아있어 null 값이 그대로 출력됩니다.
이럴 때에는 concat_ws()를 사용하여 null 값은 무시하고 null이 아닌 컬럼값을 붙일 수 있습니다.
2. split a string row into multiple rows
string 타입의 컬럼값을 split하고 그 값을 새로운 컬럼으로 받는 방법
spark = SparkSession.builder.getOrCreate()
columns = ['id', 'text']
data = [('1', '별다른 음악적 활동 없이 주변 뮤지션들의 곡에 피쳐링 참여나 작은 공연들이 있으면 무대에 서곤 했었습니다.'),
('2', '개인적으로 앨범을 내거나 음악적 활동은 하지 않았었는데 작년 겨울에 작업했던 첫 싱글 과정이 너무 재밌었습니다.'),
('3', '그 계기로 계속 해서 앨범을 내는 것이 좋겠다는 생각이 들어 올해도 바로 곡 작업을 시작해 두 번째 싱글을 준비하게 되었습니다.'),
('4', '재즈 발라드를 좋아하거나 잔잔한 음악을 좋아하시는 분들이 들으면 좋을 거 같습니다. 잠들기 전이나 하루 일과가 끝나고 집에 가는 길 멍때리며 들을 노래가 필요했던 분들이라면 꼭 들어보셨으면 좋겠습니다.')]
df = spark.createDataFrame(data, columns)
df.show()
from pyspark.sql.functions import split, regexp_replace, explode
df2 = df.withColumn('text', regexp_replace('text', '다\.', '다\.&split&'))
df3 = df2.withColumn('new_text', explode(split('text', '&split&')))
df4 = df3.filter(~((col('new_text').isNull()|col('new_text')=='')))
df4.show()
점(.)을 포함한 문자열로 delimiter를 지정하고 싶은 경우 '\.'와 같이 백슬래시(\)를 붙여 이스케이스 문자로 만들어 주어야 합니다. 위와 같이 구분자를 지정하여 split한다면, ''와 같은 value도 new_text 컬럼에 포함됩니다. 공백인 값을 제거해주기 위해서는 filter를 적용하면 됩니다.
3. convert columns to lowercase/uppercase
컬럼명 형변환 하는 방법
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([["1", "jenny"], ["2", "junsoo"], ["3", "sunmin"]], ["id", "name"])
for col in df.columns:
df = df.withColumnRenamed(col, col.upper()) #성공 사례1
for col in df.columns:
df2 = df.withColumnRenamed(col, col.lower()) #잘못된 사례
df3 = df.toDF(*[col.lower() for col in df.columns]) #성공 사례2
df4 = df.toDF(*list(map(lambda x: x.lower(), df.columns))) #성공 사례3
df.show()
df2.show()
df3.show()
df4.show()
컬럼명을 변환하기 위해 먼저 df.columns를 통해 데이터프레임의 컬럼명을 '리스트'로 받아와야 합니다.
그리고 받아온 column_name list에 반복문(ex. 컴프리헨션, for)을 적용하여 형변환된 컬럼명으로 갱신할 수 있습니다.
이때 리스트 컴프리헨션 방식이라면 toDF()으로, for문을 사용하는 경우 withColumnRenamed()으로 컬럼명을 변경합니다.
잘못된 사례를 보면 알 수 있듯이 for문으로 withColumnRenamed()로 접근할 때 for문에서 새로운 df를 선언할 수는 없습니다.
withColumnRenamed은 df.withColumnRenamed(컬럼명, 변경할 컬럼명)일 때 해당 df을 기준으로 컬럼명을 변경합니다.
따라서 예제처럼 컬럼명 리스트(['ID', 'NAME'])의 마지막 컬럼인 NAME만 lower()이 적용된 것을 확인할 수 있습니다.
참고
https://stackoverflow.com/questions/52963588/split-pyspark-dataframe-column-at-the-dot
https://stackoverflow.com/questions/43005744/convert-columns-of-pyspark-data-frame-to-lowercase