Tag Archives: Open Source

Primitive Way with Folium

So I discovered Folium about two months ago and decided to map the Primitive Way with it. The coordinate data is retrieved from Strava GPX files and cleaned up so that only latitude and longitude remain, as below; a sketch of that clean-up step follows the sample.

user@computer:$ head Camin_prim_stage1.csv
lat,lon
43.3111770,-5.6941620
43.3113360,-5.6943420
43.3114370,-5.6944600
43.3115000,-5.6945420
43.3116970,-5.6948090
43.3119110,-5.6950900
43.3122360,-5.6956830
43.3123220,-5.6958090
43.3126840,-5.6963740
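
As a reference, a minimal sketch of that GPX-to-CSV clean-up step could look like the following; the GPX 1.1 namespace and the file names are assumptions based on the Strava export, not part of the original post.

# Hypothetical clean-up step: extract lat/lon from a Strava GPX export
# into the CSV layout shown above. File names are placeholders.
import csv
import xml.etree.ElementTree as ET

NS = {"gpx": "http://www.topografix.com/GPX/1/1"}  # assumed GPX 1.1 namespace

def gpx_to_csv(gpx_path, csv_path):
    root = ET.parse(gpx_path).getroot()
    with open(csv_path, "w", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(["lat", "lon"])
        # Each track point stores its coordinates as attributes
        for trkpt in root.findall(".//gpx:trkpt", NS):
            writer.writerow([trkpt.get("lat"), trkpt.get("lon")])

gpx_to_csv("Camin_prim_stage1.gpx", "Camin_prim_stage1.csv")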

Below is the Python script we will use to load the data and create the map with the routes.

import folium
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

# Change Spark log level
spark.sparkContext.setLogLevel('FATAL')

# Load the three stages from the local filesystem instead of HDFS
position1 = spark.read.load("/home/user/Camin_prim_stage1.csv", format="csv", sep=",", inferSchema="true", header="true")
position2 = spark.read.load("/home/user/Camin_prim_stage2.csv", format="csv", sep=",", inferSchema="true", header="true")
position3 = spark.read.load("/home/user/Camin_prim_stage3.csv", format="csv", sep=",", inferSchema="true", header="true")

position = [position1, position2, position3]

# Make a Folium map and draw one coloured polyline per stage
m = folium.Map()
colArray = ['red', 'blue', 'green']

for col, x in enumerate(position):
    # Uncomment to check the file was correctly loaded
    # x.printSchema()
    # x.show(2)

    # Collect the stage coordinates as [lat, lon] pairs
    coordinates = [[float(i.lat), float(i.lon)] for i in x.collect()]

    # Fit the map to the stage and draw the route with its markers
    m.fit_bounds(coordinates, padding=(25, 25))
    folium.PolyLine(locations=coordinates, weight=5, color=colArray[col]).add_to(m)
    folium.Marker(coordinates[0], popup="Origin").add_to(m)
    folium.Marker(coordinates[-1], popup="Destination").add_to(m)

# Save to an html file
m.save('chamin_prim.html')

# Cleanup
spark.stop()
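
Opening chamin_prim.html in a browser should then show the three stages drawn in red, blue and green, each with its origin and destination markers.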


Fixing fail2ban

I had installed fail2ban but noticed it wasn't blocking SSH brute-force attacks, such as the ones below.

user@computer:$ grep sshd /var/log/auth.log | tail
Apr 29 08:06:17 sd-229337 sshd[20646]: pam_unix(sshd:auth): check pass; user unknown
Apr 29 08:06:17 sd-229337 sshd[20646]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=211-75-3-35.hinet-ip.hinet.net
Apr 29 08:06:18 sd-229337 sshd[20646]: Failed password for invalid user db2inst from 211.75.3.35 port 52724 ssh2
Apr 29 08:06:19 sd-229337 sshd[20646]: Connection closed by 211.75.3.35 [preauth]
Apr 29 08:18:21 sd-229337 sshd[20711]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=59-120-243-8.hinet-ip.hinet.net user=root
Apr 29 08:18:25 sd-229337 sshd[20711]: Failed password for root from 59.120.243.8 port 34312 ssh2
Apr 29 08:18:25 sd-229337 sshd[20711]: Connection closed by 59.120.243.8 [preauth]
Apr 29 08:19:14 sd-229337 sshd[20713]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=195-154-136-62.rev.poneytelecom.eu user=root
Apr 29 08:19:16 sd-229337 sshd[20713]: Failed password for root from 195.154.136.62 port 24329 ssh2
Apr 29 08:19:16 sd-229337 sshd[20713]: Connection closed by 195.154.136.62 [preauth]

To fix this we need to edit /etc/fail2ban/filter.d/common.local and change the bsd_syslog_verbose entry: replace __bsd_syslog_verbose = (<[^.]+\.[^.]+>) with __bsd_syslog_verbose = (<[^.]+ [^.]+>).

user@computer:$ grep bsd_syslog_verbose /etc/fail2ban/filter.d/common.local
#__bsd_syslog_verbose = (<[^.]+\.[^.]+>)
__bsd_syslog_verbose = (<[^.]+ [^.]+>)
__prefix_line = \s*%(__bsd_syslog_verbose)s?\s*(?:%(__hostname)s )?(?:%(__kernel_prefix)s )?(?:@vserver_\S+ )?%(__daemon_combs_re)s?\s%(__daemon_extra_re)s?\s*

Restart fail2ban and you should now see the IPs behind the brute-force attacks being banned, as below.

user@computer:$ tail -30 /var/log/fail2ban.log | grep actions
2018-04-29 18:43:19,835 fail2ban.actions[28271]: WARNING [ssh] Unban 163.172.159.119
2018-04-29 18:43:20,742 fail2ban.actions[28519]: INFO Set banTime = 1800
2018-04-29 18:43:20,936 fail2ban.actions[28519]: INFO Set banTime = 600
2018-04-29 18:43:59,119 fail2ban.actions[28519]: WARNING [ssh] Ban 171.244.27.195
2018-04-29 18:46:05,286 fail2ban.actions[28519]: WARNING [ssh] Ban 5.188.10.185
2018-04-29 19:13:59,938 fail2ban.actions[28519]: WARNING [ssh] Unban 171.244.27.195
2018-04-29 19:14:50,026 fail2ban.actions[28519]: WARNING [ssh] Ban 171.244.27.195
2018-04-29 19:15:35,102 fail2ban.actions[28519]: WARNING [ssh] Ban 159.65.10.166
2018-04-29 19:16:06,167 fail2ban.actions[28519]: WARNING [ssh] Unban 5.188.10.185
2018-04-29 19:44:50,740 fail2ban.actions[28519]: WARNING [ssh] Unban 171.244.27.195
2018-04-29 19:45:35,821 fail2ban.actions[28519]: WARNING [ssh] Unban 159.65.10.166
2018-04-29 19:45:38,858 fail2ban.actions[28519]: WARNING [ssh] Ban 171.244.27.195
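
To double-check which addresses are currently banned, the jail status can also be queried directly (assuming the jail is named ssh, as in the log above).

user@computer:$ fail2ban-client status ssh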

But why did the original setting never match? It comes down to the regular expressions: with the way our logs are written, the original __bsd_syslog_verbose pattern never finds a match. The script below tests both bsd_syslog_verbose settings against a real log timestamp. The original pattern expects a dot, but our logs contain a space instead, which is why bsd_syslog_verbose has to be modified.

#!/usr/bin/env python

import re

# Timestamp as it actually appears in /var/log/auth.log
testline = 'May 13 06:24:36'

# Original pattern: expects a dot between the two fields
match = re.search(r'[^.]+\.[^.]+', testline)
if match:
    print('Found: ' + match.group())
else:
    print(r'Not found for bsd_syslog_verbose=[^.]+\.[^.]+')

# Modified pattern: expects a space instead of the dot
match = re.search(r'[^.]+ [^.]+', testline)
if match:
    print('Found: ' + match.group())
else:
    print('Not found for bsd_syslog_verbose=[^.]+ [^.]+')

And we execute:

user@computer:$ python regex.py
Not found for bsd_syslog_verbose=[^.]+\.[^.]+
Found: May 13 06:24:36

More info can be found here, along with an instructive regex Google doc.

Apache Flume to write web server logs to Hadoop

In this post we will use Flume to dump Apache web server logs into HDFS. We already have a web server running and Flume installed, but we still need to configure a target and a source; a sketch of the source agent follows the target configuration below.

We use the following configuration file for the target agent.

## TARGET AGENT ##  
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn  
collector.sources.AvroIn.type = avro  
collector.sources.AvroIn.bind = 0.0.0.0  
collector.sources.AvroIn.port = 4545  
collector.sources.AvroIn.channels = mc1 mc2

## Channels ##
## Source writes to 2 channels, one for each sink
collector.channels = mc1 mc2

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory  
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory  
collector.channels.mc2.capacity = 100

## Sinks ##
collector.sinks = LocalOut HadoopOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll  
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng  
collector.sinks.LocalOut.sink.rollInterval = 0  
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs  
collector.sinks.HadoopOut.channel = mc2  
collector.sinks.HadoopOut.hdfs.path = /user/training/flume/events/%{log_type}/%y%m%d  
collector.sinks.HadoopOut.hdfs.fileType = DataStream  
collector.sinks.HadoopOut.hdfs.writeFormat = Text  
collector.sinks.HadoopOut.hdfs.rollSize = 0  
collector.sinks.HadoopOut.hdfs.rollCount = 10000  
collector.sinks.HadoopOut.hdfs.rollInterval = 600
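
For completeness, here is a minimal sketch of what the matching source agent could look like; the agent name, the Apache access log path and the collector hostname are assumptions, and the static interceptor simply sets the log_type header used in the HDFS path above.

## SOURCE AGENT (sketch, values are assumptions) ##
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-src-agent.conf -n source_agent

#http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = apache_log
source_agent.sources.apache_log.type = exec
source_agent.sources.apache_log.command = tail -f /var/log/apache2/access.log
source_agent.sources.apache_log.channels = memoryChannel
source_agent.sources.apache_log.interceptors = itype
source_agent.sources.apache_log.interceptors.itype.type = static
source_agent.sources.apache_log.interceptors.itype.key = log_type
source_agent.sources.apache_log.interceptors.itype.value = apache_access

#http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

#http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = collector.example.com
source_agent.sinks.avro_sink.port = 4545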


Export data from HDFS to MySQL

First create the database and the table you want to populate.

user@computer:$ echo "create database staff2; use staff2; CREATE TABLE editorial (id INT(100) unsigned not null AUTO_INCREMENT, name VARCHAR(20), email VARCHAR(20), primary key (id));" | mysql -u root -p
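
Note that the editorial table mirrors the layout of the records sitting in HDFS (id, name, email); by default Sqoop maps the exported fields to the table columns in order, so the two need to line up (or be mapped explicitly with --columns).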

Once that is done, here is the data we want to copy, already sitting in HDFS.

user@computer:$ hdfs dfs -cat /home/training/staff/editorial/part-m-*
1,Peter,peter@example.com
2,Jack,jack@example.com

Now dump it into MySQL using Sqoop.

user@computer:$ sqoop export --connect jdbc:mysql://localhost/staff2 --username root -P --table editorial --export-dir /home/training/staff/editorial
17/02/27 12:51:56 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0
Enter password:
17/02/27 12:51:58 INFO manager.SqlManager: Using default fetchSize of 1000
17/02/27 12:51:58 INFO tool.CodeGenTool: Beginning code generation
17/02/27 12:51:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `editorial` AS t LIMIT 1
17/02/27 12:51:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `editorial` AS t LIMIT 1
17/02/27 12:51:59 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce
Note: /tmp/sqoop-training/compile/e560499b42a9738bbc5ef127712adc7b/editorial.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/02/27 12:52:03 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/e560499b42a9738bbc5ef127712adc7b/editorial.jar
17/02/27 12:52:03 INFO mapreduce.ExportJobBase: Beginning export of editorial
17/02/27 12:52:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/02/27 12:52:08 INFO input.FileInputFormat: Total input paths to process : 2
17/02/27 12:52:08 INFO input.FileInputFormat: Total input paths to process : 2
17/02/27 12:52:09 INFO mapred.JobClient: Running job: job_201702221239_0006
17/02/27 12:52:10 INFO mapred.JobClient: map 0% reduce 0%
17/02/27 12:52:31 INFO mapred.JobClient: map 50% reduce 0%
17/02/27 12:52:45 INFO mapred.JobClient: map 100% reduce 0%
17/02/27 12:52:49 INFO mapred.JobClient: Job complete: job_201702221239_0006
17/02/27 12:52:49 INFO mapred.JobClient: Counters: 24
17/02/27 12:52:49 INFO mapred.JobClient: File System Counters
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of bytes read=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of bytes written=1176756
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of large read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: FILE: Number of write operations=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of bytes read=759
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of bytes written=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of read operations=19
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of large read operations=0
17/02/27 12:52:49 INFO mapred.JobClient: HDFS: Number of write operations=0
17/02/27 12:52:49 INFO mapred.JobClient: Job Counters
17/02/27 12:52:49 INFO mapred.JobClient: Launched map tasks=4
17/02/27 12:52:49 INFO mapred.JobClient: Data-local map tasks=4
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=64216
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/02/27 12:52:49 INFO mapred.JobClient: Map-Reduce Framework
17/02/27 12:52:49 INFO mapred.JobClient: Map input records=2
17/02/27 12:52:49 INFO mapred.JobClient: Map output records=2
17/02/27 12:52:49 INFO mapred.JobClient: Input split bytes=661
17/02/27 12:52:49 INFO mapred.JobClient: Spilled Records=0
17/02/27 12:52:49 INFO mapred.JobClient: CPU time spent (ms)=3390
17/02/27 12:52:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=422584320
17/02/27 12:52:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2940895232
17/02/27 12:52:49 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984
17/02/27 12:52:49 INFO mapreduce.ExportJobBase: Transferred 759 bytes in 42.9426 seconds (17.6748 bytes/sec)
17/02/27 12:52:49 INFO mapreduce.ExportJobBase: Exported 2 records.

Now we can see the content in the MySQL database named staff2.

user@computer:$ echo "use staff2; SELECT * FROM editorial;" | mysql -u root -p
Enter password:
id name email
1 Peter peter@example.com
2 Jack jack@example.com

Using sqoop to import a DB table into HDFS

In the Big Data world, Apache Sqoop is the tool you need to import data from a database into HDFS.

user@computer:$ sqoop import --connect jdbc:mysql://localhost/mysql --username training -P --warehouse-dir /home/training/db --table user
17/02/23 10:38:19 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0
Enter password:
17/02/23 10:38:24 INFO manager.SqlManager: Using default fetchSize of 1000
17/02/23 10:38:24 INFO tool.CodeGenTool: Beginning code generation
17/02/23 10:38:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `user` AS t LIMIT 1
17/02/23 10:38:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `user` AS t LIMIT 1
17/02/23 10:38:24 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce
Note: /tmp/sqoop-training/compile/7f3a9709c50f58c2c6bb24de91922c6b/user.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/02/23 10:38:29 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/7f3a9709c50f58c2c6bb24de91922c6b/user.jar
17/02/23 10:38:29 WARN manager.MySQLManager: It looks like you are importing from mysql.
17/02/23 10:38:29 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
17/02/23 10:38:29 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
17/02/23 10:38:29 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
17/02/23 10:38:29 WARN manager.CatalogQueryManager: The table user contains a multi-column primary key. Sqoop will default to the column Host only for this job.
17/02/23 10:38:29 WARN manager.CatalogQueryManager: The table user contains a multi-column primary key. Sqoop will default to the column Host only for this job.
17/02/23 10:38:29 INFO mapreduce.ImportJobBase: Beginning import of user
17/02/23 10:38:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/02/23 10:38:33 INFO db.DBInputFormat: Using read commited transaction isolation
17/02/23 10:38:33 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`Host`), MAX(`Host`) FROM `user`
17/02/23 10:38:33 WARN db.TextSplitter: Generating splits for a textual index column.
17/02/23 10:38:33 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
17/02/23 10:38:33 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
17/02/23 10:38:33 INFO mapred.JobClient: Running job: job_201702221239_0003
17/02/23 10:38:34 INFO mapred.JobClient: map 0% reduce 0%
17/02/23 10:38:55 INFO mapred.JobClient: map 17% reduce 0%
17/02/23 10:38:56 INFO mapred.JobClient: map 33% reduce 0%
17/02/23 10:39:08 INFO mapred.JobClient: map 50% reduce 0%
17/02/23 10:39:09 INFO mapred.JobClient: map 67% reduce 0%
17/02/23 10:39:21 INFO mapred.JobClient: map 83% reduce 0%
17/02/23 10:39:22 INFO mapred.JobClient: map 100% reduce 0%
17/02/23 10:39:26 INFO mapred.JobClient: Job complete: job_201702221239_0003
17/02/23 10:39:26 INFO mapred.JobClient: Counters: 23
17/02/23 10:39:26 INFO mapred.JobClient: File System Counters
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of bytes read=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of bytes written=1778658
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of large read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: FILE: Number of write operations=0
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of bytes read=791
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of bytes written=818
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of read operations=6
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of large read operations=0
17/02/23 10:39:26 INFO mapred.JobClient: HDFS: Number of write operations=6
17/02/23 10:39:26 INFO mapred.JobClient: Job Counters
17/02/23 10:39:26 INFO mapred.JobClient: Launched map tasks=6
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=89702
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/02/23 10:39:26 INFO mapred.JobClient: Map-Reduce Framework
17/02/23 10:39:26 INFO mapred.JobClient: Map input records=8
17/02/23 10:39:26 INFO mapred.JobClient: Map output records=8
17/02/23 10:39:26 INFO mapred.JobClient: Input split bytes=791
17/02/23 10:39:26 INFO mapred.JobClient: Spilled Records=0
17/02/23 10:39:26 INFO mapred.JobClient: CPU time spent (ms)=5490
17/02/23 10:39:26 INFO mapred.JobClient: Physical memory (bytes) snapshot=666267648
17/02/23 10:39:26 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4423995392
17/02/23 10:39:26 INFO mapred.JobClient: Total committed heap usage (bytes)=191102976
17/02/23 10:39:26 INFO mapreduce.ImportJobBase: Transferred 818 bytes in 56.7255 seconds (14.4203 bytes/sec)
17/02/23 10:39:26 INFO mapreduce.ImportJobBase: Retrieved 8 records.

The example above dumps the user table from the mysql database into Hadoop. --connect specifies the JDBC connection string for the database, --username is the authentication username, and -P asks for the password at the prompt. --warehouse-dir sets the HDFS parent directory for the table destination, and --table selects the table to import.
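
As the warnings in the Sqoop output suggest, the MySQL-specific fast path can optionally be enabled with --direct; assuming mysqldump is available on the node, the same import would look something like this.

user@computer:$ sqoop import --connect jdbc:mysql://localhost/mysql --username training -P --direct --warehouse-dir /home/training/db --table user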

The content dumped by this import is shown below.

user@computer:$ hdfs dfs -cat /home/training/db/user/part-m-0000*
127.0.0.1,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost.localdomain,root,,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,,,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost.localdomain,,,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost,training,*27CF0BD18BDADD517165824F8C1FFF667B47D04B,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,N,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,Y,,,,,0,0,0,0
localhost,hiveuser,*2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0
localhost,hue,*15221DE9A04689C4D312DEAC3B87DDF542AF439E,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,,,,,0,0,0,0