DEV Community

Sahil kashyap
Sahil kashyap

Posted on

Sync ES data with your mysql db

Code
Problem: How to sync more than 10,000 records from Elasticsearch to mysql db

Solution:
1) Use the Scroll API to get all the record

2) keep appending them to csv file
3) Import the csv file in in mysql

Let's use the scroll api.

But before that we need a method to extract the data from ES response object for that,I used array_pluck.

  function array_pluck($array, $key) {
    return array_map(function($v) use ($key) {
      return is_object($v) ? $v->$key : $v[$key];
    }, $array);
}
Enter fullscreen mode Exit fullscreen mode

And to convert the json to csv
I used this function jsonToCsv. Credits jakebathman

Below get 100 chunks and then appends it to csv file , and when all scrolling is done csv file is closed

private function fetch_the_es_data_with_scroll_dump_csv($index_name){
        $params = array(
            "scroll" => "5m",
            "size" => 100,
            'index' => $index_name,
            "_source_excludes"=>["@version","@timestamp"]
        );
        $client=$this->elastic_obj;
        $docs = $client->search($params);
        $scroll_id = $docs['_scroll_id'];
        // echo "<pre>";
        // print_r($docs);
        // echo "</pre>";

        $tablename= $params['index'];
        $dockeys=array_keys($docs['hits']['hits'][0]['_source']);

        $keys = array_flip($dockeys);
            $fieldsArray = array_fill_keys(array_keys($keys),"");
        // echo $row;
        $source_data = array_pluck( $docs['hits']['hits'], '_source' );
        $this->csvFileObject="./ForSql_importCSV/".$tablename.".csv";
        // $thecsv="./ForSql_importCSV/".$tablename.".csv";
        jsonToCsv($source_data,$this->csvFileObject);
        // echo $tablename;
        // exit;
        $fp = fopen($this->csvFileObject, 'a');
        while (\true) {
            $response = $client->scroll(
                array(
                    "scroll_id" => $scroll_id,
                    "scroll" => "5m"
                )
            );

            if (count($response['hits']['hits']) > 0) {
                // echo "Do Work Here";
                // echo "<pre>";
                // print_r($response['hits']['hits']);
                // echo "</pre>";

                // echo "<br>";
                // Get new scroll_id
                // $source_datainscroll = array_pluck( $response['hits']['hits'], '_source' );
                // echo "<pre>";
                // print_r($source_datainscroll);
                // echo "</pre>";
                // $row = array_replace($fieldsArray,$source_datainscroll);
                // echo "<pre>row";
                // print_r($row);
                // echo "</pre>";
                $fp = fopen($this->csvFileObject, 'a');
                foreach ($response['hits']['hits'] as $fields) {

                    fputcsv($fp,$fields['_source']);
                }


                $scroll_id = $response['_scroll_id'];
            } else {

                fclose($fp);
                // All done scrolling over data
                echo $tablename;
                echo "<br>All done scrolling over data<br>";
                break;
            }
        }

    }
Enter fullscreen mode Exit fullscreen mode

Let's import the csv files in mysql

make sure the local infile import is enabled, use below queries

 show global variables like 'local_infile';
 SET GLOBAL local_infile=1;
Enter fullscreen mode Exit fullscreen mode

store data in DB

array_pluck:to extract the hits object
jsonToCsv:to convert json data in csv

Sql query will look something like:

LOAD DATA LOCAL INFILE 'http://0.0.0.0:8082//ForSql_importCSV/custom_carrier_post.csv' INTO TABLE custom_carrier_post FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' LINES TERMINATED BY ' ' IGNORE 1 ROWS
Enter fullscreen mode Exit fullscreen mode
private function check_db_for_index($data)
    {
        if (empty($data['hits']['hits'])) {
            echo "no index found";
            return;
        }
        $tablename= $data['hits']['hits'][0]['_index'];

        $source_data = array_pluck( $data['hits']['hits'], '_source' );
        jsonToCsv($source_data,"./ForSql_importCSV/".$tablename.".csv");
        echo $tablename;
        echo "<br>";
        $folder= $this->home_url('/ForSql_importCSV/');
        // echo $folder;
        // exit;
        $sqlquery="LOAD DATA LOCAL INFILE '".$folder.$tablename.".csv'
                                    INTO TABLE ".$tablename."
                                    FIELDS TERMINATED BY ','
                                    OPTIONALLY ENCLOSED BY '\"'
                                    LINES TERMINATED BY '\n'
                                    IGNORE 1 ROWS";
                                    echo "<br>";
                                    echo $sqlquery;

        $this->pdo_conn->exec($sqlquery);



    }
Enter fullscreen mode Exit fullscreen mode

Latest comments (0)