{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Welcome to\n", " ____ __\n", " / __/__ ___ _____/ /__\n", " _\\ \\/ _ \\/ _ `/ __/ '_/\n", " /__ / .__/\\_,_/_/ /_/\\_\\ version 1.4.1\n", " /_/\n", "\n", "Using Python version 2.7.10 (default, May 28 2015 17:04:42)\n", "SparkContext available as sc, HiveContext available as sqlContext.\n" ] } ], "source": [ "import os\n", "import sys\n", "# Point SPARK_HOME at the local Spark installation before importing pyspark.\n", "os.environ['SPARK_HOME'] = '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/'\n", "spark_home = os.environ.get('SPARK_HOME')\n", "if not spark_home:\n", " raise ValueError('SPARK_HOME environment variable is not set')\n", "sys.path.insert(0, os.path.join(spark_home, 'python'))\n", "sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))\n", "# Run the PySpark shell bootstrap script, which creates sc and sqlContext.\n", "execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#DataFrames" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide variety of sources, such as structured data files, tables in Hive, external databases, or existing RDDs."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##Load data from a JSON file" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-------+\n", "| age| name|\n", "+----+-------+\n", "|null|Michael|\n", "| 30| Andy|\n", "| 19| Justin|\n", "+----+-------+\n", "\n" ] } ], "source": [ "df = sqlContext.read.json(\"people.json\")\n", "df.show()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- age: long (nullable = true)\n", " |-- name: string (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+\n", "| name|\n", "+-------+\n", "|Michael|\n", "| Andy|\n", "| Justin|\n", "+-------+\n", "\n" ] } ], "source": [ "df.select(\"name\").show()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---------+\n", "| name|(age + 1)|\n", "+-------+---------+\n", "|Michael| null|\n", "| Andy| 31|\n", "| Justin| 20|\n", "+-------+---------+\n", "\n" ] } ], "source": [ "df.select(df['name'], df['age'] + 1).show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "|age|name|\n", "+---+----+\n", "| 30|Andy|\n", "+---+----+\n", "\n" ] } ], "source": [ "df.filter(df['age'] > 21).show()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-----+\n", "| age|count|\n", "+----+-----+\n", "|null| 1|\n", "| 19| 1|\n", "| 30| 1|\n", "+----+-----+\n", 
"\n" ] } ], "source": [ "df.groupBy(\"age\").count().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##Load data from a text file\n", "Create an RDD, then transform it into a DataFrame" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[33] at RDD at PythonRDD.scala:43" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import Row\n", "# Load a text file and convert each line to a Row.\n", "lines = sc.textFile(\"people.txt\")\n", "parts = lines.map(lambda l: l.split(\",\"))\n", "people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))\n", "people" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[Row(age=29, name=u'Michael'),\n", " Row(age=30, name=u'Andy'),\n", " Row(age=19, name=u'Justin')]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "people.collect()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "DataFrame[age: bigint, name: string]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schemaPeople = sqlContext.createDataFrame(people)\n", "# schemaPeople = people.toDF() is another way to create a DataFrame\n", "schemaPeople" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[Row(age=29, name=u'Michael'),\n", " Row(age=30, name=u'Andy'),\n", " Row(age=19, name=u'Justin')]" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schemaPeople.collect()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Name: Justin\n" ] } ], 
"source": [ "# Infer the schema, and register the DataFrame as a table.\n", "schemaPeople = sqlContext.createDataFrame(people)\n", "schemaPeople.registerTempTable(\"people\")\n", "\n", "# SQL can be run over DataFrames that have been registered as a table.\n", "teenagers = sqlContext.sql(\"SELECT name FROM people WHERE age >= 13 AND age <= 19\")\n", "\n", "# The result of a SQL query is a DataFrame; in PySpark 1.x it still supports\n", "# the normal RDD operations such as map() over its rows.\n", "teenNames = teenagers.map(lambda p: \"Name: \" + p.name)\n", "for teenName in teenNames.collect():\n", " print teenName" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }