DEV Community

Riottecboi

DNS Lookup Web Application


DNS records are crucial for translating human-friendly domain names, like medium.com, into IP addresses that computers use to identify each other on the network. This translation is fundamental for accessing websites, sending emails, and other internet functionalities. WHOIS information provides details about the domain registration, including the registrant's contact information, domain status, and key dates.
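
As a rough illustration of the name-to-address translation described above, the sketch below asks the operating system's resolver for the addresses behind a host name, using only Python's standard socket module. The function name resolve_addresses is made up for this example and is not part of the project.

```python
import socket

def resolve_addresses(hostname: str) -> list[str]:
    """Return the unique IP addresses the system resolver reports for hostname."""
    infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP
    return sorted({info[4][0] for info in infos})

# A numeric address resolves to itself without contacting a DNS server
print(resolve_addresses("127.0.0.1"))
```

Passing a name such as "medium.com" instead requires network access, and the addresses returned depend on the resolver and can change over time.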

Project Structure

DNS-Lookup/
├── app/
│   ├── core/
│   │   ├── _config.py
│   │   └── settings.cfg
│   ├── routes/
│   │   └── _dnsLookup.py
│   ├── schemas/
│   │   ├── _dnsRequest.py
│   │   └── _whoisInfo.py
│   └── utils/
│       └── _dns.py
├── main.py
├── requirements.txt
├── static/
│   └── ... (static files)
└── templates/
    └── index.html

Installation

Clone the repository:

git clone https://github.com/riottecboi/DNS-Lookup.git

This project runs on Python 3.11, so make sure Python 3.11 is installed on your machine.

sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update 
sudo apt install python3.11 -y
sudo apt-get install python3-pip -y

Install the virtualenv library if it is not already on your machine, then create a virtual environment in the project directory and activate it.

python3.11 -m pip install virtualenv
python3.11 -m venv <virtual-environment-name>
source <virtual-environment-name>/bin/activate

Install all required libraries for this project from the file requirements.txt.

python3.11 -m pip install -r requirements.txt

Usage

  1. Start the FastAPI server:
uvicorn main:app --reload
  2. Open your web browser and navigate to http://localhost:8000 (the lookup page is rendered by the /home route).
  3. Enter a domain name in the provided input field.
  4. View the DNS records and WHOIS information displayed on the web page.

Dockerize

We can also run the application as a container. First, build an image from the Dockerfile in the project.

docker build -f docker/Dockerfile -t dns-lookup .

Then run a container from that image. The options must come before the image name; anything after it is passed to the container as its command.

docker run -d --name DNS-Lookup -p 8000:8000 dns-lookup

Docker Application

Explanation

This code defines two Pydantic models:

_dnsRequest.py

from pydantic import BaseModel, model_validator
import validators
import socket

class DomainOrIP(BaseModel):
    domain_or_ip: str

    @model_validator(mode='after')
    def validate_domain_or_ip(self):
        # An 'after' validator receives the already constructed model instance
        try:
            # Check if input is IPv4 address
            socket.inet_pton(socket.AF_INET, self.domain_or_ip)
            return self
        except OSError:
            pass

        try:
            # Check if input is IPv6 address
            socket.inet_pton(socket.AF_INET6, self.domain_or_ip)
            return self
        except OSError:
            pass

        # Check if input is domain name; validators.domain returns a falsy
        # result instead of raising, so its return value has to be tested
        if validators.domain(self.domain_or_ip):
            return self

        raise ValueError("Invalid Domain or IP.")


class ErrorResponse(BaseModel):
    error: str
    message: str

DomainOrIP:

This model has a single field called domain_or_ip of type str.

It has a model_validator decorator that applies the validate_domain_or_ip function after the model has been initialized.
The validate_domain_or_ip function performs the following validations:

  • First, it tries to validate if the input domain_or_ip is a valid IPv4 address using the socket.inet_pton function with socket.AF_INET.
  • If the input is not a valid IPv4 address, it tries to validate if it's a valid IPv6 address using socket.inet_pton with socket.AF_INET6.
  • If the input is neither an IPv4 nor an IPv6 address, it tries to validate if it's a valid domain name using the validators.domain function from the validators library.
  • If the input fails all three validations, it raises a ValueError with the message "Invalid Domain or IP."
  • If the input passes any of the validations, the function returns the original value without any modifications.

ErrorResponse:

  • This model has two fields: error (str) and message (str).
  • It represents the error payload returned to the client; the /lookup route declares it as the model for its 400 response.

The purpose of this code is to validate user input and ensure that it is either a valid IP address (IPv4 or IPv6) or a valid domain name before any lookup is attempted. When the input is invalid, the ErrorResponse model gives the client a structured error made of a short error label and a human-readable message.
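
The three-step check described above can also be sketched with the standard library alone. This is only an illustration under stated assumptions: it swaps socket.inet_pton for the ipaddress module and validators.domain for a simplified regular expression, and the name classify_input is hypothetical rather than part of the project.

```python
import ipaddress
import re

# Simplified hostname pattern (an assumption for this sketch): dot-separated
# labels of letters, digits and hyphens, ending in an alphabetic TLD.
_DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,63}$"
)

def classify_input(value: str) -> str:
    """Return 'ipv4', 'ipv6' or 'domain'; raise ValueError for anything else."""
    try:
        # ip_address accepts both IPv4 and IPv6 literals
        address = ipaddress.ip_address(value)
        return "ipv4" if address.version == 4 else "ipv6"
    except ValueError:
        pass
    if _DOMAIN_RE.match(value):
        return "domain"
    raise ValueError("Invalid Domain or IP.")

for candidate in ("8.8.8.8", "::1", "example.com"):
    print(candidate, classify_input(candidate))
```

Returning the detected kind rather than the raw value is a design choice of this sketch; it lets a caller decide later whether a WHOIS lookup or a reverse lookup is appropriate.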

_dns.py

import asyncio
import validators
import whois
import folium
from geopy import Nominatim
from schemas._whoisInfo import WhoisInfo

class DomainLocator:
    def __init__(self, domain: str):
        self.geolocator = Nominatim(user_agent="DNS_lookup")
        self.domain = domain
        self.domain_info = None
        self.return_info = None
        self.domain_map = None

    async def fetch_domain_info(self):
        try:
            # whois.whois is blocking, so run it in the default thread pool
            loop = asyncio.get_running_loop()
            self.domain_info = await loop.run_in_executor(None, whois.whois, self.domain)
        except Exception as e:
            print(f"Error fetching WHOIS information for {self.domain}: {e}")

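    async def get_coordinates(self, location):
        try:
            # geocode() is a blocking network call, so run it in the default thread pool
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, self.geolocator.geocode, location)
            if result:
                return result.latitude, result.longitude
            else:
                return None, None
        except Exception as e:
            # 'location' still holds the original query because the result has its own name
            print(f"Error fetching coordinates for {location}: {e}")
            return None, None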

    async def plot_domain_location(self):
        # Default to an empty string so callers can always encode domain_map
        self.domain_map = ''
        if self.domain_info and self.domain_info.registrar:
            # Later assignments win, so the order of preference is city, country, address
            location = self.domain_info.address
            if self.domain_info.country and isinstance(self.domain_info.country, str):
                location = self.domain_info.country
            if self.domain_info.city and isinstance(self.domain_info.city, str):
                location = self.domain_info.city
            lat, lon = await self.get_coordinates(location)
            # Compare with None so that a coordinate of 0 is still accepted
            if lat is not None and lon is not None:
                domain_map = folium.Map(location=[lat, lon], zoom_start=4)
                folium.Marker([lat, lon], popup=f"{self.domain}").add_to(domain_map)
                self.domain_map = domain_map.get_root()._repr_html_()
            else:
                print(f"Unable to find coordinates for location: {location}")
        else:
            print(f"No registrar information found for {self.domain}")

    async def map_whois_data(self, data):
        # domain_info stays None when the WHOIS lookup failed, so check it first
        if self.domain_info and self.domain_info.domain_name and "Name or service not known" not in self.domain_info.text:
            whois_fields = list(WhoisInfo.model_fields.keys())
            self.return_info = {}
            for field in whois_fields:
                if field in data:
                    self.return_info[field] = data[field]
            return self.return_info
        else:
            return {}

    async def process_domain(self):
        if self.domain:
            print(f"Processing domain: {self.domain}")
            await self.fetch_domain_info()
            await self.map_whois_data(self.domain_info)
            await self.plot_domain_location()
            return self.return_info, self.domain_map
        else:
            print("No valid domain to process")
            return None, None

This code defines a DomainLocator class that is responsible for fetching WHOIS information for a given domain, mapping the WHOIS data to a structured model, and plotting the location of the domain on a map using the folium library.

  • __init__(self, domain: str):

    • Initializes the DomainLocator instance with a domain name.
    • Creates a Nominatim instance from the geopy library for geocoding purposes.
    • Sets the domain_info, return_info, and domain_map attributes to None initially.
  • async fetch_domain_info(self):

    • Fetches the WHOIS information for the given domain using the whois library.
    • Runs the whois.whois function in an executor to avoid blocking the event loop.
    • Stores the WHOIS information in the domain_info attribute.
  • async get_coordinates(self, location):

    • Takes a location string as input and uses the Nominatim geocoder to retrieve the latitude and longitude coordinates.
    • Returns the coordinates as a tuple (latitude, longitude) or (None, None) if the location could not be geocoded.
  • async plot_domain_location(self):

    • Attempts to determine the location of the domain based on the available WHOIS information (address, country, or city).
    • Calls the get_coordinates method to obtain the latitude and longitude coordinates for the location.
    • If coordinates are found, creates a folium map centered on those coordinates and adds a marker for the domain.
    • Stores the HTML representation of the map in the domain_map attribute.
  • async map_whois_data(self, data):

    • Maps the WHOIS data to the WhoisInfo model defined in the schemas._whoisInfo module.
    • Iterates over the fields in the WhoisInfo model and populates the return_info dictionary with the corresponding values from the WHOIS data.
    • Returns the return_info dictionary if the WHOIS data is valid, or an empty dictionary if the data is not available or invalid.
  • async process_domain(self):

    • The main method that orchestrates the entire process of fetching WHOIS information, mapping the data, and plotting the domain location.
    • Calls the fetch_domain_info, map_whois_data, and plot_domain_location methods in sequence.
    • Returns the return_info dictionary and the domain_map HTML content.

This class is designed to be used asynchronously, as indicated by the async keyword on the methods. It leverages the asyncio library and the run_in_executor function to offload blocking operations (like fetching WHOIS data) to a separate executor, allowing the event loop to remain responsive.
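
The executor pattern mentioned above can be tried in isolation. The following is a minimal sketch that uses time.sleep as a stand-in for a blocking call such as whois.whois; blocking_lookup and lookup_many are hypothetical names introduced only for this example.

```python
import asyncio
import time

def blocking_lookup(name: str) -> str:
    """Stand-in for a blocking call such as whois.whois (hypothetical helper)."""
    time.sleep(0.2)
    return f"record for {name}"

async def lookup_many(names: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_lookup, name) for name in names]
    # gather preserves the order of the inputs
    return await asyncio.gather(*futures)

results = asyncio.run(lookup_many(["a.example", "b.example"]))
print(results)
```

Because each call runs in a worker thread, the event loop stays free to serve other requests while the lookups are in progress.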

The process_domain method is called on an instance created with a valid domain name, and it returns the structured WHOIS information and a map showing the location of the domain (if available).
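
To make the two data-shaping steps described above concrete, here is a small sketch that works on plain dictionaries rather than on the WHOIS result object. The names pick_location and filter_fields are hypothetical, and unlike the class above the sketch also requires the address to be a string, which is an assumption.

```python
from typing import Any, Iterable, Mapping, Optional

def pick_location(info: Mapping[str, Any]) -> Optional[str]:
    """Return the most specific string location: city, then country, then address."""
    for key in ("city", "country", "address"):
        value = info.get(key)
        # WHOIS fields may be lists or missing, so only non-empty strings count
        if isinstance(value, str) and value:
            return value
    return None

def filter_fields(data: Mapping[str, Any], allowed: Iterable[str]) -> dict[str, Any]:
    """Keep only the keys listed in allowed, skipping any that are absent."""
    return {field: data[field] for field in allowed if field in data}

sample = {"city": ["A", "B"], "country": "US", "address": "1 Main St", "registrar": "X"}
print(pick_location(sample))
print(filter_fields(sample, ["country", "registrar", "missing"]))
```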

_dnsLookup.py

import json
import os
from fastapi import APIRouter, HTTPException, Response, Request, Form, status, Depends
from fastapi.responses import RedirectResponse
from utils._dns import DomainLocator
from schemas._dnsRequest import DomainOrIP, ErrorResponse
from pydantic import ValidationError
from cryptography.fernet import Fernet
from fastapi.templating import Jinja2Templates

key = Fernet.generate_key()
cipher_suite = Fernet(key)

folder = os.getcwd()
os.chdir("..")
path = os.getcwd()
templates = Jinja2Templates(directory=path+"/templates")

router = APIRouter()

def serialize_datetime(dt_or_list):
    if isinstance(dt_or_list, list):
        return [dt.isoformat() for dt in dt_or_list]
    if dt_or_list is None:
        return ''
    else:
        return dt_or_list.isoformat()

async def process_and_encrypt_data(domain_or_ip: str):
    try:
        validated_input = DomainOrIP(domain_or_ip=domain_or_ip)
        domain_or_ip = validated_input.domain_or_ip
        locator = DomainLocator(domain_or_ip)
        domain_info, domain_map = await locator.process_domain()
        if domain_info:
            domain_info['updated_date'] = serialize_datetime(domain_info['updated_date'])
            domain_info['creation_date'] = serialize_datetime(domain_info['creation_date'])
            domain_info['expiration_date'] = serialize_datetime(domain_info['expiration_date'])

            encrypted_domain_info = cipher_suite.encrypt(json.dumps(domain_info).encode()).decode()
            encrypted_domain_map = cipher_suite.encrypt(domain_map.encode()).decode()

            return encrypted_domain_info, encrypted_domain_map
        else:
            return None, None
    except ValidationError as e:
        raise HTTPException(status_code=400, detail={"error": "Not processing Domain/IP",
                                                     "message": "The input cannot process. Please try again."})

@router.post("/lookup", responses={400: {"model": ErrorResponse}})
async def dns_lookup(domain_or_ip: str = Form(...)):
    try:
        encrypted_domain_info, encrypted_domain_map = await process_and_encrypt_data(domain_or_ip)
        return RedirectResponse(url=f"/result?domain={domain_or_ip}&domain_info={encrypted_domain_info}&domain_map={encrypted_domain_map}", status_code=302)

    except ValidationError as e:
        # model_dump() is the Pydantic v2 replacement for the deprecated dict()
        raise HTTPException(status_code=400, detail=ErrorResponse(error="Not processing Domain/IP",
                            message="The input cannot process. Please try again.").model_dump())

@router.get("/home")
async def get_template(request: Request):
    return templates.TemplateResponse("index.html", {"request": request, "domain_info": None, "domain_map": None, "found": True})

@router.get("/result")
async def get_result(request: Request):
    found = True
    search_domain = request.query_params.get('domain')
    domain_info = request.query_params.get('domain_info')
    domain_map = request.query_params.get('domain_map')

    if domain_info is None or domain_info == 'None':
        # Nothing was found; never eval() values that come from the query string
        domain_info = None
        domain_map = None
        found = False

    else:
        decrypted_domain_info_json = cipher_suite.decrypt(domain_info.encode()).decode()
        domain_info = json.loads(decrypted_domain_info_json)

        domain_map = cipher_suite.decrypt(domain_map.encode()).decode() if domain_map else None

    return templates.TemplateResponse("index.html", {"request": request, "domain": search_domain, "domain_info": domain_info, "domain_map": domain_map, "found": found})
  • Imports: The code imports the necessary modules and classes, such as json for working with JSON data, os for interacting with the operating system, and various classes from the FastAPI framework.

  • Initialization: The code generates a key with Fernet.generate_key() from the cryptography module and builds a cipher suite that encrypts and decrypts the lookup results. Because the key is generated when the module is imported, encrypted values are only valid while the same process is running. It also sets up a Jinja2 template environment for rendering HTML templates and creates an instance of the APIRouter class from FastAPI.

  • Helper Functions:

    • serialize_datetime: This function converts datetime objects or a list of datetime objects to ISO format strings.
    • process_and_encrypt_data: This async function takes a domain or IP address as input, validates it using the DomainOrIP schema, retrieves domain information using the DomainLocator class, and encrypts the domain information and domain map using the Fernet cipher suite.
  • API Routes:

    • @router.post("/lookup"): This route accepts a domain or IP address as form data, calls the process_and_encrypt_data function, and redirects the user to the /result route with the encrypted domain information and domain map as query parameters.
    • @router.get("/home"): This route renders the index.html template without any domain information.
    • @router.get("/result"): This route retrieves the domain, encrypted domain information, and encrypted domain map from the query parameters. If the domain information is None, it sets a found flag to False. Otherwise, it decrypts the domain information and domain map using the Fernet cipher suite and renders the index.html template with the retrieved data.
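
WHOIS date fields do not always arrive as datetime objects, so a more defensive variant of the serialize_datetime helper described above might look like the sketch below. The name to_iso and the decision to pass other values through as text are assumptions made for this example.

```python
from datetime import date, datetime
from typing import Any

def to_iso(value: Any) -> Any:
    """Convert datetimes (or lists of them) to ISO 8601 strings; None becomes ''."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        return [to_iso(item) for item in value]
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    # Assumption: anything else (for example an already formatted string) is kept as text
    return str(value)

print(to_iso([datetime(2024, 1, 2), "2025-01-01", None]))
```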

The code is part of a larger application that retrieves and processes WHOIS information for a given domain or IP address.

It encrypts the WHOIS data and the map with Fernet before passing them to the /result route as query parameters, so the values in the URL are opaque and cannot be altered without detection. The application provides a web interface (through the index.html template) where users can input a domain or IP address, and the application retrieves and displays the corresponding WHOIS information and map.
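
Since the values travel in a URL, it can help to build the query string with urllib.parse.urlencode rather than by string formatting, so that reserved characters are escaped. The sketch below is one possible approach, not the project's code: build_result_url is a hypothetical helper, and it assumes the /result handler treats a missing domain_info parameter as "not found".

```python
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlsplit

def build_result_url(domain: str, info_token: Optional[str], map_token: Optional[str]) -> str:
    """Build the /result URL, leaving out tokens that are missing."""
    params = {"domain": domain}
    if info_token is not None:
        params["domain_info"] = info_token
    if map_token is not None:
        params["domain_map"] = map_token
    # urlencode percent-encodes reserved characters such as '=' and '&'
    return "/result?" + urlencode(params)

url = build_result_url("example.com", "abc==", None)
print(url)
# Parsing the query string again recovers the original values
print(parse_qs(urlsplit(url).query))
```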

main.py

from fastapi import FastAPI
from routes import _dnsLookup
from fastapi.staticfiles import StaticFiles
import os

app = FastAPI(
    title="DNS Lookup API",
    description="API for getting whois information and location of domain or IP",
    version="1.0",
    docs_url="/docs",
    openapi_url="/openapi.json",
    contact={
        "name": "Tran Vinh Liem",
        "email": "riottecboi@gmail.com",
        "url": "https://about.riotteboi.com"
    }
)
folder = os.getcwd()

app.mount("/static", StaticFiles(directory=folder+"/static", html=True), name="static")
app.include_router(_dnsLookup.router)
  • Imports:

    • FastAPI is imported from the fastapi module to create the FastAPI application instance.
    • _dnsLookup is imported from the routes module, which contains the API routes for handling lookup requests.
    • StaticFiles is imported from fastapi.staticfiles to serve static files.
    • os is imported to interact with the operating system and get the current working directory.
  • FastAPI Application Instance:

    • A new FastAPI instance is created and assigned to the app variable.
    • The application is configured with metadata such as the title, description, version, documentation URLs, and contact information.
  • Static Files:

    • The StaticFiles class is used to mount a directory containing static files (e.g., CSS, JavaScript, images) at the /static URL path.
    • The folder variable stores the current working directory, and the static files are located in the static subdirectory.
    • The html=True parameter enables HTML mode, in which an index.html file is served automatically when a directory is requested.
  • Router Inclusion:

    • The include_router method is called on the app instance to include the routes defined in the _dnsLookup module.
    • This means that all the routes defined in _dnsLookup.router will be added to the FastAPI application.
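
Because os.getcwd() depends on where the server is started from, one alternative is to resolve directories relative to the source file. The following is a small sketch of that idea using pathlib; resolve_dir is a hypothetical helper and not something the project defines.

```python
from pathlib import Path

def resolve_dir(anchor_file: str, name: str) -> Path:
    """Return the directory called name that sits next to anchor_file, as an absolute path."""
    return Path(anchor_file).resolve().parent / name

# In main.py this could be called as resolve_dir(__file__, "static")
static_dir = resolve_dir("pkg/main.py", "static")
print(static_dir.name, static_dir.is_absolute())
```

With this approach the static directory is found regardless of the directory from which uvicorn is launched.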

Final — Results

Facebook WHOIS result

Google WHOIS result

No result for Example.domain

Conclusion

Overall, this application provides a web-based interface and API endpoints for users to look up WHOIS information and geographical location details for domains or IP addresses. The combination of FastAPI, Pydantic, encryption, geocoding, and mapping libraries enables a comprehensive and secure solution for DNS lookup functionality.
