sábado, 14 de noviembre de 2015

Apache Spark - Programacion Avanzada

Spark contiene dos tipos diferentes de variables compartidas - uno estávariables de difusión y la segunda es acumuladores.
  • Difusión de variables - se utiliza de manera eficiente, distribuir grandes valores.
  • Acumuladores - se utiliza para agregar la información de la colección particular.

Variables de difusión

Broadcast variables que permiten al programador a mantener una única lectura en caché variable sobre cada máquina en lugar de enviar una copia del mismo con las tareas. Se pueden utilizar, por ejemplo, para dar a cada nodo, una copia de un gran conjunto de datos de entrada, de una manera eficiente. Spark también intenta distribuir las variables de transmisión utilizando algoritmos de transmisión eficientes para reducir los costos de comunicación.
Spark acciones se ejecutan a través de una serie de etapas, separadas por operaciones de "shuffle" distribuidos. Spark transmite automáticamente los datos comunes necesarios para las tareas dentro de cada etapa.
Los datos difundidos este modo se almacenan en caché en forma serializada y se deserializa antes de ejecutar cada tarea. Esto significa que crear explícitamente las variables de difusión, sólo es útil cuando las tareas a través de múltiples etapas necesitan los mismos datos o cuando el almacenamiento en caché los datos en forma deserializado es importante.
Las variables de difusión se crean a partir de una variable v llamandoSparkContext.broadcast (v). La variable de difusión es una envoltura alrededor de v, y su valor se puede acceder llamando al valor de método.El código que figura a continuación muestra esta -
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Salida -
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
Después se crea la variable de emisión, que debe ser usado en lugar del valor ven cualquiera de las funciones se ejecutan en el cluster, de modo que v no es enviado a los nodos más de una vez. Además, el objeto v no debe ser modificado después de su emisión, a fin de garantizar que todos los nodos reciben el mismo valor de la variable de emisión.

Acumuladores

Acumuladores son variables que sólo se "agregan" a través de una operación asociativa y, por consiguiente, ser apoyadas de manera eficiente en paralelo.Pueden ser utilizados para implementar contadores (como en MapReduce) o sumas. Spark soporta de forma nativa los acumuladores de los tipos numéricos, y los programadores pueden añadir soporte para nuevos tipos. Si los acumuladores se crean con un nombre, que se mostrarán en la interfaz de usuario del Spark. Esto puede ser útil para entender el progreso de correr etapas (NOTA - esto todavía no se admite en Python).
Un acumulador se crea a partir de un valor inicial v llamandoSparkContext.accumulator (v). Las tareas que se ejecutan en el clúster pueden entonces agregar a ella mediante el complemento método o el operador + = (en Scala y Python). Sin embargo, no pueden leer su valor. Sólo el programa piloto puede leer el valor del acumulador, utilizando su valor demétodo.
El código que figura a continuación muestra un acumulador que se utiliza para sumar los elementos de una matriz -
scala> val accum = sc.accumulator(0) 

scala
> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
Si quieres ver la salida del código anterior y luego use el siguiente comando -
scala> accum.value 

Producción

res2: Int = 10 

Operaciones RDD numéricos

Spark le permite hacer diferentes operaciones en datos numéricos, utilizando uno de los métodos de la API predefinidos. Operaciones numéricas del Spark se implementan con un algoritmo de streaming que permite construir el modelo, un elemento a la vez.
Estas operaciones se calculan y se devuelven como StatusCounter objeto llamando estado () método.
La siguiente es una lista de los métodos numéricos disponibles enStatusCounter.
S.NoMétodos y Significado
1
count ()
Número de elementos de la RDD.
2
Media()
Media de los elementos de la RDD.
3
Suma()
Valor total de los elementos de la RDD.
4
Max ()
El valor máximo entre todos los elementos de la RDD.
5
Min ()
El valor mínimo entre todos los elementos de la RDD.
6
Diferencia()
Varianza de los elementos.
7
STDEV ()
Desviacion estandar.
Si desea utilizar sólo uno de estos métodos, puede llamar al método correspondiente directamente en DDR.

Apache Spark - Despliegue

Spark aplicación, utilizando chispa presentar, es un comando de shell utilizado para implementar la aplicación de chispa en un clúster. Utiliza todos los gerentes de racimo respectivos a través de una interfaz uniforme. Por lo tanto, usted no tiene que configurar la aplicación para cada uno.

Ejemplo

