1. initialization
1. spark session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("name") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. spark context
1. from spark session
sc = spark.sparkContext
2. from SparkConf
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("KMeans").setMaster("local[*]")
sc = SparkContext(conf=conf)
or
sc = SparkContext.getOrCreate(conf)
2. partition by index
.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
This gets rid of the first line (e.g. a header) of the file when it is read as an RDD; islice comes from itertools, as in the sketch below.
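For example, a minimal runnable sketch (the file name data.csv and the header assumption are made up):
from itertools import islice
rdd = sc.textFile("data.csv")
# drop the first element of partition 0 only, i.e. the header line of the file
no_header = rdd.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
no_header.take(5)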
3. operations on a spark dataframe
1. read csv file into dataframe, with separator
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df_demand = spark.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "DROPMALFORMED") \
    .option("delimiter", "|") \
    .load(demand_data)
2. Basic Operations
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
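A minimal sketch of a few common operations (the df_demand columns "product" and "qty" are assumed):
df_demand.printSchema()                          # inspect column names and inferred types
df_demand.select("product", "qty").show(5)       # project columns
df_demand.filter(df_demand.qty > 0).count()      # filter rows
df_demand.groupBy("product").sum("qty").show()   # aggregate
df_demand.withColumnRenamed("qty", "quantity")   # rename a column (returns a new dataframe)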
3. read dictionary into spark dataframe
- dict -> pandas.df -> spark.df
https://stackoverflow.com/questions/43751509/how-to-create-new-dataframe-with-dict?rq=1
- dict -> spark.rdd (json rdd) -> json parser -> spark.df
https://stackoverflow.com/questions/51774796/how-to-convert-dictionary-to-data-frame-in-pyspark
Note:
when using a pandas df to create a spark df, if you want to preserve the indices,
use: spark_df = spark.createDataFrame(pandasdf.reset_index(drop=False))
reset_index(drop=False) is required to preserve the indices, because Spark SQL has no concept of an index.
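A minimal sketch of the dict -> pandas.df -> spark.df route (the toy dict and column name are made up):
import pandas as pd
d = {"a": 1, "b": 2, "c": 3}
pandas_df = pd.DataFrame.from_dict(d, orient="index", columns=["value"])
# reset_index(drop=False) turns the pandas index into a normal column so it survives the conversion
spark_df = spark.createDataFrame(pandas_df.reset_index(drop=False))
spark_df.show()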
4. unique value of a column
select("col name").distinct()
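A minimal sketch (the column name "Year" is assumed):
df.select("Year").distinct().show()
# or pull the distinct values back to the driver as a Python list
years = [row["Year"] for row in df.select("Year").distinct().collect()]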
5. drop col
.drop("col name")
6. spark df to pandas df
.toPandas() # expensive, use it sparingly! it's an action that collects all the data to the driver
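A minimal sketch: shrink the data before collecting it to the driver (the column name "Year" is assumed):
small_pdf = df.select("Year").limit(1000).toPandas()
print(small_pdf.head())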
7. sort by col (descending)
from pyspark.sql.functions import desc
(group_by_dataframe
    .count()
    .filter("count >= 10")
    .sort(desc("count")))
OR
.orderBy("Year", ascending=False)
8. filter
In PySpark, filter on a dataframe doesn't take a function that returns a boolean;
it only takes a SQL expression (or boolean Column) as the condition.
If you want it to take a boolean function, wrap it in a udf, for example:
@F.udf(returnType=BooleanType())
def my_filter(col):
    # placeholder predicate: any Python logic that returns a bool works here
    return col is not None
df.filter(my_filter('col')).show()
Make sure to include:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
9. map on column
df.withColumn("new column", udf("old column"))
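A minimal sketch (the column names and the doubling logic are made up):
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
double_it = F.udf(lambda x: None if x is None else x * 2, IntegerType())
df = df.withColumn("qty_doubled", double_it("qty"))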
10. coalesce and repartition
repartition(n) = coalesce(n, shuffle = true)
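A minimal sketch (the partition counts are arbitrary):
df.rdd.getNumPartitions()      # current number of partitions
df_more = df.repartition(200)  # full shuffle, can increase the partition count
df_less = df.coalesce(10)      # no shuffle, can only decrease the partition count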
4. Wide dependency and narrow dependency
Originally we have M partitions and we repartition to N partitions.
Narrow dependency: usually M > N, like map; doesn't trigger a shuffle.
When M is much larger than N, it's better to manually trigger a shuffle, otherwise
it hurts parallelization and performance.
Wide dependency: usually M < N, like join and groupByKey; triggers a shuffle.
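A minimal sketch of the two cases (toy data):
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 4)
narrow = rdd.mapValues(lambda v: v * 10)   # narrow: no shuffle, partitioning preserved
wide = rdd.groupByKey()                    # wide: shuffle, rows with the same key must meet
wide.mapValues(list).collect()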
5. inferSchema type precedence (a column is promoted to the next, wider type when its values don't fit):
NullType
IntegerType
LongType
DecimalType
DoubleType
TimestampType
BooleanType
StringType
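A minimal sketch to see what inferSchema decided (the file path is hypothetical):
df = spark.read.option("header", "true").option("inferSchema", "true").csv("demand.csv")
df.printSchema()   # columns whose values don't fit a narrower type are promoted toward StringType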