DNS records are crucial for translating human-friendly domain names, like medium.com, into IP addresses that computers use to identify each other on the network. This translation is fundamental for accessing websites, sending emails, and other internet functionalities. WHOIS information provides details about the domain registration, including the registrant's contact information, domain status, and key dates.
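As a quick illustration of that translation, the standard library can ask the system resolver for the addresses behind a name. This is a minimal sketch that is separate from the project's code: the helper name `resolve_addresses` is hypothetical, and the addresses returned depend on the resolver of the machine it runs on.

```python
import socket


def resolve_addresses(hostname: str) -> list[str]:
    """Return the sorted, de-duplicated addresses the system resolver gives for hostname."""
    infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the address.
    return sorted({info[4][0] for info in infos})


# Example (needs network access, and the result changes over time):
#     resolve_addresses("medium.com")
```

Passing an IP literal instead of a name returns that address without a DNS query, which is handy for trying the helper offline.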
Project Structure
DNS-Lookup/
├── app/
│   ├── core/
│   │   ├── _config.py
│   │   └── settings.cfg
│   ├── routes/
│   │   └── _dnsLookup.py
│   ├── schemas/
│   │   ├── _dnsRequest.py
│   │   └── _whoisInfo.py
│   └── utils/
│       └── _dns.py
├── main.py
├── requirements.txt
├── static/
│   └── ... (static files)
└── templates/
    └── index.html
Installation
Clone the repository:
git clone https://github.com/riottecboi/DNS-Lookup.git
This project runs on Python 3.11, so make sure Python 3.11 is installed on your machine.
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.11 -y
sudo apt-get install python3-pip -y
Install the virtual environment library if it is not already on your machine, then create a virtual environment in your project and activate it.
python3.11 -m pip install virtualenv
python3.11 -m venv <virtual-environment-name>
source <virtual-environment-name>/bin/activate
Install all required libraries for this project from the file requirements.txt.
python3.11 -m pip install -r requirements.txt
Usage
- Start the FastAPI server:
uvicorn main:app --reload
- Open your web browser and navigate to http://localhost:8000
- Enter a domain name in the provided input field.
- View the DNS records and WHOIS information displayed on the web page.
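The same lookup can be triggered without a browser by posting the form field to the `/lookup` route defined in _dnsLookup.py. The sketch below only builds the request with the standard library. It assumes the server from the first step is listening on localhost:8000 and that the router is mounted without a prefix; `build_lookup_request` is a hypothetical helper name.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_lookup_request(domain_or_ip: str, base_url: str = "http://localhost:8000") -> Request:
    """Build a form-encoded POST for the /lookup route (field name taken from the route)."""
    body = urlencode({"domain_or_ip": domain_or_ip}).encode("ascii")
    return Request(
        f"{base_url}/lookup",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_lookup_request("medium.com")
# To send it while the server is running:
#     from urllib.request import urlopen
#     with urlopen(request) as response:
#         html = response.read().decode()
```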
Dockerize
The application can also run as a container. First build an image from the Dockerfile in the project:
docker build -f docker/Dockerfile -t dns-lookup .
Then run a container from that image in the background:
docker run -d --name DNS-Lookup -p 8000:8000 dns-lookup
Explanation
This code defines two Pydantic models:
_dnsRequest.py
from pydantic import BaseModel, model_validator
import validators
import socket


class DomainOrIP(BaseModel):
    domain_or_ip: str

    @model_validator(mode='after')
    def validate_domain_or_ip(self):
        # An 'after' validator runs on the already constructed model instance
        try:
            # Check if input is an IPv4 address
            socket.inet_pton(socket.AF_INET, self.domain_or_ip)
            return self
        except socket.error:
            pass
        try:
            # Check if input is an IPv6 address
            socket.inet_pton(socket.AF_INET6, self.domain_or_ip)
            return self
        except socket.error:
            pass
        # Check if input is a domain name. validators.domain() returns a falsy
        # result object for invalid input instead of raising, so test its truthiness.
        if validators.domain(self.domain_or_ip):
            return self
        raise ValueError("Invalid Domain or IP.")


class ErrorResponse(BaseModel):
    error: str
    message: str
DomainOrIP:
- This model has a single field called domain_or_ip of type str.
- It has a model_validator decorator that applies the validate_domain_or_ip function after the model has been initialized.
The validate_domain_or_ip function performs the following validations:
- First, it tries to validate if the input domain_or_ip is a valid IPv4 address using the socket.inet_pton function with socket.AF_INET.
- If the input is not a valid IPv4 address, it tries to validate if it's a valid IPv6 address using socket.inet_pton with socket.AF_INET6.
- If the input is neither an IPv4 nor an IPv6 address, it tries to validate if it's a valid domain name using the validators.domain function from the validators library.
- If the input fails all three validations, it raises a ValueError with the message "Invalid Domain or IP."
- If the input passes any of the validations, the function returns the model unchanged.
ErrorResponse:
- This model has two fields: error (str) and message (str).
- It is likely used to represent an error response payload when an error occurs in the application.
The purpose of this code is to provide a way to validate user input and ensure that it is either a valid IP address (IPv4 or IPv6) or a valid domain name. This validation is important because the application likely needs to handle both IP addresses and domain names correctly.
The ErrorResponse model is used to create a structured error response that can be returned to the client when an error occurs, such as when the input is invalid.
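The three-step check can also be sketched with only the standard library, which makes it easy to try outside the application. This is an illustration under stated assumptions, not the project's validator: it swaps `ipaddress.ip_address` in for `socket.inet_pton` and a deliberately simplified regular expression in for `validators.domain`, and the `classify` name is hypothetical.

```python
import ipaddress
import re

# Simplified hostname pattern: dot-separated labels of letters, digits and hyphens,
# ending in an alphabetic top-level domain. Real-world rules are broader.
_DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,63}$"
)


def classify(value: str) -> str:
    """Return 'ipv4', 'ipv6' or 'domain'; raise ValueError for anything else."""
    try:
        address = ipaddress.ip_address(value)
        return "ipv4" if address.version == 4 else "ipv6"
    except ValueError:
        pass  # not an IP literal, fall through to the domain check
    if _DOMAIN_RE.match(value):
        return "domain"
    raise ValueError("Invalid Domain or IP.")
```

Trying the IP checks first keeps the domain pattern simple, because dotted IPv4 literals never reach it.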
_dns.py
import asyncio
import validators
import whois
import folium
from geopy import Nominatim
from schemas._whoisInfo import WhoisInfo


class DomainLocator:
    def __init__(self, domain: str):
        self.geolocator = Nominatim(user_agent="DNS_lookup")
        self.domain = domain
        self.domain_info = None
        self.return_info = None
        self.domain_map = None

    async def fetch_domain_info(self):
        try:
            # whois.whois blocks, so run it in the loop's default executor
            loop = asyncio.get_running_loop()
            self.domain_info = await loop.run_in_executor(None, whois.whois, self.domain)
        except Exception as e:
            print(f"Error fetching WHOIS information for {self.domain}: {e}")

    async def get_coordinates(self, location):
        try:
            # Keep the query in `location` so the error message can report it
            result = self.geolocator.geocode(location)
            if result:
                return result.latitude, result.longitude
            else:
                return None, None
        except Exception as e:
            print(f"Error fetching coordinates for {location}: {e}")
            return None, None

    async def plot_domain_location(self):
        if self.domain_info and self.domain_info.registrar:
            # Prefer the most specific value available: city, then country, then address
            location = self.domain_info.address
            if self.domain_info.country and isinstance(self.domain_info.country, str):
                location = self.domain_info.country
            if self.domain_info.city and isinstance(self.domain_info.city, str):
                location = self.domain_info.city
            lat, lon = await self.get_coordinates(location)
            # Compare with None so that a coordinate of 0 is not treated as missing
            if lat is not None and lon is not None:
                # Avoid shadowing the built-in map()
                domain_map = folium.Map(location=[lat, lon], zoom_start=4)
                folium.Marker([lat, lon], popup=f"{self.domain}").add_to(domain_map)
                self.domain_map = domain_map.get_root()._repr_html_()
            else:
                print(f"Unable to find coordinates for location: {location}")
                # Keep domain_map a string so callers can still encode it
                self.domain_map = ''
        else:
            print(f"No registrar information found for {self.domain}")
            self.domain_map = ''

    async def map_whois_data(self, data):
        # domain_info stays None when the WHOIS fetch failed, so guard against it
        if self.domain_info and self.domain_info.domain_name and "Name or service not known" not in self.domain_info.text:
            whois_fields = list(WhoisInfo.model_fields.keys())
            self.return_info = {}
            for field in whois_fields:
                if field in data:
                    self.return_info[field] = data[field]
            return self.return_info
        else:
            return {}

    async def process_domain(self):
        if self.domain:
            print(f"Processing domain: {self.domain}")
            await self.fetch_domain_info()
            await self.map_whois_data(self.domain_info)
            await self.plot_domain_location()
            return self.return_info, self.domain_map
        else:
            print("No valid domain to process")
            return None, None
This code defines a DomainLocator class that is responsible for fetching WHOIS information for a given domain, mapping the WHOIS data to a structured model, and plotting the location of the domain on a map using the folium library.
- __init__(self, domain: str):
  - Initializes the DomainLocator instance with a domain name.
  - Creates a Nominatim instance from the geopy library for geocoding purposes.
  - Sets the domain_info, return_info, and domain_map attributes to None initially.
- async fetch_domain_info(self):
  - Fetches the WHOIS information for the given domain using the whois library.
  - Runs the whois.whois function in an executor to avoid blocking the event loop.
  - Stores the WHOIS information in the domain_info attribute.
- async get_coordinates(self, location):
  - Takes a location string as input and uses the Nominatim geocoder to retrieve the latitude and longitude coordinates.
  - Returns the coordinates as a tuple (latitude, longitude) or (None, None) if the location could not be geocoded.
- async plot_domain_location(self):
  - Attempts to determine the location of the domain based on the available WHOIS information (address, country, or city).
  - Calls the get_coordinates method to obtain the latitude and longitude coordinates for the location.
  - If coordinates are found, creates a folium map centered on those coordinates and adds a marker for the domain.
  - Stores the HTML representation of the map in the domain_map attribute.
- async map_whois_data(self, data):
  - Maps the WHOIS data to the WhoisInfo model defined in the schemas._whoisInfo module.
  - Iterates over the fields in the WhoisInfo model and populates the return_info dictionary with the corresponding values from the WHOIS data.
  - Returns the return_info dictionary if the WHOIS data is valid, or an empty dictionary if the data is not available or invalid.
- async process_domain(self):
  - The main method that orchestrates the entire process of fetching WHOIS information, mapping the data, and plotting the domain location.
  - Calls the fetch_domain_info, map_whois_data, and plot_domain_location methods in sequence.
  - Returns the return_info dictionary and the domain_map HTML content.
This class is designed to be used asynchronously, as indicated by the async keyword on the methods. It leverages the asyncio library and the run_in_executor function to offload blocking operations (like fetching WHOIS data) to a separate executor, allowing the event loop to remain responsive.
The process_domain method can be called on an instance created with a valid domain name; it returns the structured WHOIS information and the HTML of a map showing the location of the domain (if available).
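The executor pattern used for the WHOIS fetch can be tried in isolation. In this minimal sketch, `blocking_lookup` is a hypothetical stand-in for a blocking call such as `whois.whois` and returns fake data, so no network access or third-party package is needed.

```python
import asyncio
import time


def blocking_lookup(domain: str) -> dict:
    """Stand-in for a blocking call such as whois.whois (hypothetical, returns fake data)."""
    time.sleep(0.1)  # simulate network latency
    return {"domain_name": domain}


async def fetch(domain: str) -> dict:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_lookup, domain)


async def main() -> list[dict]:
    # Both lookups run in worker threads, so they overlap instead of running back to back.
    return await asyncio.gather(fetch("medium.com"), fetch("example.com"))


results = asyncio.run(main())
```

Because the blocking work happens in worker threads, the event loop stays free to serve other requests while the lookups are in flight.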
_dnsLookup.py
import json
import os
from fastapi import APIRouter, HTTPException, Response, Request, Form, status, Depends
from fastapi.responses import RedirectResponse
from utils._dns import DomainLocator
from schemas._dnsRequest import DomainOrIP, ErrorResponse
# Import ValidationError from the public pydantic package, not a private module
from pydantic import ValidationError
from cryptography.fernet import Fernet
from fastapi.templating import Jinja2Templates

# The key is generated at import time, so encrypted links stop working after a restart
key = Fernet.generate_key()
cipher_suite = Fernet(key)

folder = os.getcwd()
os.chdir("..")
path = os.getcwd()
templates = Jinja2Templates(directory=path+"/templates")

router = APIRouter()


def serialize_datetime(dt_or_list):
    if isinstance(dt_or_list, list):
        return [dt.isoformat() for dt in dt_or_list]
    if dt_or_list is None:
        return ''
    else:
        return dt_or_list.isoformat()


async def process_and_encrypt_data(domain_or_ip: str):
    try:
        validated_input = DomainOrIP(domain_or_ip=domain_or_ip)
        domain_or_ip = validated_input.domain_or_ip
        locator = DomainLocator(domain_or_ip)
        domain_info, domain_map = await locator.process_domain()
        if domain_info:
            # datetime values are not JSON serializable, so convert them first
            domain_info['updated_date'] = serialize_datetime(domain_info['updated_date'])
            domain_info['creation_date'] = serialize_datetime(domain_info['creation_date'])
            domain_info['expiration_date'] = serialize_datetime(domain_info['expiration_date'])
            encrypted_domain_info = cipher_suite.encrypt(json.dumps(domain_info).encode()).decode()
            encrypted_domain_map = cipher_suite.encrypt(domain_map.encode()).decode()
            return encrypted_domain_info, encrypted_domain_map
        else:
            return None, None
    except ValidationError as e:
        raise HTTPException(status_code=400, detail={"error": "Not processing Domain/IP",
                                                     "message": "The input cannot process. Please try again."})


@router.post("/lookup", responses={400: {"model": ErrorResponse}})
async def dns_lookup(domain_or_ip: str = Form(...)):
    try:
        encrypted_domain_info, encrypted_domain_map = await process_and_encrypt_data(domain_or_ip)
        return RedirectResponse(url=f"/result?domain={domain_or_ip}&domain_info={encrypted_domain_info}&domain_map={encrypted_domain_map}", status_code=302)
    except ValidationError as e:
        # model_dump() replaces the deprecated dict() in Pydantic v2
        raise HTTPException(status_code=400, detail=ErrorResponse(error="Not processing Domain/IP",
                                                                  message="The input cannot process. Please try again.").model_dump())


@router.get("/home")
async def get_template(request: Request):
    return templates.TemplateResponse("index.html", {"request": request, "domain_info": None, "domain_map": None, "found": True})


@router.get("/result")
async def get_result(request: Request):
    found = True
    search_domain = request.query_params.get('domain')
    domain_info = request.query_params.get('domain_info')
    domain_map = request.query_params.get('domain_map')
    if not domain_info or domain_info == 'None':
        # Nothing was found. Query parameters are user-controlled, so never eval() them.
        domain_info = None
        domain_map = None
        found = False
    else:
        decrypted_domain_info_json = cipher_suite.decrypt(domain_info.encode()).decode()
        domain_info = json.loads(decrypted_domain_info_json)
        domain_map = cipher_suite.decrypt(domain_map.encode()).decode() if domain_map else None
    return templates.TemplateResponse("index.html", {"request": request, "domain": search_domain, "domain_info": domain_info, "domain_map": domain_map, "found": found})
- Imports: The code imports the necessary modules and classes, such as json for working with JSON data, os for interacting with the operating system, and various classes from the FastAPI framework.
- Initialization: The code generates a key using the Fernet class from the cryptography module, which is likely used for encryption and decryption purposes. It also sets up a Jinja2 template environment for rendering HTML templates and creates an instance of the APIRouter class from FastAPI.
- Helper Functions:
  - serialize_datetime: This function converts datetime objects or a list of datetime objects to ISO format strings.
  - process_and_encrypt_data: This async function takes a domain or IP address as input, validates it using the DomainOrIP schema, retrieves domain information using the DomainLocator class, and encrypts the domain information and domain map using the Fernet cipher suite.
- API Routes:
  - @router.post("/lookup"): This route accepts a domain or IP address as form data, calls the process_and_encrypt_data function, and redirects the user to the /result route with the encrypted domain information and domain map as query parameters.
  - @router.get("/home"): This route renders the index.html template without any domain information.
  - @router.get("/result"): This route retrieves the domain, encrypted domain information, and encrypted domain map from the query parameters. If the domain information is None, it sets a found flag to False. Otherwise, it decrypts the domain information and domain map using the Fernet cipher suite and renders the index.html template with the retrieved data.
The code is part of a larger application that retrieves and processes WHOIS information for a given domain or IP address.
It encrypts the domain information and map with Fernet before passing them to the /result route as query parameters, and decrypts them again before rendering. The application provides a web interface (through the index.html template) where users can input a domain or IP address, and the application retrieves and displays the corresponding information.
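The `serialize_datetime` helper is needed because `json.dumps` cannot encode `datetime` objects, and WHOIS date fields may arrive as a single value, a list, or nothing at all. The sketch below reproduces that conversion with the standard library only; the `to_iso` name and the sample record are hypothetical and do not come from a real WHOIS response.

```python
import json
from datetime import datetime


def to_iso(value):
    """Convert a datetime, a list of datetimes, or None into JSON-friendly values."""
    if value is None:
        return ""
    if isinstance(value, list):
        return [item.isoformat() for item in value]
    return value.isoformat()


# Hypothetical WHOIS-like record; real responses carry more fields.
record = {
    "creation_date": datetime(2012, 5, 1, 12, 0, 0),
    "updated_date": [datetime(2023, 1, 2), datetime(2023, 6, 7)],
    "expiration_date": None,
}

serialized = {key: to_iso(value) for key, value in record.items()}
payload = json.dumps(serialized)  # json.dumps(record) would raise TypeError
```

The resulting string is what gets encrypted and placed in the redirect URL, and `json.loads` restores the dictionary on the other side.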
main.py
from fastapi import FastAPI
from routes import _dnsLookup
from fastapi.staticfiles import StaticFiles
import os

app = FastAPI(
    title="DNS Lookup API",
    description="API for getting whois information and location of domain or IP",
    version="1.0",
    docs_url="/docs",
    openapi_url="/openapi.json",
    contact={
        "name": "Tran Vinh Liem",
        "email": "riottecboi@gmail.com",
        "url": "https://about.riotteboi.com"
    }
)

folder = os.getcwd()
app.mount("/static", StaticFiles(directory=folder+"/static", html=True), name="static")
app.include_router(_dnsLookup.router)
- Imports:
  - FastAPI is imported from the fastapi module to create the FastAPI application instance.
  - _dnsLookup is imported from the routes module, which likely contains the API routes for handling DNS lookup requests.
  - StaticFiles is imported from fastapi.staticfiles to serve static files.
  - os is imported to interact with the operating system and get the current working directory.
- FastAPI Application Instance:
  - A new FastAPI instance is created and assigned to the app variable.
  - The application is configured with metadata such as the title, description, version, documentation URLs, and contact information.
- Static Files:
  - The StaticFiles class is used to mount a directory containing static files (e.g., CSS, JavaScript, images) at the /static URL path.
  - The folder variable stores the current working directory, and the static files are located in the static subdirectory.
  - The html=True parameter is set to allow serving HTML files from the static directory.
- Router Inclusion:
  - The include_router method is called on the app instance to include the routes defined in the _dnsLookup module.
  - This means that all the routes defined in _dnsLookup.router will be added to the FastAPI application.
Final — Results
Conclusion
Overall, this application provides a web-based interface and API endpoints for users to look up WHOIS information and geographical location details for domains or IP addresses. The combination of FastAPI, Pydantic, encryption, geocoding, and mapping libraries enables a comprehensive and secure solution for DNS lookup functionality.