Tomemos el mismo ejemplo de recuento de palabras, hemos utilizado antes, el uso de comandos de shell. En este sentido, consideramos que el mismo ejemplo como una aplicación de chispa.

Entrada de la Muestra

El siguiente texto es la entrada de datos y el archivo llamado es in.txt.
people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Mira el siguiente programa -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
def main(args: Array[String]) {

val sc
= new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input
= sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */

valcount
= input.flatMap(line line.split(" "))
.map(word (word, 1))
.reduceByKey(_ + _)

/* saveAsTextFile method is an action that effects on the RDD */
count
.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Guarde el programa anterior en un archivo denominadoSparkWordCount.scala y colocarlo en un directorio llamado definida por el usuario chispa aplicación.
Nota - Si bien la transformación de la inputRDD en countRDD, estamos utilizando flatMap () para tokenizar las líneas (de archivo de texto) en palabras método map () para contar la frecuencia de la palabra y el método reduceByKey () para contar cada palabra repetición.
Utilice los siguientes pasos para presentar esta solicitud. Ejecutar todos los pasos en la chispa de la aplicación del directorio a través del terminal.

Paso 1: Descargar Spark Ja

Se requiere tarro núcleo Spark para la compilación, por lo tanto, descargar chispa core_2.10-1.3.0.jar desde el siguiente enlace tarro núcleo Spark y mover el archivo jar del directorio de descarga de chispas aplicacióndirectorio.

Paso 2: programa Compile

Compile el programa anterior con el comando a continuación. Este comando debe ejecutarse desde el directorio chispa aplicación. Aquí,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar es un tarro de apoyo Hadoop tomado de la biblioteca Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Paso 3: Crear un JAR

Crear un archivo jar de la aplicación de la chispa con el siguiente comando.Aquí, recuento de palabras es el nombre del archivo para el archivo jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Paso 4: Presentar solicitud chispa

Presentar la solicitud de chispa con el siguiente comando -
spark-submit --class SparkWordCount --master local wordcount.jar
Si se ejecuta con éxito, entonces usted va a encontrar la salida dada a continuación. El OK alquiler en el siguiente resultado es para la identificación del usuario y que es la última línea del programa. Si lees cuidadosamente la siguiente salida, encontrará diferentes cosas, como -
  • servicio iniciado correctamente 'sparkDriver' en el puerto 42954
  • MemoryStore comenzó con capacidad de 267,3 MB
  • Iniciado SparkUI en http://192.168.1.217:4040
  • Añadido archivo JAR: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile en SparkPi.scala: 11) terminaron en 0,566 s
  • Detenido interfaz web Spark en http://192.168.1.217:4040
  • MemoryStore aclaró
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Paso 5: Comprobación de salida

Después de la ejecución exitosa del programa, se encuentra el directorio llamado archivosalida en el directorio chispa aplicación.
Los siguientes comandos se utilizan para la apertura y la comprobación de la lista de archivos en el directorio de archivo de salida.
$ cd outfile 
$ ls
Part-00000 part-00001 _SUCCESS
Los comandos para el control de la producción en parte-00000 archivos son -
$ cat part-00000 
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Los comandos para el control de la producción en archivo de pieza-00001 son -
$ cat part-00001 
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Ir a través de la siguiente sección para saber más sobre el comando 'chispas submit'.

Spark-presentar Sintaxis

spark-submit [options] <app jar | python file> [app arguments]

Opciones


Apache Spark - Programacion Core

Spark Core es la base de todo el proyecto. Proporciona distribuye despacho de tareas, programación y funciones básicas de E / S. Spark utiliza una estructura de datos fundamental especializada conocida como RDD (Conjuntos de datos distribuidos resilientes) que es una colección lógica de los datos particionados a través de máquinas. DDR se pueden crear de dos formas; uno está haciendo referencia a los conjuntos de datos en los sistemas de almacenamiento externo y la segunda es por las transformaciones que aplican (por ejemplo, mapa, filtro, reductor, se unen) en DDR existentes.
La abstracción RDD se expone a través de una API de lenguaje integrado. Esto simplifica la complejidad de la programación debido a la forma en que las aplicaciones manipulan DDR es similar a la manipulación de colecciones locales de datos.

Spark Shell

