Tag Archives: Open Source

Apache Flume to write web server logs to Hadoop

In this post we will use flume to dump Apache webserver logs into HDFS. We already have a web server running and flume installed, but we need to configure a target and a source.

We use the following file as target.

## TARGET AGENT ##  
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn  
collector.sources.AvroIn.type = avro  
collector.sources.AvroIn.bind = 0.0.0.0  
collector.sources.AvroIn.port = 4545  
collector.sources.AvroIn.channels = mc1 mc2

## Channels ##
## Source writes to 2 channels, one for each sink
collector.channels = mc1 mc2

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory  
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory  
collector.channels.mc2.capacity = 100

## Sinks ##
collector.sinks = LocalOut HadoopOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll  
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng  
collector.sinks.LocalOut.sink.rollInterval = 0  
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs  
collector.sinks.HadoopOut.channel = mc2  
collector.sinks.HadoopOut.hdfs.path = /user/training/flume/events/%{log_type}/%y%m%d  
collector.sinks.HadoopOut.hdfs.fileType = DataStream  
collector.sinks.HadoopOut.hdfs.writeFormat = Text  
collector.sinks.HadoopOut.hdfs.rollSize = 0  
collector.sinks.HadoopOut.hdfs.rollCount = 10000  
collector.sinks.HadoopOut.hdfs.rollInterval = 600

Continue reading

Export data from HDFS to MySQL

First create the DB and table where you want to populate.

user@computer:$ echo "create database staff2; use staff2; CREATE TABLE editorial (id INT(100) unsigned not null AUTO_INCREMENT, name VARCHAR(20), email VARCHAR(20), primary key (id));" | mysql -u root -p

Once done, we have the data we want to copy in HDFS.

user@computer:$ hdfs dfs -cat /home/training/staff/editorial/part-m-*
1,Peter,peter@example.com
2,Jack,jack@example.com

Now dump into MySQL using sqoop.

user@computer:$ sqoop export --connect jdbc:mysql://localhost/staff2 --username root -P --table editorial --export-dir /home/training/staff/editorial
17/02/27 12:51:56 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0
Enter password:
17/02/27 12:51:58 INFO manager.SqlManager: Using default fetchSize of 1000
17/02/27 12:51:58 INFO tool.CodeGenTool: Beginning code generation
17/02/27 12:51:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `editorial` AS t LIMIT 1
17/02/27 12:51:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `editorial` AS t LIMIT 1
17/02/27 12:51:59 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce
Note: /tmp/sqoop-training/compile/e560499b42a9738bbc5ef127712adc7b/editorial.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/02/27 12:52:03 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/e560499b42a9738bbc5ef127712adc7b/editorial.jar
17/02/27 12:52:03 INFO mapreduce.ExportJobBase: Beginning export of editorial
17/02/27 12:52:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/02/27 12:52:08 INFO input.FileInputFormat: Total input paths to process : 2
17/02/27 12:52:08 INFO input.FileInputFormat: Total input paths to process : 2
17/02/27 12:52:09 INFO mapred.JobClient: Running job: job_201702221239_0006
17/02/27 12:52:10 INFO mapred.JobClient: map 0% reduce 0%
17/02/27 12:52:31 INFO mapred.JobClient: map 50% reduce 0%
17/02/27 12:52:45 INFO mapred.JobClient: map 100% reduce 0%
17/02/27 12:52:49 INFO mapred.JobClient: Job complete: job_201702221239_0006
17/02/27 12:52:49 INFO mapred.JobClient: Counters: 24
17/02/27 12:52:49 INFO mapred.JobClient: File System Counters
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of bytes read=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of bytes written=1176756
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of large read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of write operations=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of bytes read=759
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of bytes written=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of read operations=19
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of large read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of write operations=0
17/02/27 12:52:49 INFO mapred.JobClient: Job Counters
17/02/27 12:52:49 INFO mapred.JobClient: Launched map tasks=4
17/02/27 12:52:49 INFO mapred.JobClient: Data-local map tasks=4
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=64216
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Map-Reduce Framework
17/02/27 12:52:49 INFO mapred.JobClient: Map input records=2
17/02/27 12:52:49 INFO mapred.JobClient: Map output records=2
17/02/27 12:52:49 INFO mapred.JobClient: Input split bytes=661
17/02/27 12:52:49 INFO mapred.JobClient: Spilled Records=0
17/02/27 12:52:49 INFO mapred.JobClient: CPU time spent (ms)=3390
17/02/27 12:52:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=422584320
17/02/27 12:52:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2940895232
17/02/27 12:52:49 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984
17/02/27 12:52:49 INFO mapreduce.ExportJobBase: Transferred 759 bytes in 42.9426 seconds (17.6748 bytes/sec)
17/02/27 12:52:49 INFO mapreduce.ExportJobBase: Exported 2 records.

Now we can see the content in MySQL DB named staff2.

user@computer:$ echo "use staff2; SELECT * FROM editorial;" | mysql -u root -p
Enter password:
id name email
1 Peter peter@example.com
2 Jack jack@example.com

Using sqoop to import a DB table into HDFS

In the world of Big Data to import data from a DB into HDFS you need Apache Sqoop.