Spark ofrece un shell interactivo - una poderosa herramienta para analizar los datos de forma interactiva. Está disponible en cualquier Scala o el lenguaje Python. Abstracción principal de Spark es un conjunto distribuido de elementos llamados un Resilient conjunto de datos distribuido (RDD). DDR se puede crear desde Formatos de entrada Hadoop (como archivos HDFS) o mediante la transformación de otros DDR.

Abrir Spark Shell

El siguiente comando se utiliza para abrir la cáscara Spark.
$ spark-shell

Crear sencilla RDD

Vamos a crear una sencilla RDD del archivo de texto. Utilice el siguiente comando para crear un simple RDD.
scala> val inputfile = sc.textFile(“input.txt”)
La salida del comando anterior es
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
La API RDD Spark introduce algunas transformaciones y pocas accionespara manipular RDD.

RDD Transformaciones

Transformaciones RDD devuelve puntero al nuevo RDD y le permite crear dependencias entre DDR. Cada RDD en cadena de la dependencia (Cadena de dependencias) tiene una función para el cálculo de sus datos y tiene un puntero (dependencia) a su matriz RDD.
Spark es perezoso, así que nada se ejecutará a menos que llame alguna transformación o acción que activará la creación de empleo y la ejecución. Mira el siguiente fragmento del ejemplo número de palabras.
Por lo tanto, la transformación RDD no es un conjunto de datos, pero es un paso en un programa (que podría ser el único paso) diciendo Spark cómo obtener los datos y qué hacer con él.
A continuación hay una lista de transformaciones RDD.
S.NoTransformaciones y Significado
1
Mapa (func)
Devuelve un nuevo conjunto de datos distribuida, formado haciendo pasar cada elemento de la fuente a través de una función func.
2
filtro (func)
Devuelve un nuevo conjunto de datos formado por la selección de los elementos de la fuente en la que func devuelve true.
3
flatMap (func)
Al igual que en el mapa, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida (modo func debe devolver una Sec en lugar de un solo artículo).
4
mapPartitions (func)
Similar al mapa, pero corre por separado en cada partición (bloque) de la RDD, por lo func debe ser de tipo Iterator <T> ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T.
5
mapPartitionsWithIndex (func)
Al igual que en el mapa de particiones, sino que también proporcionafunc con un valor entero que representa el índice de la partición, así que func debe ser del tipo (Int, Iterator <T>) ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T.
6
muestra (withReplacement, fracción, semilla)
Pruebe una fracción de los datos, con o sin sustitución, utilizando una semilla generador de números aleatorios dado.
7
union (otherDataset)
Devuelve un nuevo conjunto de datos que contiene la unión de los elementos en el conjunto de datos de origen y el argumento.
8
intersección (otherDataset)
Devuelve un nuevo RDD que contiene la intersección de elementos en el conjunto de datos de origen y el argumento.
9
distinta ([numTasks])
Devuelve un nuevo conjunto de datos que contiene los distintos elementos del conjunto de datos de origen.
10
groupByKey ([numTasks])
Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, iterable <V>) pares.
Nota - Si está agrupando a fin de realizar una agregación (como una suma o promedio) sobre cada tecla, usando reduceByKey o aggregateByKey rendirá mucho mejor rendimiento.
11
reduceByKey (func, [numTasks])
Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, V) pares donde los valores para cada tecla se agregan utilizando el dado reducir la función func, que debe ser de tipo (V, V) ⇒ V . Al igual que en groupByKey, el número de reducir las tareas se puede configurar a través de un segundo argumento opcional.
12
aggregateByKey (zeroValue) (seqOp, combOp, [numTasks])
Cuando se llama a un conjunto de datos de (K, V) pares, devuelve un conjunto de datos de (K, T) pares donde los valores para cada tecla se agregan utilizando las funciones dadas combinar y neutral valor "cero". Permite un tipo de valor agregado que es diferente del tipo de valor de entrada, evitando al mismo tiempo las asignaciones innecesarias. Al igual que en groupByKey, el número de reducir las tareas se puede configurar a través de un segundo argumento opcional.
13
sortByKey ([ascendente], [numTasks])
Cuando se llama a un conjunto de datos de (K, V) pares donde implementos K pidió, devuelve un conjunto de datos de (K, V) pares ordenados por claves en orden ascendente o descendente, según se especifica en el argumento ascendente de Boole.
14
join (otherDataset, [numTasks])
Cuando se llama en conjuntos de datos de tipo (k, V) y (K, W), devuelve un conjunto de datos de pares con todos los pares de elementos para cada tecla (K, (V, W)). Combinaciones externas son apoyados a través leftOuterJoin, rightOuterJoin y fullOuterJoin.
15
cogroup (otherDataset, [numTasks])
Cuando pidió a los conjuntos de datos de tipo (K, V) y (K, W), devuelve un conjunto de datos de (K, (Iterable <V>, iterable <W>)) tuplas. Esta operación también se denomina grupo de Con.
16
cartesiano (otherDataset)
Cuando pidió a los conjuntos de datos de tipo T y U, devuelve un conjunto de datos de (T, U) pares (todos los pares de elementos).
17
tubería (mando, [envvars])
Pipe cada partición del RDD a través de un comando de shell, por ejemplo, un Perl o script bash. Elementos RDD se escriben en la entrada estándar y las líneas de salida del proceso a su salida estándar se devuelven como una RDD de cadenas.
18
coalescer (numPartitions)
Disminuir el número de particiones en el RDD a numPartitions. Útil para las operaciones en funcionamiento más eficiente después de filtrar por un gran conjunto de datos.
19
reparto (numPartitions)
Reordenar los datos de la RDD al azar para crear ya sea más o menos particiones y el equilibrio que a través de ellos. Esto siempre baraja todos los datos sobre la red.
20
repartitionAndSortWithinPartitions (particionador)
Reparticionar el RDD según el particionador dado y, dentro de cada partición resultante, ordenar los registros por parte de sus llaves. Esto es más eficiente que llamar reparto y luego la clasificación dentro de cada partición, ya que puede empujar hacia abajo en la clasificación de la maquinaria aleatoria.

Acciones

La siguiente tabla muestra una lista de acciones, que devuelven valores.
S.NoAcción y Significado
1
reducir (func)
Agregar los elementos del conjunto de datos utilizando una funciónfunc (que toma dos argumentos y devuelve uno). La función debe ser conmutativa y asociativa de modo que se puede calcular correctamente en paralelo.
2
recoger()
Devuelve todos los elementos del conjunto de datos como un conjunto en el programa piloto. Esto suele ser útil después de un filtro u otra operación que devuelve un suficientemente pequeño subconjunto de los datos.
3
count ()
Devuelve el número de elementos en el conjunto de datos.
4
primero()
Devuelve el primer elemento del conjunto de datos (similar a tomar (1)).
5
tomado)
Devuelve una matriz con los primeros n elementos del conjunto de datos.
6
takeSample (withReplacement, num, [semilla])
Devuelve una matriz con una muestra aleatoria de num elementos del conjunto de datos, con o sin reemplazo, opcionalmente pre-especificando una semilla generador de números aleatorios.
7
takeOrdered (n, [pedido])
Devuelve los primeros n elementos de la RDD utilizando cualquiera de su orden natural o un comparador personalizado.
8
saveAsTextFile (ruta)
Escribe los elementos del conjunto de datos como un archivo de texto (o un conjunto de archivos de texto) en un directorio dado en el sistema de archivos local, HDFS o cualquier otro sistema de archivo compatible-Hadoop. Spark llama a toString sobre cada elemento para convertirlo en una línea de texto en el archivo.
9
saveAsSequenceFile (ruta) (Java y Scala)
Escribe los elementos del conjunto de datos como Hadoop SequenceFile en una ruta dada en el sistema de archivos local, HDFS o cualquier otro sistema de archivo compatible-Hadoop. Esto está disponible en DDR de pares clave-valor que implementan la interfaz grabable de Hadoop. En Scala, que también está disponible en los tipos que son implícitamente convertible en grabables (Spark incluye conversiones de tipos básicos como int, double, String, etc).
10
saveAsObjectFile (ruta) (Java y Scala)
Escribe los elementos del conjunto de datos en un formato simple usando serialización Java, que luego puede ser cargado usando SparkContext.objectFile ().
11
countByKey ()
Sólo disponible en DDR de tipo (K, V). Devuelve un HashMap de pares con el recuento de cada tecla (K, Int).
12
foreach (func)
Ejecuta una función func a cada elemento del conjunto de datos. Esto es por lo general, hecho por efectos secundarios tales como la actualización de un acumulador o interactuar con los sistemas de almacenamiento externos.
Nota - la modificación de variables distintas de Acumuladores fuera del foreach () puede dar lugar a un comportamiento indefinido.Consulte Descripción de cierres para más detalles.

Programación con RDD

Veamos las implementaciones de algunas transformaciones y acciones RDD en la programación RDD con la ayuda de un ejemplo.