user@computer:$ sqoop import --connect jdbc:mysql://localhost/mysql --username training -P --warehouse-dir /home/training/db --table user
17/02/23 10:38:19 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0
Enter password:
17/02/23 10:38:24 INFO manager.SqlManager: Using default fetchSize of 1000
17/02/23 10:38:24 INFO tool.CodeGenTool: Beginning code generation
17/02/23 10:38:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `user` AS t LIMIT 1
17/02/23 10:38:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `user` AS t LIMIT 1
17/02/23 10:38:24 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce
Note: /tmp/sqoop-training/compile/7f3a9709c50f58c2c6bb24de91922c6b/user.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/02/23 10:38:29 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/7f3a9709c50f58c2c6bb24de91922c6b/user.jar
17/02/23 10:38:29 WARN manager.MySQLManager: It looks like you are importing from mysql.
17/02/23 10:38:29 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
17/02/23 10:38:29 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
17/02/23 10:38:29 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
17/02/23 10:38:29 WARN manager.CatalogQueryManager: The table user contains a multi-column primary key. Sqoop will default to the column Host only for this job.
17/02/23 10:38:29 WARN manager.CatalogQueryManager: The table user contains a multi-column primary key. Sqoop will default to the column Host only for this job.
17/02/23 10:38:29 INFO mapreduce.ImportJobBase: Beginning import of user
17/02/23 10:38:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/02/23 10:38:33 INFO db.DBInputFormat: Using read commited transaction isolation
17/02/23 10:38:33 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`Host`), MAX(`Host`) FROM `user`
17/02/23 10:38:33 WARN db.TextSplitter: Generating splits for a textual index column.
17/02/23 10:38:33 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
17/02/23 10:38:33 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
17/02/23 10:38:33 INFO mapred.JobClient: Running job: job_201702221239_0003
17/02/23 10:38:34 INFO mapred.JobClient: map 0% reduce 0%
17/02/23 10:38:55 INFO mapred.JobClient: map 17% reduce 0%
17/02/23 10:38:56 INFO mapred.JobClient: map 33% reduce 0%
17/02/23 10:39:08 INFO mapred.JobClient: map 50% reduce 0%
17/02/23 10:39:09 INFO mapred.JobClient: map 67% reduce 0%
17/02/23 10:39:21 INFO mapred.JobClient: map 83% reduce 0%
17/02/23 10:39:22 INFO mapred.JobClient: map 100% reduce 0%
17/02/23 10:39:26 INFO mapred.JobClient: Job complete: job_201702221239_0003
17/02/23 10:39:26 INFO mapred.JobClient: Counters: 23
17/02/23 10:39:26 INFO mapred.JobClient: File System Counters
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of bytes read=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of bytes written=1778658
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of large read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of write operations=0
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of bytes read=791
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of bytes written=818
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of read operations=6
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of large read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of write operations=6
17/02/23 10:39:26 INFO mapred.JobClient: Job Counters
17/02/23 10:39:26 INFO mapred.JobClient: Launched map tasks=6
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=89702
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Map-Reduce Framework
17/02/23 10:39:26 INFO mapred.JobClient: Map input records=8
17/02/23 10:39:26 INFO mapred.JobClient: Map output records=8
17/02/23 10:39:26 INFO mapred.JobClient: Input split bytes=791
17/02/23 10:39:26 INFO mapred.JobClient: Spilled Records=0
17/02/23 10:39:26 INFO mapred.JobClient: CPU time spent (ms)=5490
17/02/23 10:39:26 INFO mapred.JobClient: Physical memory (bytes) snapshot=666267648
17/02/23 10:39:26 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4423995392
17/02/23 10:39:26 INFO mapred.JobClient: Total committed heap usage (bytes)=191102976
17/02/23 10:39:26 INFO mapreduce.ImportJobBase: Transferred 818 bytes in 56.7255 seconds (14.4203 bytes/sec)
17/02/23 10:39:26 INFO mapreduce.ImportJobBase: Retrieved 8 records.

Example above dumps table user from mysql DB into hadoop.
First connect to DB using –connect
–username would by the authentication username, -P to ask for password at prompt. –warehouse-dir HDFS parent for table destination and –table to select the table to import.

Below dumped content is shown.

user@computer:$ hdfs dfs -cat /home/training/db/user/part-m-0000*
127.0.0.1,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost.localdomain,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,,,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost.localdomain,,,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost,training,*27CF0BD18BDADD517165824F8C1FFF667B47D04B,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,N,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,hiveuser,*2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost,hue,*15221DE9A04689C4D312DEAC3B87DDF542AF439E,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0

Python script to download all NASDAQ and NYSE ticker symbols

Below is a short python script to get all NASDAQ and NYSE common stock tickers. You can then use the resulting file to get a lot of info using yahoofinance library.

#!/usr/bin/env python

import ftplib
import os
import re

# Connect to ftp.nasdaqtrader.com
ftp = ftplib.FTP('ftp.nasdaqtrader.com', 'anonymous', 'anonymous@debian.org')

# Download files nasdaqlisted.txt and otherlisted.txt from ftp.nasdaqtrader.com
for ficheiro in ["nasdaqlisted.txt", "otherlisted.txt"]:
        ftp.cwd("/SymbolDirectory")
        localfile = open(ficheiro, 'wb')
        ftp.retrbinary('RETR ' + ficheiro, localfile.write)
        localfile.close()
ftp.quit()

# Grep for common stock in nasdaqlisted.txt and otherlisted.txt
for ficheiro in ["nasdaqlisted.txt", "otherlisted.txt"]:
        localfile = open(ficheiro, 'r')
        for line in localfile:
                if re.search("Common Stock", line):
                        ticker = line.split("|")[0]
                        # Append tickers to file tickers.txt
                        open("tickers.txt","a+").write(ticker + "\n")