Ejemplo

Consideremos un ejemplo recuento de palabras - Se cuenta cada palabra que aparece en un documento. Considere el siguiente texto como una entrada y se guarda como un input.txt archivo en un directorio principal.
input.txt - archivo de entrada.
people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Siga el procedimiento descrito a continuación para ejecutar el ejemplo dado.

Abrir Spark-Shell

El siguiente comando se utiliza para abrir la cáscara chispa. En general, la chispa se construye utilizando Scala. Por lo tanto, un programa Spark se ejecuta en entorno Scala.
$ spark-shell
Si la cáscara Spark abre correctamente entonces usted encontrará la siguiente salida. Mira la última línea del "contexto Spark disponible como sc" medios de salida del contenedor de chispa se crea automáticamente objeto de contexto de la chispa con el nombre sc. Antes de iniciar el primer paso de un programa, se debe crear el objeto SparkContext.
Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Crear una RDD

En primer lugar, tenemos que leer el archivo de entrada usando API Spark-Scala y crear un RDD.
El siguiente comando se utiliza para leer un archivo de un lugar determinado.Aquí, nuevo RDD se crea con el nombre del archivo de entrada. La cadena que se da como argumento en el archivo de texto ("") método es ruta absoluta para el nombre de archivo de entrada. Sin embargo, si se le da el nombre de archivo, entonces significa que el archivo de entrada está en la ubicación actual.
scala> val inputfile = sc.textFile("input.txt")

Ejecutar Palabra Transformación recuento

Nuestro objetivo es contar las palabras en un archivo. Crear un mapa plano para dividir cada línea en palabras (flatMap (línea ⇒ linea.split ("")).
A continuación, lea cada palabra como una clave con un valor "1" (<clave, valor> = <palabra, 1>) utilizando la función de mapa (mapa (palabra ⇒ (palabra, 1)).
Por último, reducir esas llaves añadiendo valores de claves similares(reduceByKey (_ + _)).
El siguiente comando se utiliza para la ejecución de la lógica recuento de palabras. Después de ejecutar esta, no vas a encontrar ninguna salida porque esto no es una acción, esto es una transformación; señalando un nuevo RDD o decirle chispa para qué hacer con los datos dados)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

RDD actual

Mientras trabajaba con la RDD, si usted quiere saber sobre la corriente RDD, a continuación, utilice el siguiente comando. Se le mostrará la descripcion de RDD actual y sus dependencias para la depuración.
scala> counts.toDebugString

Almacenamiento en caché de las Transformaciones

Puede marcar un RDD se persistió con el persistir () o caché () métodos en él.La primera vez que se calcula en una acción, que se mantendrá en la memoria en los nodos. Utilice el siguiente comando para almacenar las transformaciones intermedias en la memoria.
scala> counts.cache()

La aplicación de la Acción

La aplicación de una acción, como la tienda de todas las transformaciones, los resultados en un archivo de texto. El argumento de cadena para saveAsTextFile ("") método es la ruta absoluta de la carpeta de salida. Pruebe el siguiente comando para guardar la salida en un archivo de texto. En el siguiente ejemplo, la carpeta de 'salida' se encuentra en la ubicación actual.
scala> counts.saveAsTextFile("output")

Comprobación de la salida

Abra otro terminal para ir al directorio principal (donde la chispa se ejecuta en el otro terminal). Utilice los siguientes comandos para comprobar el directorio de salida.
[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS
El siguiente comando se utiliza para ver la salida de la Parte-00000 archivos.
[hadoop@localhost output]$ cat part-00000

Producción

(people,1) 
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
El siguiente comando se utiliza para ver la salida de la Parte-00001 archivos.
[hadoop@localhost output]$ cat part-00001 

Producción

(walk, 1) 
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

ONU Persistir el almacenamiento

Antes de ONU-persistente, si quieres ver el espacio de almacenamiento que se utiliza para esta aplicación, a continuación, utilizar la siguiente URL en su navegador.
http://localhost:4040
Verá la siguiente pantalla, que muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.
espacio de almacenamiento
Si desea ONU-persistir el espacio de almacenamiento de especial RDD, a continuación, utilice el siguiente comando.
Scala> counts.unpersist()
Verá la salida de la siguiente manera -
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Para verificar el espacio de almacenamiento en el navegador, utilice la siguiente URL.
http://localhost:4040/
Verá la siguiente pantalla. Muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.